
Add infrastructure to capture and store logs: stdout, stderr, python logs, and tracebacks [ENG-264] #89

Draft
brian-arnold wants to merge 1 commit into nauticalab:dev from brian-arnold:claude/logging

Conversation

@brian-arnold
Collaborator

Logging Infrastructure Overview

What it captures

Every time a packet function executes, the infrastructure captures stdout, stderr, Python logging output, and
tracebacks (on failure). These are bundled into a CapturedLogs dataclass.

How capture works

There are two capture mechanisms depending on where the function runs:

Local execution (LocalExecutor / no executor): LocalCaptureContext sets per-task/thread ContextVar buffers.
sys.stdout/sys.stderr are globally replaced (once, at observer creation) with ContextLocalTeeStream objects
that write to both the terminal and the active buffer. A ContextVarLoggingHandler on the root logger captures
Python logging the same way. Concurrent packets never intermingle because each asyncio task/thread gets its own
ContextVar copy.

Ray execution (RayExecutor): _make_capture_wrapper() returns a self-contained closure (no orcapod imports) that
does fd-level capture (os.dup2) on the worker process. It returns a plain 6-tuple over the Ray object store;
the driver reconstructs CapturedLogs from it.
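A simplified sketch of fd-level capture in the spirit of `_make_capture_wrapper()`. The real wrapper returns a 6-tuple and also captures stderr and Python logging; this version handles only stdout and is an illustration of the os.dup2 mechanism, not the actual code:

```python
import os
import sys
import tempfile
import traceback


def run_with_fd_capture(fn, *args, **kwargs):
    """Run fn, capturing fd-level stdout; return (result, stdout_text, traceback)."""
    with tempfile.TemporaryFile(mode="w+b") as tmp:
        saved_fd = os.dup(1)          # remember the real stdout fd
        sys.stdout.flush()
        os.dup2(tmp.fileno(), 1)      # point fd 1 at the temp file
        try:
            result, tb = fn(*args, **kwargs), None
        except Exception:
            result, tb = None, traceback.format_exc()
        finally:
            sys.stdout.flush()
            os.dup2(saved_fd, 1)      # restore the real stdout
            os.close(saved_fd)
        tmp.seek(0)
        captured = tmp.read().decode()
    return result, captured, tb


def work():
    os.write(1, b"from worker\n")  # raw fd write, as a C extension would do
    return 42


out, stdout_text, tb = run_with_fd_capture(work)
```

Because the redirection happens at the file-descriptor level, output from C extensions and subprocesses is captured too, which stream-object replacement alone cannot do.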

How CapturedLogs flows through the call chain

User function
↓ catches exception, builds CapturedLogs
PythonPacketFunction.direct_call() → (PacketProtocol | None, CapturedLogs)
↓ or via executor.execute_callable() which also returns (raw, CapturedLogs)
PythonPacketFunction.call() → (PacketProtocol | None, CapturedLogs)

CachedPacketFunction.call() → (PacketProtocol | None, CapturedLogs)
↓ (cache hit returns empty CapturedLogs)
_FunctionPodBase.process_packet_with_capture() → (Tag, Packet | None, CapturedLogs)

FunctionNode._process_packet_internal() → (Tag, Packet | None, CapturedLogs)

FunctionNode.execute() — passes CapturedLogs to pkt_logger.record()

The key design decision: CapturedLogs travels as a return value, not through a ContextVar side-channel.
direct_call() catches user-function exceptions internally and returns (None, captured_failure) — it never
re-raises.
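That design decision can be illustrated with a minimal sketch: direct_call() catches user-function exceptions and returns (None, captured) rather than re-raising. The CapturedLogs fields follow the log-row columns listed later in this description; the exact dataclass layout is an assumption:

```python
import traceback
from dataclasses import dataclass
from typing import Optional


@dataclass
class CapturedLogs:
    stdout: str = ""
    stderr: str = ""
    python_logs: str = ""
    traceback: Optional[str] = None
    success: bool = True


def direct_call(fn, packet):
    """Return (result, CapturedLogs); never re-raise user-function errors."""
    try:
        return fn(packet), CapturedLogs(success=True)
    except Exception:
        return None, CapturedLogs(success=False, traceback=traceback.format_exc())


ok, logs = direct_call(lambda p: p * 2, 21)          # (42, success=True)
bad, fail_logs = direct_call(lambda p: 1 / 0, 21)    # (None, success=False)
```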

Observer/Logger interaction

The orchestrator injects an ExecutionObserverProtocol into each node's execute() call. The observer provides
lifecycle hooks (on_run_start, on_node_start, on_packet_start, etc.) and a factory method:

observer.create_packet_logger(node, tag, packet, pipeline_path=...) → PacketExecutionLoggerProtocol

FunctionNode.execute() calls this factory before each non-cached packet, then after execution:
tag_out, result, captured = self._process_packet_internal(tag, packet)
pkt_logger.record(captured) # writes to database
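A minimal observer/logger pair following the factory pattern above, in the shape of the RecordingObserver classes the tests use. The method bodies here are illustrative assumptions; the real protocols live in observability_protocols.py:

```python
class RecordingLogger:
    def __init__(self, store, node_label):
        self._store = store
        self._node_label = node_label

    def record(self, captured) -> None:
        # The real PacketLogger writes a structured row to an Arrow database;
        # here we just append to a shared list.
        self._store.append((self._node_label, captured))


class RecordingObserver:
    def __init__(self):
        self.rows = []

    # Lifecycle hooks are no-ops in this sketch.
    def on_run_start(self, run_id): ...
    def on_node_start(self, node): ...
    def on_packet_start(self, node, tag, packet): ...

    def create_packet_logger(self, node, tag, packet, pipeline_path=None):
        return RecordingLogger(self.rows, node)


obs = RecordingObserver()
pkt_logger = obs.create_packet_logger("transform_a", tag={"i": 0}, packet=None)
pkt_logger.record("captured-logs-placeholder")
```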

Concrete implementations

  • NoOpObserver / NoOpLogger — Default when no observability is configured. Zero-cost no-ops.
  • LoggingObserver / PacketLogger — Writes structured log rows to any ArrowDatabaseProtocol
    (InMemoryArrowDatabase, DeltaTableDatabase, etc.).

Log storage structure

LoggingObserver stores logs at pipeline-path-mirrored locations. If a function node's pipeline path is
("my_pipeline", "transform_a", "v0", ...), its logs go to ("my_pipeline", "logs", "transform_a", "v0", ...).
Each function node gets its own table. Retrieve with obs.get_logs(pipeline_path=node.pipeline_path).
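The path mirroring follows the formula given in the change list below, pipeline_path[:1] + ("logs",) + pipeline_path[1:], which can be stated as a one-liner:

```python
def log_path(pipeline_path: tuple) -> tuple:
    """Mirror a node's pipeline path into the logs namespace."""
    return pipeline_path[:1] + ("logs",) + pipeline_path[1:]


assert log_path(("my_pipeline", "transform_a", "v0")) == (
    "my_pipeline", "logs", "transform_a", "v0"
)
```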

Each log row has fixed columns (log_id, run_id, node_label, stdout, stderr, python_logs, traceback, success,
timestamp) plus dynamic tag columns — each tag key becomes its own queryable column rather than being
JSON-encoded.
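A sketch of how a log row combines the fixed columns with dynamic tag columns, one queryable column per tag key instead of a JSON blob. The column names follow the schema listed above; the value handling (stringifying tag values) is an assumption based on the PR's review snippets:

```python
import datetime
import uuid


def build_log_row(run_id, node_label, captured, tag):
    row = {
        "log_id": str(uuid.uuid4()),
        "run_id": run_id,
        "node_label": node_label,
        "stdout": captured.get("stdout", ""),
        "stderr": captured.get("stderr", ""),
        "python_logs": captured.get("python_logs", ""),
        "traceback": captured.get("traceback"),
        "success": captured.get("success", True),
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }
    # Dynamic tag columns: each tag key becomes its own column.
    for key, value in tag.items():
        row[key] = str(value)
    return row


row = build_log_row("run-1", "transform_a", {"stdout": "hi"}, {"sample": "s1", "rep": 2})
```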

Changes to orcapod-python

  1. CapturedLogs as return values (removed ContextVar side-channel)

Core change: call(), direct_call(), async_call(), direct_async_call() on packet functions now return
tuple[PacketProtocol | None, CapturedLogs] instead of PacketProtocol | None. Similarly
executor.execute()/async_execute() return tuples. direct_call() catches user-function exceptions internally and
returns (None, captured_failure) instead of re-raising.

  • Protocols — packet_function.py, executor.py, node_protocols.py updated with new return types
  • Executors — base.py, local.py, ray.py pass through CapturedLogs; Ray's execute() now delegates to
    execute_callable()
  • packet_function.py — All _captured_logs.set() calls removed; PythonPacketFunction, CachedPacketFunction,
    PacketFunctionWrapper return tuples
  • function_pod.py — Added process_packet_with_capture() (returns 3-tuple with CapturedLogs); process_packet()
    unchanged (still 2-tuple, discards captured)
  • cached_function_pod.py — Added process_packet_with_capture() preserving pod-level caching
  • function_node.py — Uses process_packet_with_capture() to get CapturedLogs directly from the return value. No
    more _captured_logs ContextVar reads. Simplified execute logic (no try/except needed)
  2. Removed duplicate _ray_capture_wrapper
  • Deleted the module-level function from logging_capture.py (unused imports cleaned up too)
  • RayExecutor._make_capture_wrapper() is the single capture mechanism for Ray
  3. Pipeline-path-mirrored log storage with queryable tag columns
  • create_packet_logger() accepts pipeline_path kwarg on protocol, NoOpObserver, and LoggingObserver
  • Logs stored at pipeline_path[:1] + ("logs",) + pipeline_path[1:] — each function node gets its own log table
  • Tag data stored as individual columns (not a single JSON "tags" column)
  • get_logs(pipeline_path=...) retrieves node-specific logs
  4. Error handling modes
  • FunctionNode.execute() accepts error_policy: "continue" | "fail_fast"
  • SyncPipelineOrchestrator and AsyncPipelineOrchestrator accept error_policy and pass it through
  5. New files
  • src/orcapod/pipeline/logging_capture.py — Capture infrastructure (CapturedLogs, LocalCaptureContext, tee
    streams)
  • src/orcapod/pipeline/logging_observer.py — LoggingObserver + PacketLogger
  • src/orcapod/protocols/observability_protocols.py — Observer/Logger protocols
  • tests/test_pipeline/test_logging_capture.py — Unit tests for capture infrastructure
  • tests/test_pipeline/test_logging_observer_integration.py — 9 end-to-end integration tests
  6. Test updates

~20 test files updated to unpack the new tuple return types, and RecordingObserver classes updated to accept
pipeline_path kwarg.
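The two error-handling modes listed under "Error handling modes" can be sketched as a loop: "continue" records the failure and keeps going, while "fail_fast" re-raises immediately. The policy names come from the PR; the loop shape is an illustrative assumption:

```python
def execute_packets(packets, fn, error_policy="continue"):
    """Run fn over packets under the given error policy."""
    results, failures = [], []
    for pkt in packets:
        try:
            results.append(fn(pkt))
        except Exception as exc:
            if error_policy == "fail_fast":
                raise  # abort the whole run on the first failure
            failures.append((pkt, exc))  # "continue": record and keep going
    return results, failures


results, failures = execute_packets([1, 0, 2], lambda p: 10 // p)
```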

Contributor

Copilot AI left a comment


Pull request overview

Introduces end-to-end observability for packet-function execution by capturing stdout/stderr/Python logging/tracebacks into a CapturedLogs payload, threading it through packet-function and executor APIs, and persisting structured per-packet log rows via a new observer/logger protocol.

Changes:

  • Add CapturedLogs + capture infrastructure (local ContextVar tee streams; Ray fd-level capture wrapper) and propagate captured logs through packet-function/executor return types.
  • Add observability protocols plus LoggingObserver/PacketLogger to persist per-packet logs to an ArrowDatabaseProtocol, with pipeline-path-mirrored storage and tag keys as queryable columns.
  • Update orchestrators/nodes and a broad set of tests to support new observer hooks (on_run_start/end, on_packet_crash) and new tuple return signatures.

Reviewed changes

Copilot reviewed 36 out of 36 changed files in this pull request and generated 6 comments.

File Description
tests/test_pipeline/test_sync_orchestrator.py Updates test observers to implement new observer hooks and packet logger factory.
tests/test_pipeline/test_pipeline.py Adjusts mock executor return types to match (result, captured) API.
tests/test_pipeline/test_orchestrator.py Updates async error-handling expectations and adds on_packet_crash test coverage.
tests/test_pipeline/test_observer.py Expands NoOp observer/logger protocol conformance tests.
tests/test_pipeline/test_node_protocols.py Updates protocol-satisfaction tests for new observer requirements.
tests/test_pipeline/test_logging_observer_integration.py Adds integration tests verifying log capture, storage layout, and tag columns.
tests/test_pipeline/test_logging_capture.py Adds unit tests for local tee capture and Ray wrapper capture behavior.
tests/test_core/test_result_cache.py Updates direct call usage to unpack (output, captured).
tests/test_core/test_regression_fixes.py Adjusts regression tests for new exception-swallowing behavior and updated Ray async execution API.
tests/test_core/packet_function/test_packet_function.py Updates packet function call/async call tests for tuple return types.
tests/test_core/packet_function/test_executor.py Updates executor protocol tests for (result, CapturedLogs) return values.
tests/test_core/packet_function/test_cached_packet_function.py Updates cached packet function tests for tuple return values.
tests/test_channels/test_node_async_execute.py Updates channel-based async execution tests for tuple return values.
tests/test_channels/test_copilot_review_issues.py Updates tests to unpack tuple return values from direct_call.
tests/test_channels/test_async_execute.py Updates async execution/error tests for (None, captured) failure semantics.
src/orcapod/protocols/observability_protocols.py Adds runtime-checkable protocols for observers and per-packet loggers.
src/orcapod/protocols/node_protocols.py Switches node observer typing to ExecutionObserverProtocol and adds error_policy to sync function-node execute protocol.
src/orcapod/protocols/core_protocols/packet_function.py Updates packet function protocol signatures to return (PacketProtocol | None, CapturedLogs) tuples.
src/orcapod/protocols/core_protocols/executor.py Updates executor protocol signatures to return (result, CapturedLogs) tuples and documents capture semantics.
src/orcapod/protocols/__init__.py Re-exports observability protocols from orcapod.protocols.
src/orcapod/pipeline/sync_orchestrator.py Adds run_id lifecycle hooks and plumbs error_policy into function node execution.
src/orcapod/pipeline/observer.py Replaces legacy observer protocol with NoOpObserver/NoOpLogger implementations of the new protocols.
src/orcapod/pipeline/logging_observer.py Adds LoggingObserver and PacketLogger to persist structured execution logs to an Arrow DB.
src/orcapod/pipeline/logging_capture.py Adds CapturedLogs, ContextVar tee streams, logging handler, and LocalCaptureContext.
src/orcapod/pipeline/async_orchestrator.py Adds run lifecycle hooks and an error_policy parameter (currently stored but not applied).
src/orcapod/pipeline/__init__.py Exposes LoggingObserver/PacketLogger in pipeline public exports.
src/orcapod/core/packet_function.py Implements tuple-return execution semantics, local capture, and exception swallowing in PythonPacketFunction/wrappers/cached PFs.
src/orcapod/core/nodes/source_node.py Updates observer typing import to new protocol.
src/orcapod/core/nodes/operator_node.py Updates observer typing import to new protocol.
src/orcapod/core/nodes/function_node.py Records captured logs via observer-created packet loggers; adds error_policy handling for sync execution and on_packet_crash hooks.
src/orcapod/core/function_pod.py Adds process_packet_with_capture variants to return captured logs alongside results.
src/orcapod/core/executors/ray.py Unifies Ray capture via _make_capture_wrapper() and returns (raw, CapturedLogs) from callable execution.
src/orcapod/core/executors/local.py Adds local capture to execute_callable/async_execute_callable, returning (raw, CapturedLogs) and swallowing exceptions.
src/orcapod/core/executors/base.py Updates base executor signatures to return (raw, CapturedLogs) (default capture is empty).
src/orcapod/core/cached_function_pod.py Adds caching variants that preserve CapturedLogs (empty on cache hits).
src/orcapod/__init__.py Re-exports ArrowTableSource at top level.


Comment on lines +75 to +76
from uuid_utils import uuid7

Comment on lines +138 to +141
# Dynamic tag columns — each tag key becomes its own column
for key, value in self._tag_data.items():
    columns[key] = pa.array([str(value)], type=pa.large_utf8())

Comment on lines +72 to +75
effective_run_id = run_id or str(uuid.uuid4())
if self._observer is not None:
    self._observer.on_run_start(effective_run_id)

Comment on lines +108 to +111
effective_run_id = run_id or str(uuid.uuid4())
if self._observer is not None:
    self._observer.on_run_start(effective_run_id)

) -> None:
    self._observer = observer
    self._buffer_size = buffer_size
    self._error_policy = error_policy
Comment on lines +1334 to +1342
pkt_logger.record(captured)
if not captured.success:
    if observer is not None:
        observer.on_packet_crash(
            self,
            tag,
            packet,
            RuntimeError(captured.traceback or "packet function failed"),
        )
@brian-arnold changed the title from "Add infrastructure to capture and store logs: stdout, stderr, python logs, and tracebacks" to "Add infrastructure to capture and store logs: stdout, stderr, python logs, and tracebacks [ENG-264]" on Mar 17, 2026
Contributor

@eywalker left a comment
Let's discuss the design, as 1) the latest commits on dev have incorporated notable changes to the executor protocol which will strongly influence the implementation in this PR, and 2) I don't think it'd be a good idea for PacketFunction and FunctionPod to be extended to return a logging-related object as part of their return signature. Not only does this often lead to invocations returning an object (the log) that needs to be ignored (many cases in tests use a result, _log = packet_function.call(...) kind of signature to dismiss the log returned as the second item), but it also places the burden (and trust) of correctly preparing the logging object within the packet function and executor. Since "how" we handle logging should be decoupled from the action of logging itself, I think it'd make sense for us to continue with the dependency injection pattern, passing a logger object into the function pod -> packet function -> executor, and removing the return pathway. Let's discuss!

from typing import TYPE_CHECKING, Protocol, runtime_checkable

if TYPE_CHECKING:
    from orcapod.core.nodes import GraphNode
Contributor

Let's not use concrete classes within protocols except for globally defined classes as found in orcapod.types


if TYPE_CHECKING:
    from orcapod.core.nodes import GraphNode
    from orcapod.pipeline.logging_capture import CapturedLogs
Contributor

Similarly, this should be defined in terms of protocols or add the class into orcapod.types

if TYPE_CHECKING:
    from orcapod.core.nodes import GraphNode
    from orcapod.pipeline.logging_capture import CapturedLogs
    from orcapod.protocols.core_protocols import PacketProtocol, TagProtocol
Contributor

protocol import is cheap -- no need to place this under TYPE_CHECKING


def on_packet_start(
    self,
    node: "GraphNode",
Contributor

Is it really necessary for an observer to know how to work with a GraphNode directly? I'd rather we use a lower-level interface that allows the Observer protocol to be more generically useful.

FunctionPod,
function_pod,
)
from .core.sources.arrow_table_source import ArrowTableSource
Contributor

I've adjusted imports in the latest dev branch so that the entire sources subpackage (orcapod.core.sources) is available in the root namespace of the package. Let's discuss which specific Python objects, other than Pipeline, FunctionPod, and function_pod, if any, should be made available in the package root scope.
