Add infrastructure to capture and store logs: stdout, stderr, python logs, and tracebacks [ENG-264]#89
Conversation
Pull request overview
Introduces end-to-end observability for packet-function execution by capturing stdout/stderr/Python logging/tracebacks into a CapturedLogs payload, threading it through packet-function and executor APIs, and persisting structured per-packet log rows via a new observer/logger protocol.
Changes:
- Add `CapturedLogs` and capture infrastructure (local ContextVar tee streams; Ray fd-level capture wrapper) and propagate captured logs through packet-function/executor return types.
- Add observability protocols plus `LoggingObserver`/`PacketLogger` to persist per-packet logs to an `ArrowDatabaseProtocol`, with pipeline-path-mirrored storage and tag keys as queryable columns.
- Update orchestrators/nodes and a broad set of tests to support new observer hooks (`on_run_start`/`on_run_end`, `on_packet_crash`) and new tuple return signatures.
Reviewed changes
Copilot reviewed 36 out of 36 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| tests/test_pipeline/test_sync_orchestrator.py | Updates test observers to implement new observer hooks and packet logger factory. |
| tests/test_pipeline/test_pipeline.py | Adjusts mock executor return types to match (result, captured) API. |
| tests/test_pipeline/test_orchestrator.py | Updates async error-handling expectations and adds on_packet_crash test coverage. |
| tests/test_pipeline/test_observer.py | Expands NoOp observer/logger protocol conformance tests. |
| tests/test_pipeline/test_node_protocols.py | Updates protocol-satisfaction tests for new observer requirements. |
| tests/test_pipeline/test_logging_observer_integration.py | Adds integration tests verifying log capture, storage layout, and tag columns. |
| tests/test_pipeline/test_logging_capture.py | Adds unit tests for local tee capture and Ray wrapper capture behavior. |
| tests/test_core/test_result_cache.py | Updates direct call usage to unpack (output, captured). |
| tests/test_core/test_regression_fixes.py | Adjusts regression tests for new exception-swallowing behavior and updated Ray async execution API. |
| tests/test_core/packet_function/test_packet_function.py | Updates packet function call/async call tests for tuple return types. |
| tests/test_core/packet_function/test_executor.py | Updates executor protocol tests for (result, CapturedLogs) return values. |
| tests/test_core/packet_function/test_cached_packet_function.py | Updates cached packet function tests for tuple return values. |
| tests/test_channels/test_node_async_execute.py | Updates channel-based async execution tests for tuple return values. |
| tests/test_channels/test_copilot_review_issues.py | Updates tests to unpack tuple return values from direct_call. |
| tests/test_channels/test_async_execute.py | Updates async execution/error tests for (None, captured) failure semantics. |
| src/orcapod/protocols/observability_protocols.py | Adds runtime-checkable protocols for observers and per-packet loggers. |
| src/orcapod/protocols/node_protocols.py | Switches node observer typing to ExecutionObserverProtocol and adds error_policy to sync function-node execute protocol. |
| src/orcapod/protocols/core_protocols/packet_function.py | Updates packet function protocol signatures to return (packet or None, CapturedLogs) tuples. |
| src/orcapod/protocols/core_protocols/executor.py | Updates executor protocol signatures to return (result, CapturedLogs) tuples and documents capture semantics. |
| src/orcapod/protocols/__init__.py | Re-exports observability protocols from orcapod.protocols. |
| src/orcapod/pipeline/sync_orchestrator.py | Adds run_id lifecycle hooks and plumbs error_policy into function node execution. |
| src/orcapod/pipeline/observer.py | Replaces legacy observer protocol with NoOpObserver/NoOpLogger implementations of the new protocols. |
| src/orcapod/pipeline/logging_observer.py | Adds LoggingObserver and PacketLogger to persist structured execution logs to an Arrow DB. |
| src/orcapod/pipeline/logging_capture.py | Adds CapturedLogs, ContextVar tee streams, logging handler, and LocalCaptureContext. |
| src/orcapod/pipeline/async_orchestrator.py | Adds run lifecycle hooks and an error_policy parameter (currently stored but not applied). |
| src/orcapod/pipeline/__init__.py | Exposes LoggingObserver/PacketLogger in pipeline public exports. |
| src/orcapod/core/packet_function.py | Implements tuple-return execution semantics, local capture, and exception swallowing in PythonPacketFunction/wrappers/cached PFs. |
| src/orcapod/core/nodes/source_node.py | Updates observer typing import to new protocol. |
| src/orcapod/core/nodes/operator_node.py | Updates observer typing import to new protocol. |
| src/orcapod/core/nodes/function_node.py | Records captured logs via observer-created packet loggers; adds error_policy handling for sync execution and on_packet_crash hooks. |
| src/orcapod/core/function_pod.py | Adds process_packet_with_capture variants to return captured logs alongside results. |
| src/orcapod/core/executors/ray.py | Unifies Ray capture via _make_capture_wrapper() and returns (raw, CapturedLogs) from callable execution. |
| src/orcapod/core/executors/local.py | Adds local capture to execute_callable/async_execute_callable, returning (raw, CapturedLogs) and swallowing exceptions. |
| src/orcapod/core/executors/base.py | Updates base executor signatures to return (raw, CapturedLogs) (default capture is empty). |
| src/orcapod/core/cached_function_pod.py | Adds caching variants that preserve CapturedLogs (empty on cache hits). |
| src/orcapod/__init__.py | Re-exports ArrowTableSource at top level. |
```python
from uuid_utils import uuid7
```
```python
# Dynamic tag columns — each tag key becomes its own column
for key, value in self._tag_data.items():
    columns[key] = pa.array([str(value)], type=pa.large_utf8())
```
```python
effective_run_id = run_id or str(uuid.uuid4())
if self._observer is not None:
    self._observer.on_run_start(effective_run_id)
```
```python
effective_run_id = run_id or str(uuid.uuid4())
if self._observer is not None:
    self._observer.on_run_start(effective_run_id)
```
```python
) -> None:
    self._observer = observer
    self._buffer_size = buffer_size
    self._error_policy = error_policy
```
```python
pkt_logger.record(captured)
if not captured.success:
    if observer is not None:
        observer.on_packet_crash(
            self,
            tag,
            packet,
            RuntimeError(captured.traceback or "packet function failed"),
        )
```
eywalker left a comment
Let's discuss the design: 1) the latest commits on dev have incorporated notable changes to the executor protocol that will strongly influence the implementation in this PR, and 2) I don't think it's a good idea for PacketFunction and FunctionPod to be extended to return a logging-related object as part of their return signature. Not only does this often lead to invocations returning an object (the log) that must be ignored (many tests use a result, _log = packet_function.call(...) pattern just to dismiss the log returned as the second item), but it also places the burden (and trust) of correctly preparing the logging object on the packet function and executor. Since "how" we handle logging should be decoupled from the act of logging itself, I think it makes sense for us to continue with the dependency injection pattern, passing a logger object into the function pod -> packet function -> executor, and remove the return pathway. Let's discuss!
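For concreteness, the dependency-injection shape proposed in this comment could look roughly like the sketch below. All names here are illustrative, not orcapod APIs: the caller injects a logger down the chain, and the call returns only the result, so no log object ever needs to be unpacked and discarded.

```python
class NullLogger:
    """No-op logger used when the caller injects nothing."""

    def record(self, captured) -> None:
        pass


def execute_callable(fn, *args, logger=None, **kwargs):
    """Logger is injected by the caller; the return value stays a plain
    result, so call sites never have to unpack-and-ignore a log object."""
    logger = logger or NullLogger()
    try:
        result = fn(*args, **kwargs)
    except Exception as exc:
        logger.record({"success": False, "error": repr(exc)})
        raise  # error handling stays with the caller, not the executor
    logger.record({"success": True})
    return result
```

With this shape, a test that does not care about logging simply omits the `logger` argument instead of writing `result, _log = ...`.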
```python
from typing import TYPE_CHECKING, Protocol, runtime_checkable

if TYPE_CHECKING:
    from orcapod.core.nodes import GraphNode
```
Let's not use concrete classes within protocols except for globally defined classes as found in orcapod.types.
```python
if TYPE_CHECKING:
    from orcapod.core.nodes import GraphNode
    from orcapod.pipeline.logging_capture import CapturedLogs
```
Similarly, this should be defined in terms of protocols, or the class should be added to orcapod.types.
```python
if TYPE_CHECKING:
    from orcapod.core.nodes import GraphNode
    from orcapod.pipeline.logging_capture import CapturedLogs
    from orcapod.protocols.core_protocols import PacketProtocol, TagProtocol
```
Protocol imports are cheap -- no need to place this under TYPE_CHECKING.
```python
def on_packet_start(
    self,
    node: "GraphNode",
```
Is it really necessary for an observer to know how to work with a GraphNode directly? I'd rather we use a lower-level interface so the Observer protocol can be more generically useful.
```python
    FunctionPod,
    function_pod,
)
from .core.sources.arrow_table_source import ArrowTableSource
```
I've adjusted imports on the latest dev branch so that the entire sources subpackage (orcapod.core.sources) is available in the package's root namespace. Let's discuss which specific Python objects other than Pipeline, FunctionPod, and function_pod, if any, should be exposed in the package root scope.
Logging Infrastructure Overview
What it captures
Every time a packet function executes, the infrastructure captures stdout, stderr, Python logging output, and
tracebacks (on failure). These are bundled into a CapturedLogs dataclass.
How capture works
There are two capture mechanisms depending on where the function runs:
Local execution (LocalExecutor / no executor): LocalCaptureContext sets per-task/thread ContextVar buffers.
sys.stdout/sys.stderr are globally replaced (once, at observer creation) with ContextLocalTeeStream objects
that write to both the terminal and the active buffer. A ContextVarLoggingHandler on the root logger captures
Python logging the same way. Concurrent packets never intermingle because each asyncio task/thread gets its own
ContextVar copy.
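The tee idea can be sketched as follows. Names and details are illustrative, not the actual ContextLocalTeeStream implementation; the point is that the write passes through to the real stream unconditionally, while capture only happens when the current task/thread has an active buffer.

```python
import sys
from contextvars import ContextVar

# Per-task/thread capture buffer; None means "no capture active here".
_capture_buffer = ContextVar("capture_buffer", default=None)


class ContextLocalTeeStream:
    """Writes pass through to the real stream and, when a buffer is
    active for the current context, are also appended to that buffer."""

    def __init__(self, real_stream):
        self._real = real_stream

    def write(self, text):
        buf = _capture_buffer.get()
        if buf is not None:
            buf.append(text)  # captured only for the active packet's context
        return self._real.write(text)

    def flush(self):
        self._real.flush()


# Install once; then activate a buffer around each packet execution.
sys.stdout = ContextLocalTeeStream(sys.stdout)
buf = []
token = _capture_buffer.set(buf)
print("captured line")
_capture_buffer.reset(token)
print("not captured")
```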
Ray execution (RayExecutor): _make_capture_wrapper() returns a self-contained closure (no orcapod imports) that
does fd-level capture (os.dup2) on the worker process. It returns a plain 6-tuple over the Ray object store;
the driver reconstructs CapturedLogs from it.
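The fd-level trick can be illustrated with a simplified stand-in for that wrapper. This sketch returns a 5-tuple rather than the PR's 6-tuple, and the helper name is hypothetical; what it shares with the described design is that `os.dup2` redirects the process-level file descriptors (so even non-Python writes to fd 1/2 are caught) and only plain picklable values are returned.

```python
import os
import sys
import tempfile
import traceback


def make_capture_wrapper(fn):
    """Illustrative fd-level capture: redirect fds 1/2 into temp files
    around the call, then return plain values that can cross a process
    boundary (here a 5-tuple; the PR's wrapper returns a 6-tuple)."""

    def wrapper(*args, **kwargs):
        out_fd, out_path = tempfile.mkstemp()
        err_fd, err_path = tempfile.mkstemp()
        sys.stdout.flush(); sys.stderr.flush()
        saved_out, saved_err = os.dup(1), os.dup(2)
        os.dup2(out_fd, 1); os.dup2(err_fd, 2)
        try:
            result, tb, ok = fn(*args, **kwargs), None, True
        except Exception:
            result, tb, ok = None, traceback.format_exc(), False
        finally:
            sys.stdout.flush(); sys.stderr.flush()
            os.dup2(saved_out, 1); os.dup2(saved_err, 2)
            for fd in (saved_out, saved_err, out_fd, err_fd):
                os.close(fd)
        with open(out_path) as f:
            stdout_text = f.read()
        with open(err_path) as f:
            stderr_text = f.read()
        os.unlink(out_path); os.unlink(err_path)
        return result, stdout_text, stderr_text, tb, ok

    return wrapper
```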
How CapturedLogs flows through the call chain
User function
↓ catches exception, builds CapturedLogs
PythonPacketFunction.direct_call() → (PacketProtocol | None, CapturedLogs)
↓ or via executor.execute_callable() which also returns (raw, CapturedLogs)
PythonPacketFunction.call() → (PacketProtocol | None, CapturedLogs)
↓
CachedPacketFunction.call() → (PacketProtocol | None, CapturedLogs)
↓ (cache hit returns empty CapturedLogs)
_FunctionPodBase.process_packet_with_capture() → (Tag, Packet | None, CapturedLogs)
↓
FunctionNode._process_packet_internal() → (Tag, Packet | None, CapturedLogs)
↓
FunctionNode.execute() — passes CapturedLogs to pkt_logger.record()
The key design decision: CapturedLogs travels as a return value, not through a ContextVar side-channel.
direct_call() catches user-function exceptions internally and returns (None, captured_failure) — it never
re-raises.
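The never-re-raise contract can be shown with a small stand-alone helper (a hypothetical illustration using a dict in place of CapturedLogs): on success it returns `(result, captured)` with `success=True`, and on failure it returns `(None, captured)` carrying the traceback instead of propagating the exception.

```python
import io
import traceback
from contextlib import redirect_stdout, redirect_stderr


def call_with_capture(fn, *args, **kwargs):
    """Hypothetical helper showing the contract: never re-raise;
    return (result, captured) with result=None on failure."""
    out, err = io.StringIO(), io.StringIO()
    with redirect_stdout(out), redirect_stderr(err):
        try:
            result, tb, ok = fn(*args, **kwargs), None, True
        except Exception:
            result, tb, ok = None, traceback.format_exc(), False
    captured = {
        "stdout": out.getvalue(),
        "stderr": err.getvalue(),
        "traceback": tb,
        "success": ok,
    }
    return result, captured
```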
Observer/Logger interaction
The orchestrator injects an ExecutionObserverProtocol into each node's execute() call. The observer provides
lifecycle hooks (on_run_start, on_node_start, on_packet_start, etc.) and a factory method:
observer.create_packet_logger(node, tag, packet, pipeline_path=...) → PacketExecutionLoggerProtocol
FunctionNode.execute() calls this factory before each non-cached packet, then after execution:
tag_out, result, captured = self._process_packet_internal(tag, packet)
pkt_logger.record(captured) # writes to database
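The observer/logger split can be sketched as a pair of runtime-checkable protocols plus a trivial recording implementation. Signatures are simplified assumptions; the real protocols carry more hooks and concrete typing.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class PacketExecutionLoggerProtocol(Protocol):
    def record(self, captured: Any) -> None: ...


@runtime_checkable
class ExecutionObserverProtocol(Protocol):
    def on_run_start(self, run_id: str) -> None: ...
    def create_packet_logger(
        self, node: Any, tag: Any, packet: Any, pipeline_path: Any = None
    ) -> PacketExecutionLoggerProtocol: ...


class RecordingLogger:
    def __init__(self) -> None:
        self.records = []

    def record(self, captured) -> None:
        self.records.append(captured)


class RecordingObserver:
    def on_run_start(self, run_id) -> None:
        self.run_id = run_id

    def create_packet_logger(self, node, tag, packet, pipeline_path=None):
        return RecordingLogger()


obs = RecordingObserver()
assert isinstance(obs, ExecutionObserverProtocol)  # structural check only
```

Because the protocols are structural, test doubles like `RecordingObserver` satisfy them without inheriting from anything in orcapod.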
Concrete implementations
PacketLogger writes rows through the ArrowDatabaseProtocol interface, so any concrete implementation (InMemoryArrowDatabase, DeltaTableDatabase, etc.) can serve as the backing store.
Log storage structure
LoggingObserver stores logs at pipeline-path-mirrored locations. If a function node's pipeline path is
("my_pipeline", "transform_a", "v0", ...), its logs go to ("my_pipeline", "logs", "transform_a", "v0", ...).
Each function node gets its own table. Retrieve with obs.get_logs(pipeline_path=node.pipeline_path).
Each log row has fixed columns (log_id, run_id, node_label, stdout, stderr, python_logs, traceback, success,
timestamp) plus dynamic tag columns — each tag key becomes its own queryable column rather than being
JSON-encoded.
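The path-mirroring rule and the tag-column layout can be sketched with two small helpers (names hypothetical, mirroring the behavior described above):

```python
def logs_path(pipeline_path):
    """Mirror a node's pipeline path under a "logs" segment:
    ("my_pipeline", "transform_a", "v0") -> ("my_pipeline", "logs", "transform_a", "v0")."""
    return (pipeline_path[0], "logs") + tuple(pipeline_path[1:])


def log_row(fixed_columns, tag):
    """Build one log row: fixed columns plus one queryable column per
    tag key, with tag values stringified rather than JSON-encoded."""
    row = dict(fixed_columns)
    for key, value in tag.items():
        row[key] = str(value)
    return row
```

A query for all failures of one subject can then filter directly on the tag column (e.g. `subject_id == "42"`) instead of parsing a JSON blob.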
Changes to orcapod-python
Core change: call(), direct_call(), async_call(), direct_async_call() on packet functions now return
tuple[PacketProtocol | None, CapturedLogs] instead of PacketProtocol | None. Similarly
executor.execute()/async_execute() return tuples. direct_call() catches user-function exceptions internally and
returns (None, captured_failure) instead of re-raising.
Smaller adjustments:
- execute_callable() and the PacketFunctionWrapper paths now return tuples.
- process_packet() itself is unchanged (still a 2-tuple; it discards captured logs).
- No more _captured_logs ContextVar reads, which simplifies the execute logic (no try/except needed).
- Local capture runs through the tee streams described above.
~20 test files updated to unpack the new tuple return types, and RecordingObserver classes updated to accept a pipeline_path kwarg.