From 44491f641417e31e0f7e24d1567626b32104480a Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 09:46:49 +0000 Subject: [PATCH 01/23] docs(specs): add async orchestrator refactor design spec (PLT-922) Design for refactoring AsyncPipelineOrchestrator to use node protocols, slim execute/async_execute interface, observer injection, and tightened per-type async signatures. Co-Authored-By: Claude Opus 4.6 (1M context) --- ...3-15-async-orchestrator-refactor-design.md | 294 ++++++++++++++++++ 1 file changed, 294 insertions(+) create mode 100644 superpowers/specs/2026-03-15-async-orchestrator-refactor-design.md diff --git a/superpowers/specs/2026-03-15-async-orchestrator-refactor-design.md b/superpowers/specs/2026-03-15-async-orchestrator-refactor-design.md new file mode 100644 index 00000000..6830740d --- /dev/null +++ b/superpowers/specs/2026-03-15-async-orchestrator-refactor-design.md @@ -0,0 +1,294 @@ +# Async Orchestrator Refactor Design + +PLT-922: Refactor AsyncPipelineOrchestrator to use node protocols and orchestrator interface. + +## Context + +The `SyncPipelineOrchestrator` (PLT-921) introduced node protocols +(`SourceNodeProtocol`, `FunctionNodeProtocol`, `OperatorNodeProtocol`) with +TypeGuard dispatch. The sync orchestrator currently drives per-packet execution +for function nodes — calling `get_cached_results`, `execute_packet`, and firing +observer hooks from the orchestrator side. + +The `AsyncPipelineOrchestrator` uses a different pattern: it calls a uniform +`node.async_execute(inputs, output)` on all nodes, letting nodes handle +everything internally via channels. + +This refactor aligns both orchestrators on a common design where: + +- Nodes own their execution (caching, per-packet logic, persistence). +- Orchestrators are topology schedulers that call `execute` / `async_execute`. +- Observability is achieved via observer injection, not orchestrator-driven hooks. 
+ +## Design Decisions + +### Slim node protocols + +The three node protocols expose only `execute` (sync) and `async_execute` +(async). All per-packet methods (`get_cached_results`, `execute_packet`, +`compute_pipeline_entry_id`) are removed from the protocol surface — they remain +as internal methods on the node classes. + +### Observer injection via parameter + +Both `execute` and `async_execute` accept an optional `observer` keyword +argument. Nodes call the observer hooks internally (`on_node_start`, +`on_node_end`, `on_packet_start`, `on_packet_end`). The `ExecutionObserver` +protocol itself is unchanged. + +### Orchestrators are topology schedulers + +Neither orchestrator inspects packet content, manages caches, or drives +per-packet loops. They: + +1. Walk the graph in topological order. +2. Call `execute` or `async_execute` on each node with the correct inputs. +3. Collect results into `OrchestratorResult`. + +### Tightened async signatures per node type + +Instead of a uniform `async_execute(inputs: Sequence[ReadableChannel], output)` +for all nodes, each protocol has a signature matching its arity: + +- Source: `async_execute(output)` — no inputs +- Function: `async_execute(input_channel, output)` — single input +- Operator: `async_execute(inputs: Sequence[ReadableChannel], output)` — N inputs + +### prefer_async flag on FunctionNode + +`FunctionNode` gains a `prefer_async` constructor parameter (default `False`). +When `execute()` is called (sync path), the node checks this flag: + +- `prefer_async=True` and pod/executor supports async → run async path internally +- Otherwise → run sync path + +This allows a sync orchestrator to still leverage async-capable executors +per-node without complicating the orchestrator or `execute` signature. 
+ +## Revised Node Protocols + +```python +@runtime_checkable +class SourceNodeProtocol(Protocol): + node_type: str # == "source" + + def execute( + self, *, observer: ExecutionObserver | None = None + ) -> list[tuple[TagProtocol, PacketProtocol]]: ... + + async def async_execute( + self, + output: WritableChannel[tuple[TagProtocol, PacketProtocol]], + *, + observer: ExecutionObserver | None = None, + ) -> None: ... + + +@runtime_checkable +class FunctionNodeProtocol(Protocol): + node_type: str # == "function" + + def execute( + self, + input_stream: StreamProtocol, + *, + observer: ExecutionObserver | None = None, + ) -> list[tuple[TagProtocol, PacketProtocol]]: ... + + async def async_execute( + self, + input_channel: ReadableChannel[tuple[TagProtocol, PacketProtocol]], + output: WritableChannel[tuple[TagProtocol, PacketProtocol]], + *, + observer: ExecutionObserver | None = None, + ) -> None: ... + + +@runtime_checkable +class OperatorNodeProtocol(Protocol): + node_type: str # == "operator" + + def execute( + self, + *input_streams: StreamProtocol, + observer: ExecutionObserver | None = None, + ) -> list[tuple[TagProtocol, PacketProtocol]]: ... + + async def async_execute( + self, + inputs: Sequence[ReadableChannel[tuple[TagProtocol, PacketProtocol]]], + output: WritableChannel[tuple[TagProtocol, PacketProtocol]], + *, + observer: ExecutionObserver | None = None, + ) -> None: ... +``` + +TypeGuard dispatch functions (`is_source_node`, `is_function_node`, +`is_operator_node`) remain unchanged. 
+ +## Sync Orchestrator + +The orchestrator simplifies to a pure topology scheduler: + +```python +class SyncPipelineOrchestrator: + def __init__(self, observer=None): + self._observer = observer + + def run(self, graph, materialize_results=True) -> OrchestratorResult: + for node in topological_sort(graph): + if is_source_node(node): + buffers[node] = node.execute(observer=self._observer) + elif is_function_node(node): + stream = self._materialize_as_stream(buffers[pred], pred) + buffers[node] = node.execute(stream, observer=self._observer) + elif is_operator_node(node): + streams = [self._materialize_as_stream(buffers[p], p) + for p in sorted_preds] + buffers[node] = node.execute(*streams, observer=self._observer) + return OrchestratorResult(node_outputs=buffers) +``` + +`_materialize_as_stream` is retained — operators need `StreamProtocol` inputs. +`_gather_upstream`, `_gather_upstream_multi`, `_gc_buffers` helpers are retained. +The per-packet `_execute_function` method is removed. + +## Async Orchestrator + +The async orchestrator preserves the channel-based concurrent execution model +but uses TypeGuard dispatch with tightened per-type signatures. 
+ +```python +class AsyncPipelineOrchestrator: + def __init__(self, observer=None, buffer_size=64): + self._observer = observer + self._buffer_size = buffer_size + + def run(self, graph, materialize_results=True) -> OrchestratorResult: + return asyncio.run(self._run_async(graph, materialize_results)) + + async def run_async(self, graph, materialize_results=True) -> OrchestratorResult: + return await self._run_async(graph, materialize_results) + + async def _run_async(self, graph, materialize_results) -> OrchestratorResult: + # Wire channels between nodes (same logic as today) + # For materialize_results=True: tee each output channel to collect items + + async with asyncio.TaskGroup() as tg: + for node in topo_order: + if is_source_node(node): + tg.create_task( + node.async_execute(writer, observer=self._observer) + ) + elif is_function_node(node): + tg.create_task( + node.async_execute( + input_reader, writer, observer=self._observer + ) + ) + elif is_operator_node(node): + tg.create_task( + node.async_execute( + input_readers, writer, observer=self._observer + ) + ) + + return OrchestratorResult(node_outputs=collected if materialize else {}) +``` + +Key changes from current implementation: + +- Takes `graph: nx.DiGraph` instead of `Pipeline` + `PipelineConfig`. +- Returns `OrchestratorResult` instead of `None`. +- TypeGuard dispatch with per-type signatures instead of uniform call. +- Observer injection via constructor + parameter forwarding. +- `buffer_size` is a constructor parameter (not from `PipelineConfig`). +- `materialize_results` controls whether intermediate outputs are collected. + +### Result collection + +When `materialize_results=True`, each node's output channel is tapped to collect +items into a list as they flow through. This uses a lightweight wrapper that +appends each item to a per-node list before forwarding to downstream readers. +When `materialize_results=False`, no collection occurs and `OrchestratorResult` +has empty `node_outputs`. 
+ +## Node Internal Changes + +### SourceNode + +- `execute(observer=None)`: calls observer hooks, materializes `iter_packets()`, + returns list. +- `async_execute(output, observer=None)`: tightened signature (no `inputs` + param), adds observer hooks internally. + +### FunctionNode + +- `execute(input_stream, observer=None)`: internally handles cache lookup, + per-packet execution with observer hooks, persistence. Respects + `prefer_async` flag to choose sync vs async execution path. +- `async_execute(input_channel, output, observer=None)`: tightened signature + (single input channel), adds observer hooks internally. +- `get_cached_results`, `execute_packet`, `compute_pipeline_entry_id` remain as + class methods but are removed from the protocol. +- Constructor gains `prefer_async: bool = False` parameter. + +### OperatorNode + +- `execute(*input_streams, observer=None)`: internally handles cache check, + operator delegation, persistence, observer hooks. +- `async_execute(inputs, output, observer=None)`: signature unchanged (already + takes sequence), adds observer hooks internally. +- `get_cached_output` remains as class method but removed from protocol. + +## Pipeline.run() Changes + +```python +def run(self, orchestrator=None, config=None, ...): + if not self._compiled: + self.compile() + if effective_engine is not None: + self._apply_execution_engine(effective_engine, effective_opts) + + if orchestrator is not None: + orchestrator.run(self._node_graph) + else: + use_async = ... # same logic as today + if use_async: + AsyncPipelineOrchestrator().run(self._node_graph) + else: + SyncPipelineOrchestrator().run(self._node_graph) + + self.flush() +``` + +The `_run_async` helper method is removed. Both orchestrators receive +`self._node_graph` directly. 
+ +## Removals + +**From node protocols:** + +- `FunctionNodeProtocol.get_cached_results()` +- `FunctionNodeProtocol.compute_pipeline_entry_id()` +- `FunctionNodeProtocol.execute_packet()` +- `OperatorNodeProtocol.get_cached_output()` + +**From protocols package:** + +- `AsyncExecutableProtocol` (entire file `async_executable.py`) + +**From Pipeline:** + +- `_run_async()` helper method + +## Testing Strategy + +- Update existing sync orchestrator tests to use the new `node.execute()` path. +- Update existing async orchestrator tests for the new signature + (`graph` instead of `Pipeline`). +- Sync/async parity tests remain — both orchestrators should produce identical + DB results. +- Observer tests verify hooks fire from inside nodes (same events, same order). +- Add tests for `materialize_results=True/False` on async orchestrator. +- Add tests for `prefer_async` flag on `FunctionNode`. From 8117a7e8babc62b80e4b50c73fe6fe9f496884cc Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 15:42:36 +0000 Subject: [PATCH 02/23] docs(specs): address spec review feedback for PLT-922 - Remove pipeline_config from async_execute signature entirely - Add iter_packets removal from SourceNodeProtocol - Clarify SourceNode.execute populates _cached_results - Clarify OperatorNode.execute calls get_cached_output first - Detail FunctionNode.execute internal per-packet logic - Add buffer_size forwarding in Pipeline.run default async path - Add notes on terminal channel draining and existing run() methods Co-Authored-By: Claude Opus 4.6 (1M context) --- ...3-15-async-orchestrator-refactor-design.md | 178 ++++++++++++++---- 1 file changed, 140 insertions(+), 38 deletions(-) diff --git a/superpowers/specs/2026-03-15-async-orchestrator-refactor-design.md b/superpowers/specs/2026-03-15-async-orchestrator-refactor-design.md index 6830740d..6ab65a38 100644 --- a/superpowers/specs/2026-03-15-async-orchestrator-refactor-design.md +++ 
b/superpowers/specs/2026-03-15-async-orchestrator-refactor-design.md @@ -54,16 +54,17 @@ for all nodes, each protocol has a signature matching its arity: - Function: `async_execute(input_channel, output)` — single input - Operator: `async_execute(inputs: Sequence[ReadableChannel], output)` — N inputs -### prefer_async flag on FunctionNode +### Deferred: prefer_async and concurrency config -`FunctionNode` gains a `prefer_async` constructor parameter (default `False`). -When `execute()` is called (sync path), the node checks this flag: +Two features are deferred to follow-up issues: -- `prefer_async=True` and pod/executor supports async → run async path internally -- Otherwise → run sync path - -This allows a sync orchestrator to still leverage async-capable executors -per-node without complicating the orchestrator or `execute` signature. +- **PLT-929**: `prefer_async` flag on `FunctionNode` — allows sync `execute()` + to internally use the async execution path when the pod/executor supports it. +- **PLT-930**: Move async concurrency config to node-level construction — + currently `FunctionNode.async_execute` receives `pipeline_config` for + `max_concurrency`. In this refactor, `pipeline_config` is removed from the + `async_execute` signature entirely. Nodes use their existing default + concurrency until PLT-930 adds proper node-level config. ## Revised Node Protocols @@ -151,7 +152,8 @@ class SyncPipelineOrchestrator: `_materialize_as_stream` is retained — operators need `StreamProtocol` inputs. `_gather_upstream`, `_gather_upstream_multi`, `_gc_buffers` helpers are retained. -The per-packet `_execute_function` method is removed. +The per-packet methods (`_execute_source`, `_execute_function`, +`_execute_operator`) are removed — each becomes a single `node.execute()` call. ## Async Orchestrator @@ -211,35 +213,103 @@ When `materialize_results=True`, each node's output channel is tapped to collect items into a list as they flow through. 
This uses a lightweight wrapper that appends each item to a per-node list before forwarding to downstream readers. When `materialize_results=False`, no collection occurs and `OrchestratorResult` -has empty `node_outputs`. +has empty `node_outputs`. Terminal sink channels are still drained regardless of +this flag, since nodes write to them unconditionally. + +### Channel wiring + +Channel wiring logic is preserved from the current implementation: + +- Single downstream: plain `Channel(buffer_size=self._buffer_size)`. +- Fan-out (multiple downstreams): `BroadcastChannel` with a reader per + downstream. +- Terminal nodes (no outgoing edges): sink `Channel` so `async_execute` has + somewhere to write. Drained after execution. + +### Existing node `run()` methods + +The existing `run()` method on `SourceNode`, `FunctionNode`, and `OperatorNode` +(the non-orchestrator pull-based execution path) is left intact. It serves a +different purpose — standalone node execution outside of pipeline orchestration. + +### Error handling + +If a node raises during `async_execute`, `asyncio.TaskGroup` cancels all +sibling tasks and propagates the exception. Observer hooks (`on_node_end`) are +not guaranteed to fire on failure — this matches the sync orchestrator's +behavior where an exception in `node.execute()` also skips `on_node_end`. ## Node Internal Changes ### SourceNode -- `execute(observer=None)`: calls observer hooks, materializes `iter_packets()`, - returns list. -- `async_execute(output, observer=None)`: tightened signature (no `inputs` - param), adds observer hooks internally. +**New method:** `execute(*, observer=None) -> list[(tag, packet)]` + +- Calls `observer.on_node_start(self)` if observer provided. +- Materializes `self.iter_packets()` into a list. +- Populates `_cached_results` so subsequent `iter_packets()` calls return the + cached version. +- Calls `observer.on_node_end(self)`. +- Returns the list. 
+ +**Signature change:** `async_execute(output, *, observer=None) -> None` + +- Tightened from `async_execute(inputs, output)` — no `inputs` parameter + (source has no upstream). +- Adds observer `on_node_start` / `on_node_end` hooks internally. + +**Removed from protocol:** `iter_packets()` — replaced by `execute()`. The +method remains on the class for internal use and backward compatibility, but it +is no longer part of `SourceNodeProtocol`. ### FunctionNode -- `execute(input_stream, observer=None)`: internally handles cache lookup, - per-packet execution with observer hooks, persistence. Respects - `prefer_async` flag to choose sync vs async execution path. -- `async_execute(input_channel, output, observer=None)`: tightened signature - (single input channel), adds observer hooks internally. -- `get_cached_results`, `execute_packet`, `compute_pipeline_entry_id` remain as - class methods but are removed from the protocol. -- Constructor gains `prefer_async: bool = False` parameter. +**Signature change:** `execute(input_stream, *, observer=None) -> list[(tag, packet)]` + +- The existing `execute` method already takes `input_stream: StreamProtocol` and + returns `list[(tag, packet)]`. +- Adds `observer` parameter. Internally calls `on_node_start` / `on_node_end` + and per-packet `on_packet_start` / `on_packet_end(cached=...)` hooks. +- Internally uses `get_cached_results`, `compute_pipeline_entry_id`, and + `execute_packet` — these are implementation details, not protocol surface. + +**Signature change:** `async_execute(input_channel, output, *, observer=None) -> None` + +- Tightened from `async_execute(inputs, output, pipeline_config)` — single + `input_channel` instead of `Sequence[ReadableChannel]`. +- `pipeline_config` parameter removed entirely. Node uses its existing default + concurrency. Proper node-level concurrency config deferred to PLT-930. +- Adds observer hooks internally. 
+ +**Internal execution logic in `execute()`:** The current +`SyncPipelineOrchestrator._execute_function` drives a per-packet loop with +cache lookup (`get_cached_results`) and observer hooks. This logic moves inside +`FunctionNode.execute()`: iterate over the input stream's packets, call +`compute_pipeline_entry_id` to check the pipeline DB, call `execute_packet` for +misses, and fire `on_packet_start` / `on_packet_end(cached=...)` around each +packet. The node's internal `CachedFunctionPod` handles function-level +memoization as before. + +**Removed from protocol (kept as class methods):** +`get_cached_results`, `execute_packet`, `compute_pipeline_entry_id`. ### OperatorNode -- `execute(*input_streams, observer=None)`: internally handles cache check, - operator delegation, persistence, observer hooks. -- `async_execute(inputs, output, observer=None)`: signature unchanged (already - takes sequence), adds observer hooks internally. -- `get_cached_output` remains as class method but removed from protocol. +**Signature change:** `execute(*input_streams, observer=None) -> list[(tag, packet)]` + +- The existing `execute` method already takes `*input_streams: StreamProtocol` + and returns `list[(tag, packet)]`. +- Adds `observer` parameter. Internally calls `on_node_start` / `on_node_end`. +- Internally calls `get_cached_output()` first — if it returns a stream + (REPLAY mode), materializes it and returns without computing. Otherwise + delegates to the operator's `process()` and handles persistence. + +**Signature change:** `async_execute(inputs, output, *, observer=None) -> None` + +- Signature already takes `Sequence[ReadableChannel]` — no arity change. +- Adds observer hooks internally. + +**Removed from protocol (kept as class method):** `get_cached_output`. ## Pipeline.run() Changes @@ -255,20 +325,41 @@ def run(self, orchestrator=None, config=None, ...): else: use_async = ... 
# same logic as today if use_async: - AsyncPipelineOrchestrator().run(self._node_graph) + AsyncPipelineOrchestrator( + buffer_size=config.channel_buffer_size, + ).run(self._node_graph) else: SyncPipelineOrchestrator().run(self._node_graph) self.flush() ``` -The `_run_async` helper method is removed. Both orchestrators receive -`self._node_graph` directly. +The `_run_async()` helper method is removed. Both orchestrators receive +`self._node_graph` directly. The default async path instantiates +`AsyncPipelineOrchestrator` inline and calls `.run(self._node_graph)`, matching +the sync path pattern. + +## File-level Change Summary + +| File | Changes | +|------|---------| +| `protocols/node_protocols.py` | Remove `get_cached_results`, `execute_packet`, `compute_pipeline_entry_id` from `FunctionNodeProtocol`. Remove `get_cached_output` from `OperatorNodeProtocol`. Remove `iter_packets` from `SourceNodeProtocol`. Add `execute` and `async_execute` with observer param to all three protocols. | +| `protocols/core_protocols/async_executable.py` | Delete entire file. | +| `protocols/core_protocols/__init__.py` | Remove `AsyncExecutableProtocol` re-export. | +| `core/nodes/source_node.py` | Add `execute(observer=None)` method. Change `async_execute` signature (remove `inputs` param, add `observer`). Add observer hooks. | +| `core/nodes/function_node.py` | Add `observer` param to `execute`. Change `async_execute` signature (single `input_channel`, remove `pipeline_config`, add `observer`). Move per-packet cache lookup + observer hook calls inside `execute()`. | +| `core/nodes/operator_node.py` | Add `observer` param to `execute`. Add `observer` param to `async_execute`. Move observer hook calls inside both methods. | +| `pipeline/sync_orchestrator.py` | Remove `_execute_source`, `_execute_function`, `_execute_operator`. Simplify `run()` to call `node.execute(...)` directly. Pass observer to nodes. 
| +| `pipeline/async_orchestrator.py` | Change `run` / `run_async` to take `graph` + `materialize_results`. Add TypeGuard dispatch. Tighten per-node `async_execute` calls. Add `buffer_size` constructor param. Add observer support. Return `OrchestratorResult`. | +| `pipeline/graph.py` | Remove `_run_async()`. Update default async path to instantiate `AsyncPipelineOrchestrator` and call `.run(self._node_graph)`. | +| `tests/test_pipeline/test_sync_orchestrator.py` | Update tests for new `node.execute()` path. Observer tests verify hooks fire from inside nodes. | +| `tests/test_pipeline/test_orchestrator.py` | Update async tests for new signature (`graph` instead of `Pipeline`). Add `materialize_results` tests. Add fan-out and terminal node tests. | ## Removals **From node protocols:** +- `SourceNodeProtocol.iter_packets()` - `FunctionNodeProtocol.get_cached_results()` - `FunctionNodeProtocol.compute_pipeline_entry_id()` - `FunctionNodeProtocol.execute_packet()` @@ -282,13 +373,24 @@ The `_run_async` helper method is removed. Both orchestrators receive - `_run_async()` helper method +**From SyncPipelineOrchestrator:** + +- `_execute_source()`, `_execute_function()`, `_execute_operator()` methods + ## Testing Strategy -- Update existing sync orchestrator tests to use the new `node.execute()` path. -- Update existing async orchestrator tests for the new signature - (`graph` instead of `Pipeline`). -- Sync/async parity tests remain — both orchestrators should produce identical - DB results. -- Observer tests verify hooks fire from inside nodes (same events, same order). -- Add tests for `materialize_results=True/False` on async orchestrator. -- Add tests for `prefer_async` flag on `FunctionNode`. +- **Sync orchestrator tests**: Update existing tests to verify `node.execute()` + is called (not the removed per-packet orchestrator logic). Observer tests + verify hooks fire from inside nodes with the same events and order. 
+- **Async orchestrator tests**: Update for new signature (`graph` instead of + `Pipeline`). Verify `OrchestratorResult` is returned. +- **Sync/async parity tests**: Both orchestrators should produce identical DB + results. Existing parity tests updated for new signatures. +- **`materialize_results` tests**: Verify `True` collects all node outputs, + `False` returns empty `node_outputs` (both orchestrators). +- **Fan-out tests**: Verify `BroadcastChannel` wiring when one node fans out to + multiple downstreams (async orchestrator). +- **Terminal node tests**: Verify sink channels are created and drained for nodes + with no outgoing edges (async orchestrator). +- **Error propagation tests**: Verify that a node failure in `TaskGroup` + propagates correctly and doesn't hang. From 7916d9041a292ae1dabafe229792fb4de7e00734 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 16:00:19 +0000 Subject: [PATCH 03/23] docs(plans): add async orchestrator refactor implementation plan (PLT-922) 14-task plan covering: protocol slimming, node execute/async_execute with observer injection, sync/async orchestrator simplification, Pipeline.run() update, and comprehensive test coverage. Co-Authored-By: Claude Opus 4.6 (1M context) --- ...-03-15-async-orchestrator-refactor-plan.md | 1717 +++++++++++++++++ 1 file changed, 1717 insertions(+) create mode 100644 superpowers/plans/2026-03-15-async-orchestrator-refactor-plan.md diff --git a/superpowers/plans/2026-03-15-async-orchestrator-refactor-plan.md b/superpowers/plans/2026-03-15-async-orchestrator-refactor-plan.md new file mode 100644 index 00000000..2414ca10 --- /dev/null +++ b/superpowers/plans/2026-03-15-async-orchestrator-refactor-plan.md @@ -0,0 +1,1717 @@ +# Async Orchestrator Refactor Implementation Plan + +> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. 
Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Refactor both orchestrators to use slim node protocols where nodes own their execution and orchestrators are pure topology schedulers. + +**Architecture:** Node protocols slim to `execute()` + `async_execute()` with observer injection. Orchestrators call these methods and collect results. Per-packet logic (cache lookup, observer hooks) moves inside nodes. AsyncPipelineOrchestrator adopts the same `run(graph) -> OrchestratorResult` interface as the sync orchestrator. + +**Tech Stack:** Python, asyncio, networkx, pyarrow, pytest, pytest-asyncio + +**Spec:** `superpowers/specs/2026-03-15-async-orchestrator-refactor-design.md` + +--- + +## Chunk 1: Protocol Changes and SourceNode + +### Task 1: Slim down node protocols + +**Files:** +- Modify: `src/orcapod/protocols/node_protocols.py` + +- [ ] **Step 1: Write failing test — protocols have new shape** + +Add a test that imports the new protocol shapes and verifies TypeGuard dispatch still works. 
+ +```python +# tests/test_pipeline/test_node_protocols.py (new file) +"""Tests for revised node protocols.""" + +from __future__ import annotations + +import pytest +from unittest.mock import MagicMock, AsyncMock + +from orcapod.protocols.node_protocols import ( + SourceNodeProtocol, + FunctionNodeProtocol, + OperatorNodeProtocol, + is_source_node, + is_function_node, + is_operator_node, +) + + +class TestSourceNodeProtocol: + def test_requires_execute(self): + """SourceNodeProtocol requires execute method.""" + + class GoodSource: + node_type = "source" + + def execute(self, *, observer=None): + return [] + + async def async_execute(self, output, *, observer=None): + pass + + assert isinstance(GoodSource(), SourceNodeProtocol) + + def test_rejects_old_iter_packets_only(self): + """SourceNodeProtocol no longer accepts iter_packets alone.""" + + class OldSource: + node_type = "source" + + def iter_packets(self): + return iter([]) + + assert not isinstance(OldSource(), SourceNodeProtocol) + + +class TestFunctionNodeProtocol: + def test_requires_execute_and_async_execute(self): + class GoodFunction: + node_type = "function" + + def execute(self, input_stream, *, observer=None): + return [] + + async def async_execute(self, input_channel, output, *, observer=None): + pass + + assert isinstance(GoodFunction(), FunctionNodeProtocol) + + def test_rejects_old_protocol(self): + """Old protocol with get_cached_results etc. 
is not sufficient.""" + + class OldFunction: + node_type = "function" + + def get_cached_results(self, entry_ids): + return {} + + def compute_pipeline_entry_id(self, tag, packet): + return "" + + def execute_packet(self, tag, packet): + return (tag, None) + + def execute(self, input_stream): + return [] + + # Missing async_execute → not a valid FunctionNodeProtocol + assert not isinstance(OldFunction(), FunctionNodeProtocol) + + +class TestOperatorNodeProtocol: + def test_requires_execute_and_async_execute(self): + class GoodOperator: + node_type = "operator" + + def execute(self, *input_streams, observer=None): + return [] + + async def async_execute(self, inputs, output, *, observer=None): + pass + + assert isinstance(GoodOperator(), OperatorNodeProtocol) + + def test_rejects_old_protocol(self): + """Old protocol with get_cached_output is not sufficient.""" + + class OldOperator: + node_type = "operator" + + def execute(self, *input_streams): + return [] + + def get_cached_output(self): + return None + + # Missing async_execute → not valid + assert not isinstance(OldOperator(), OperatorNodeProtocol) + + +class TestTypeGuardDispatch: + def test_dispatch_source(self): + node = MagicMock() + node.node_type = "source" + assert is_source_node(node) + assert not is_function_node(node) + assert not is_operator_node(node) + + def test_dispatch_function(self): + node = MagicMock() + node.node_type = "function" + assert is_function_node(node) + + def test_dispatch_operator(self): + node = MagicMock() + node.node_type = "operator" + assert is_operator_node(node) +``` + +Create `tests/test_pipeline/test_node_protocols.py` with the above content. + +- [ ] **Step 2: Run test to verify it fails** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py -v` +Expected: FAIL — protocol shapes don't match yet. 
+ +- [ ] **Step 3: Update node protocols** + +Replace the contents of `src/orcapod/protocols/node_protocols.py`: + +```python +"""Node protocols for orchestrator interaction. + +Defines the three node protocols (Source, Function, Operator) that +formalize the interface between orchestrators and graph nodes, plus +TypeGuard dispatch functions for runtime type narrowing. + +Each protocol exposes ``execute`` (sync) and ``async_execute`` (async). +Nodes own their execution — caching, per-packet logic, and persistence +are internal. Orchestrators are topology schedulers. +""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import TYPE_CHECKING, Protocol, TypeGuard, runtime_checkable + +if TYPE_CHECKING: + from orcapod.channels import ReadableChannel, WritableChannel + from orcapod.core.nodes import GraphNode + from orcapod.pipeline.observer import ExecutionObserver + from orcapod.protocols.core_protocols import ( + PacketProtocol, + StreamProtocol, + TagProtocol, + ) + + +@runtime_checkable +class SourceNodeProtocol(Protocol): + """Protocol for source nodes in orchestrated execution.""" + + node_type: str + + def execute( + self, + *, + observer: "ExecutionObserver | None" = None, + ) -> list[tuple["TagProtocol", "PacketProtocol"]]: ... + + async def async_execute( + self, + output: "WritableChannel[tuple[TagProtocol, PacketProtocol]]", + *, + observer: "ExecutionObserver | None" = None, + ) -> None: ... + + +@runtime_checkable +class FunctionNodeProtocol(Protocol): + """Protocol for function nodes in orchestrated execution.""" + + node_type: str + + def execute( + self, + input_stream: "StreamProtocol", + *, + observer: "ExecutionObserver | None" = None, + ) -> list[tuple["TagProtocol", "PacketProtocol"]]: ... 
+ + async def async_execute( + self, + input_channel: "ReadableChannel[tuple[TagProtocol, PacketProtocol]]", + output: "WritableChannel[tuple[TagProtocol, PacketProtocol]]", + *, + observer: "ExecutionObserver | None" = None, + ) -> None: ... + + +@runtime_checkable +class OperatorNodeProtocol(Protocol): + """Protocol for operator nodes in orchestrated execution.""" + + node_type: str + + def execute( + self, + *input_streams: "StreamProtocol", + observer: "ExecutionObserver | None" = None, + ) -> list[tuple["TagProtocol", "PacketProtocol"]]: ... + + async def async_execute( + self, + inputs: "Sequence[ReadableChannel[tuple[TagProtocol, PacketProtocol]]]", + output: "WritableChannel[tuple[TagProtocol, PacketProtocol]]", + *, + observer: "ExecutionObserver | None" = None, + ) -> None: ... + + +def is_source_node(node: "GraphNode") -> TypeGuard[SourceNodeProtocol]: + """Check if a node is a source node.""" + return node.node_type == "source" + + +def is_function_node(node: "GraphNode") -> TypeGuard[FunctionNodeProtocol]: + """Check if a node is a function node.""" + return node.node_type == "function" + + +def is_operator_node(node: "GraphNode") -> TypeGuard[OperatorNodeProtocol]: + """Check if a node is an operator node.""" + return node.node_type == "operator" +``` + +- [ ] **Step 4: Run protocol tests to verify they pass** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add tests/test_pipeline/test_node_protocols.py src/orcapod/protocols/node_protocols.py +git commit -m "refactor(protocols): slim node protocols to execute + async_execute with observer (PLT-922)" +``` + +### Task 2: Delete AsyncExecutableProtocol + +**Files:** +- Delete: `src/orcapod/protocols/core_protocols/async_executable.py` +- Modify: `src/orcapod/protocols/core_protocols/__init__.py` + +- [ ] **Step 1: Remove AsyncExecutableProtocol import and re-export** + +In `src/orcapod/protocols/core_protocols/__init__.py`, 
remove line 4 +(`from .async_executable import AsyncExecutableProtocol`) and remove +`"AsyncExecutableProtocol"` from `__all__`. + +- [ ] **Step 2: Delete the file** + +```bash +rm src/orcapod/protocols/core_protocols/async_executable.py +``` + +- [ ] **Step 3: Check for other imports of AsyncExecutableProtocol** + +Run: `uv run grep -r "AsyncExecutableProtocol" src/ tests/` + +If any imports remain, remove them. This protocol was defined but not used +by any caller. + +- [ ] **Step 4: Run full test suite to verify nothing breaks** + +Run: `uv run pytest tests/ -x -q` +Expected: All tests pass. + +- [ ] **Step 5: Commit** + +```bash +git add -u +git commit -m "refactor(protocols): remove AsyncExecutableProtocol (PLT-922)" +``` + +### Task 3: Add SourceNode.execute() with observer injection + +**Files:** +- Modify: `src/orcapod/core/nodes/source_node.py:228-255` +- Test: `tests/test_pipeline/test_node_protocols.py` (extend) + +- [ ] **Step 1: Write failing test for SourceNode.execute()** + +Append to `tests/test_pipeline/test_node_protocols.py`: + +```python +import pyarrow as pa +from orcapod.core.sources import ArrowTableSource +from orcapod.core.nodes import SourceNode + + +class TestSourceNodeExecute: + def _make_source_node(self): + table = pa.table({ + "key": pa.array(["a", "b", "c"], type=pa.large_string()), + "value": pa.array([1, 2, 3], type=pa.int64()), + }) + src = ArrowTableSource(table, tag_columns=["key"]) + return SourceNode(src) + + def test_execute_returns_list(self): + node = self._make_source_node() + result = node.execute() + assert isinstance(result, list) + assert len(result) == 3 + + def test_execute_populates_cached_results(self): + node = self._make_source_node() + node.execute() + assert node._cached_results is not None + assert len(node._cached_results) == 3 + + def test_execute_with_observer(self): + node = self._make_source_node() + events = [] + + class Obs: + def on_node_start(self, n): + events.append(("start", n.node_type)) + def 
on_node_end(self, n): + events.append(("end", n.node_type)) + def on_packet_start(self, n, t, p): + pass + def on_packet_end(self, n, t, ip, op, cached): + pass + + node.execute(observer=Obs()) + assert events == [("start", "source"), ("end", "source")] + + def test_execute_without_observer(self): + """execute() works fine with no observer.""" + node = self._make_source_node() + result = node.execute() + assert len(result) == 3 +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py::TestSourceNodeExecute -v` +Expected: FAIL — `execute()` method doesn't exist yet. + +- [ ] **Step 3: Implement SourceNode.execute()** + +Add to `src/orcapod/core/nodes/source_node.py`, before the `run()` method +(around line 237): + +```python +def execute( + self, + *, + observer: Any = None, +) -> list[tuple[cp.TagProtocol, cp.PacketProtocol]]: + """Execute this source: materialize packets and return. + + Args: + observer: Optional execution observer for hooks. + + Returns: + List of (tag, packet) tuples. 
+ """ + if self.stream is None: + raise RuntimeError( + "SourceNode in read-only mode has no stream data available" + ) + if observer is not None: + observer.on_node_start(self) + result = list(self.stream.iter_packets()) + self._cached_results = result + if observer is not None: + observer.on_node_end(self) + return result +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py::TestSourceNodeExecute -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/core/nodes/source_node.py tests/test_pipeline/test_node_protocols.py +git commit -m "feat(source-node): add execute() with observer injection (PLT-922)" +``` + +### Task 4: Tighten SourceNode.async_execute() signature + observer + +**Files:** +- Modify: `src/orcapod/core/nodes/source_node.py:240-254` +- Test: `tests/test_pipeline/test_node_protocols.py` (extend) + +- [ ] **Step 1: Write failing test for tightened async_execute** + +Append to `tests/test_pipeline/test_node_protocols.py`: + +```python +import pytest +from orcapod.channels import Channel + + +class TestSourceNodeAsyncExecuteProtocol: + @pytest.mark.asyncio + async def test_tightened_signature(self): + """async_execute takes output only, no inputs.""" + table = pa.table({ + "key": pa.array(["a", "b"], type=pa.large_string()), + "value": pa.array([1, 2], type=pa.int64()), + }) + src = ArrowTableSource(table, tag_columns=["key"]) + node = SourceNode(src) + + output_ch = Channel(buffer_size=16) + # New signature: just output + observer + await node.async_execute(output_ch.writer, observer=None) + rows = await output_ch.reader.collect() + assert len(rows) == 2 + + @pytest.mark.asyncio + async def test_async_execute_with_observer(self): + table = pa.table({ + "key": pa.array(["a"], type=pa.large_string()), + "value": pa.array([1], type=pa.int64()), + }) + src = ArrowTableSource(table, tag_columns=["key"]) + node = SourceNode(src) + events = [] + + class Obs: + def 
on_node_start(self, n): + events.append("start") + def on_node_end(self, n): + events.append("end") + def on_packet_start(self, n, t, p): + pass + def on_packet_end(self, n, t, ip, op, cached): + pass + + output_ch = Channel(buffer_size=16) + await node.async_execute(output_ch.writer, observer=Obs()) + assert events == ["start", "end"] +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py::TestSourceNodeAsyncExecuteProtocol -v` +Expected: FAIL — old signature takes `inputs, output`. + +- [ ] **Step 3: Update SourceNode.async_execute()** + +Replace the `async_execute` method in `src/orcapod/core/nodes/source_node.py` +(around line 240): + +```python +async def async_execute( + self, + output: WritableChannel[tuple[cp.TagProtocol, cp.PacketProtocol]], + *, + observer: Any = None, +) -> None: + """Push all (tag, packet) pairs from the wrapped stream to the output channel. + + Args: + output: Channel to write results to. + observer: Optional execution observer for hooks. + """ + if self.stream is None: + raise RuntimeError( + "SourceNode in read-only mode has no stream data available" + ) + try: + if observer is not None: + observer.on_node_start(self) + for tag, packet in self.stream.iter_packets(): + await output.send((tag, packet)) + if observer is not None: + observer.on_node_end(self) + finally: + await output.close() +``` + +Also remove `Sequence` from the imports since it's no longer needed for the +signature (keep `Iterator`). + +- [ ] **Step 4: Run tests** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py -v` +Expected: All PASS. 
+ +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/core/nodes/source_node.py tests/test_pipeline/test_node_protocols.py +git commit -m "refactor(source-node): tighten async_execute signature + observer (PLT-922)" +``` + +## Chunk 2: FunctionNode and OperatorNode Changes + +### Task 5: Add observer parameter to FunctionNode.execute() + +**Files:** +- Modify: `src/orcapod/core/nodes/function_node.py:488-512` +- Test: `tests/test_pipeline/test_node_protocols.py` (extend) + +- [ ] **Step 1: Write failing test** + +Append to `tests/test_pipeline/test_node_protocols.py`: + +```python +from orcapod.core.function_pod import FunctionPod +from orcapod.core.packet_function import PythonPacketFunction +from orcapod.core.nodes import FunctionNode + + +def double_value(value: int) -> int: + return value * 2 + + +class TestFunctionNodeExecute: + def _make_function_node(self): + table = pa.table({ + "key": pa.array(["a", "b"], type=pa.large_string()), + "value": pa.array([1, 2], type=pa.int64()), + }) + src = ArrowTableSource(table, tag_columns=["key"]) + pf = PythonPacketFunction(double_value, output_keys="result") + pod = FunctionPod(pf) + return FunctionNode(pod, src) + + def test_execute_with_observer(self): + node = self._make_function_node() + events = [] + + class Obs: + def on_node_start(self, n): + events.append(("node_start", n.node_type)) + def on_node_end(self, n): + events.append(("node_end", n.node_type)) + def on_packet_start(self, n, t, p): + events.append(("packet_start",)) + def on_packet_end(self, n, t, ip, op, cached): + events.append(("packet_end", cached)) + + input_stream = node._input_stream + result = node.execute(input_stream, observer=Obs()) + + assert len(result) == 2 + assert events[0] == ("node_start", "function") + assert events[-1] == ("node_end", "function") + # Should have packet_start/packet_end for each packet + packet_events = [e for e in events if e[0].startswith("packet")] + assert len(packet_events) == 4 # 2 start + 2 end + + def 
test_execute_without_observer(self): + node = self._make_function_node() + input_stream = node._input_stream + result = node.execute(input_stream) + assert len(result) == 2 + values = sorted([pkt.as_dict()["result"] for _, pkt in result]) + assert values == [2, 4] +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py::TestFunctionNodeExecute -v` +Expected: FAIL — `execute()` doesn't accept `observer` keyword. + +- [ ] **Step 3: Update FunctionNode.execute()** + +In `src/orcapod/core/nodes/function_node.py`, modify the `execute` method +(line 488) to add observer parameter and internal hooks. The method should: + +1. Accept `*, observer=None` keyword parameter +2. Call `observer.on_node_start(self)` at the start +3. For each packet: compute entry ID, check cache, call + `observer.on_packet_start` / `on_packet_end(cached=...)` around execution +4. Call `observer.on_node_end(self)` at the end + +```python +def execute( + self, + input_stream: StreamProtocol, + *, + observer: Any = None, +) -> list[tuple[TagProtocol, PacketProtocol]]: + """Execute all packets from a stream: compute, persist, and cache. + + Args: + input_stream: The input stream to process. + observer: Optional execution observer for hooks. + + Returns: + Materialized list of (tag, output_packet) pairs, excluding + None outputs. 
+ """ + if observer is not None: + observer.on_node_start(self) + + # Gather entry IDs and check cache + upstream_entries = [ + (tag, packet, self.compute_pipeline_entry_id(tag, packet)) + for tag, packet in input_stream.iter_packets() + ] + entry_ids = [eid for _, _, eid in upstream_entries] + cached = self.get_cached_results(entry_ids=entry_ids) + + output: list[tuple[TagProtocol, PacketProtocol]] = [] + for tag, packet, entry_id in upstream_entries: + if observer is not None: + observer.on_packet_start(self, tag, packet) + + if entry_id in cached: + tag_out, result = cached[entry_id] + if observer is not None: + observer.on_packet_end(self, tag, packet, result, cached=True) + output.append((tag_out, result)) + else: + tag_out, result = self._process_packet_internal(tag, packet) + if observer is not None: + observer.on_packet_end(self, tag, packet, result, cached=False) + if result is not None: + output.append((tag_out, result)) + + if observer is not None: + observer.on_node_end(self) + return output +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py::TestFunctionNodeExecute -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/core/nodes/function_node.py tests/test_pipeline/test_node_protocols.py +git commit -m "feat(function-node): add observer injection to execute() (PLT-922)" +``` + +### Task 6: Tighten FunctionNode.async_execute() signature + observer + +**Files:** +- Modify: `src/orcapod/core/nodes/function_node.py:1142-1263` +- Test: `tests/test_pipeline/test_node_protocols.py` (extend) + +- [ ] **Step 1: Write failing test** + +Append to `tests/test_pipeline/test_node_protocols.py`: + +```python +class TestFunctionNodeAsyncExecute: + @pytest.mark.asyncio + async def test_tightened_signature(self): + """async_execute takes single input_channel, not Sequence.""" + table = pa.table({ + "key": pa.array(["a", "b"], type=pa.large_string()), + "value": pa.array([1, 
2], type=pa.int64()), + }) + src = ArrowTableSource(table, tag_columns=["key"]) + pf = PythonPacketFunction(double_value, output_keys="result") + pod = FunctionPod(pf) + node = FunctionNode(pod, src) + + input_ch = Channel(buffer_size=16) + output_ch = Channel(buffer_size=16) + + for tag, packet in src.iter_packets(): + await input_ch.writer.send((tag, packet)) + await input_ch.writer.close() + + # New signature: single input_channel, not list + await node.async_execute(input_ch.reader, output_ch.writer) + rows = await output_ch.reader.collect() + assert len(rows) == 2 + values = sorted([pkt.as_dict()["result"] for _, pkt in rows]) + assert values == [2, 4] + + @pytest.mark.asyncio + async def test_async_execute_with_observer(self): + table = pa.table({ + "key": pa.array(["a"], type=pa.large_string()), + "value": pa.array([1], type=pa.int64()), + }) + src = ArrowTableSource(table, tag_columns=["key"]) + pf = PythonPacketFunction(double_value, output_keys="result") + pod = FunctionPod(pf) + node = FunctionNode(pod, src) + + events = [] + class Obs: + def on_node_start(self, n): events.append("node_start") + def on_node_end(self, n): events.append("node_end") + def on_packet_start(self, n, t, p): events.append("pkt_start") + def on_packet_end(self, n, t, ip, op, cached): events.append("pkt_end") + + input_ch = Channel(buffer_size=16) + output_ch = Channel(buffer_size=16) + for tag, packet in src.iter_packets(): + await input_ch.writer.send((tag, packet)) + await input_ch.writer.close() + + await node.async_execute(input_ch.reader, output_ch.writer, observer=Obs()) + assert "node_start" in events + assert "node_end" in events +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py::TestFunctionNodeAsyncExecute -v` +Expected: FAIL — old signature takes `inputs` (Sequence) and `pipeline_config`. 
+ +- [ ] **Step 3: Update FunctionNode.async_execute()** + +Replace the `async_execute` method in `src/orcapod/core/nodes/function_node.py` +(line 1142). Key changes: +- First positional arg: `input_channel: ReadableChannel[...]` (not `inputs: Sequence[...]`) +- Remove `pipeline_config` parameter +- Add `*, observer=None` keyword +- Replace all `inputs[0]` references with `input_channel` +- Use hardcoded default concurrency (defer to PLT-930) +- Add observer hooks: `on_node_start`/`on_node_end` around the whole method, + `on_packet_start`/`on_packet_end(cached=...)` around each packet in Phase 2 + +```python +async def async_execute( + self, + input_channel: ReadableChannel[tuple[TagProtocol, PacketProtocol]], + output: WritableChannel[tuple[TagProtocol, PacketProtocol]], + *, + observer: Any = None, +) -> None: + """Streaming async execution for FunctionNode. + + When a database is attached, uses two-phase execution: replay cached + results first, then compute missing packets concurrently. Otherwise, + routes each packet through ``_async_process_packet_internal`` directly. + + Args: + input_channel: Single input channel to read from. + output: Output channel to write results to. + observer: Optional execution observer for hooks. 
+ """ + try: + if observer is not None: + observer.on_node_start(self) + + if self._cached_function_pod is not None: + # Two-phase async execution with DB backing + PIPELINE_ENTRY_ID_COL = "__pipeline_entry_id" + existing_entry_ids: set[str] = set() + + taginfo = self._pipeline_database.get_all_records( + self.pipeline_path, + record_id_column=PIPELINE_ENTRY_ID_COL, + ) + results = self._cached_function_pod._result_database.get_all_records( + self._cached_function_pod.record_path, + record_id_column=constants.PACKET_RECORD_ID, + ) + + if taginfo is not None and results is not None: + joined = ( + pl.DataFrame(taginfo) + .join( + pl.DataFrame(results), + on=constants.PACKET_RECORD_ID, + how="inner", + ) + .to_arrow() + ) + if joined.num_rows > 0: + tag_keys = self._input_stream.keys()[0] + existing_entry_ids = set( + cast( + list[str], + joined.column(PIPELINE_ENTRY_ID_COL).to_pylist(), + ) + ) + drop_cols = [ + c + for c in joined.column_names + if c.startswith(constants.META_PREFIX) + or c == PIPELINE_ENTRY_ID_COL + ] + data_table = joined.drop( + [c for c in drop_cols if c in joined.column_names] + ) + existing_stream = ArrowTableStream( + data_table, tag_columns=tag_keys + ) + for tag, packet in existing_stream.iter_packets(): + await output.send((tag, packet)) + + # Phase 2: process new packets concurrently + async def process_one_db( + tag: TagProtocol, packet: PacketProtocol + ) -> None: + try: + if observer is not None: + observer.on_packet_start(self, tag, packet) + ( + tag_out, + result_packet, + ) = await self._async_process_packet_internal(tag, packet) + if observer is not None: + observer.on_packet_end( + self, tag, packet, result_packet, cached=False + ) + if result_packet is not None: + await output.send((tag_out, result_packet)) + finally: + pass + + async with asyncio.TaskGroup() as tg: + async for tag, packet in input_channel: + entry_id = self.compute_pipeline_entry_id(tag, packet) + if entry_id in existing_entry_ids: + if observer is not None: + 
observer.on_packet_start(self, tag, packet) + observer.on_packet_end( + self, tag, packet, None, cached=True + ) + continue + tg.create_task(process_one_db(tag, packet)) + else: + # Simple async execution without DB + async def process_one( + tag: TagProtocol, packet: PacketProtocol + ) -> None: + if observer is not None: + observer.on_packet_start(self, tag, packet) + ( + tag_out, + result_packet, + ) = await self._async_process_packet_internal(tag, packet) + if observer is not None: + observer.on_packet_end( + self, tag, packet, result_packet, cached=False + ) + if result_packet is not None: + await output.send((tag_out, result_packet)) + + async with asyncio.TaskGroup() as tg: + async for tag, packet in input_channel: + tg.create_task(process_one(tag, packet)) + + if observer is not None: + observer.on_node_end(self) + finally: + await output.close() +``` + +Note: Concurrency limiting (semaphore) is removed for now. PLT-930 will +re-add it as node-level config. + +- [ ] **Step 4: Run new tests** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py::TestFunctionNodeAsyncExecute -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/core/nodes/function_node.py tests/test_pipeline/test_node_protocols.py +git commit -m "refactor(function-node): tighten async_execute signature + observer (PLT-922)" +``` + +### Task 7: Add observer to OperatorNode.execute() + cache check + +**Files:** +- Modify: `src/orcapod/core/nodes/operator_node.py:432-473` +- Test: `tests/test_pipeline/test_node_protocols.py` (extend) + +- [ ] **Step 1: Write failing test** + +Append to `tests/test_pipeline/test_node_protocols.py`: + +```python +from orcapod.core.nodes import OperatorNode +from orcapod.core.operators.join import Join + + +class TestOperatorNodeExecute: + def _make_join_node(self): + table_a = pa.table({ + "key": pa.array(["a", "b"], type=pa.large_string()), + "value": pa.array([10, 20], type=pa.int64()), + }) + table_b = pa.table({ + "key": 
pa.array(["a", "b"], type=pa.large_string()), + "score": pa.array([100, 200], type=pa.int64()), + }) + src_a = ArrowTableSource(table_a, tag_columns=["key"]) + src_b = ArrowTableSource(table_b, tag_columns=["key"]) + return OperatorNode(Join(), input_streams=[src_a, src_b]) + + def test_execute_with_observer(self): + node = self._make_join_node() + events = [] + + class Obs: + def on_node_start(self, n): + events.append(("node_start", n.node_type)) + def on_node_end(self, n): + events.append(("node_end", n.node_type)) + def on_packet_start(self, n, t, p): + pass + def on_packet_end(self, n, t, ip, op, cached): + pass + + result = node.execute( + *node._input_streams, observer=Obs() + ) + assert len(result) == 2 + assert events == [("node_start", "operator"), ("node_end", "operator")] + + def test_execute_without_observer(self): + node = self._make_join_node() + result = node.execute(*node._input_streams) + assert len(result) == 2 +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py::TestOperatorNodeExecute -v` +Expected: FAIL — `execute()` doesn't accept `observer`. + +- [ ] **Step 3: Update OperatorNode.execute()** + +Modify `src/orcapod/core/nodes/operator_node.py` `execute` method (line 432): + +```python +def execute( + self, + *input_streams: StreamProtocol, + observer: Any = None, +) -> list[tuple[TagProtocol, PacketProtocol]]: + """Execute input streams: compute, persist, and cache. + + Args: + *input_streams: Input streams to execute. + observer: Optional execution observer for hooks. + + Returns: + Materialized list of (tag, packet) pairs. 
+ """ + if observer is not None: + observer.on_node_start(self) + + # Check REPLAY cache first + cached_output = self.get_cached_output() + if cached_output is not None: + output = list(cached_output.iter_packets()) + if observer is not None: + observer.on_node_end(self) + return output + + # Compute + result_stream = self._operator.process(*input_streams) + + # Materialize + output = list(result_stream.iter_packets()) + + # Cache + if output: + self._cached_output_stream = StaticOutputOperatorPod._materialize_to_stream( + output + ) + else: + self._cached_output_stream = result_stream + + self._update_modified_time() + + # Persist to DB only in LOG mode + if ( + self._pipeline_database is not None + and self._cache_mode == CacheMode.LOG + and self._cached_output_stream is not None + ): + self._store_output_stream(self._cached_output_stream) + + if observer is not None: + observer.on_node_end(self) + return output +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py::TestOperatorNodeExecute -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/core/nodes/operator_node.py tests/test_pipeline/test_node_protocols.py +git commit -m "feat(operator-node): add observer + cache check to execute() (PLT-922)" +``` + +### Task 8: Add observer to OperatorNode.async_execute() + +**Files:** +- Modify: `src/orcapod/core/nodes/operator_node.py:627-688` +- Test: `tests/test_pipeline/test_node_protocols.py` (extend) + +- [ ] **Step 1: Write failing test** + +Append to `tests/test_pipeline/test_node_protocols.py`: + +```python +class TestOperatorNodeAsyncExecute: + @pytest.mark.asyncio + async def test_async_execute_with_observer(self): + table_a = pa.table({ + "key": pa.array(["a", "b"], type=pa.large_string()), + "value": pa.array([10, 20], type=pa.int64()), + }) + src_a = ArrowTableSource(table_a, tag_columns=["key"]) + from orcapod.core.operators import SelectPacketColumns + op = 
SelectPacketColumns(columns=["value"]) + op_node = OperatorNode(op, input_streams=[src_a]) + + events = [] + class Obs: + def on_node_start(self, n): events.append("start") + def on_node_end(self, n): events.append("end") + def on_packet_start(self, n, t, p): pass + def on_packet_end(self, n, t, ip, op, cached): pass + + input_ch = Channel(buffer_size=16) + output_ch = Channel(buffer_size=16) + for tag, packet in src_a.iter_packets(): + await input_ch.writer.send((tag, packet)) + await input_ch.writer.close() + + await op_node.async_execute([input_ch.reader], output_ch.writer, observer=Obs()) + assert "start" in events + assert "end" in events +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py::TestOperatorNodeAsyncExecute -v` +Expected: FAIL — `async_execute()` doesn't accept `observer`. + +- [ ] **Step 3: Update OperatorNode.async_execute()** + +Modify `src/orcapod/core/nodes/operator_node.py` `async_execute` (line 627) +to add `*, observer=None` keyword and call hooks: + +```python +async def async_execute( + self, + inputs: Sequence[ReadableChannel[tuple[TagProtocol, PacketProtocol]]], + output: WritableChannel[tuple[TagProtocol, PacketProtocol]], + *, + observer: Any = None, +) -> None: +``` + +Add `if observer: observer.on_node_start(self)` near the top of the try block, +and `if observer: observer.on_node_end(self)` before the `finally`. 
+ +- [ ] **Step 4: Run tests** + +Run: `uv run pytest tests/test_pipeline/test_node_protocols.py::TestOperatorNodeAsyncExecute -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/core/nodes/operator_node.py tests/test_pipeline/test_node_protocols.py +git commit -m "feat(operator-node): add observer to async_execute() (PLT-922)" +``` + +## Chunk 3: Orchestrator Refactoring + +### Task 9: Simplify SyncPipelineOrchestrator + +**Files:** +- Modify: `src/orcapod/pipeline/sync_orchestrator.py` +- Modify: `tests/test_pipeline/test_sync_orchestrator.py` + +- [ ] **Step 1: Update SyncPipelineOrchestrator to use node.execute()** + +Rewrite `src/orcapod/pipeline/sync_orchestrator.py`. The `run()` method +calls `node.execute(...)` directly. Remove `_execute_source`, +`_execute_function`, `_execute_operator`. Keep `_materialize_as_stream`, +`_gather_upstream`, `_gather_upstream_multi`, `_gc_buffers`. + +```python +def run( + self, + graph: "nx.DiGraph", + materialize_results: bool = True, +) -> OrchestratorResult: + """Execute the node graph synchronously. + + Args: + graph: A NetworkX DiGraph with GraphNode objects as vertices. + materialize_results: If True, keep all node outputs in memory. + If False, discard buffers after downstream consumption. + + Returns: + OrchestratorResult with node outputs. 
+ """ + import networkx as nx + + topo_order = list(nx.topological_sort(graph)) + buffers: dict[Any, list[tuple[TagProtocol, PacketProtocol]]] = {} + processed: set[Any] = set() + + for node in topo_order: + if is_source_node(node): + buffers[node] = node.execute(observer=self._observer) + elif is_function_node(node): + upstream_buf = self._gather_upstream(node, graph, buffers) + upstream_node = list(graph.predecessors(node))[0] + input_stream = self._materialize_as_stream(upstream_buf, upstream_node) + buffers[node] = node.execute(input_stream, observer=self._observer) + elif is_operator_node(node): + upstream_buffers = self._gather_upstream_multi(node, graph, buffers) + input_streams = [ + self._materialize_as_stream(buf, upstream_node) + for buf, upstream_node in upstream_buffers + ] + buffers[node] = node.execute(*input_streams, observer=self._observer) + else: + raise TypeError( + f"Unknown node type: {getattr(node, 'node_type', None)!r}" + ) + + processed.add(node) + + if not materialize_results: + self._gc_buffers(node, graph, buffers, processed) + + return OrchestratorResult(node_outputs=buffers) +``` + +- [ ] **Step 2: Run existing sync orchestrator tests** + +Run: `uv run pytest tests/test_pipeline/test_sync_orchestrator.py -v` +Expected: All PASS — the simplified orchestrator should produce the same results. + +- [ ] **Step 3: Commit** + +```bash +git add src/orcapod/pipeline/sync_orchestrator.py +git commit -m "refactor(sync-orchestrator): delegate to node.execute(), remove per-packet logic (PLT-922)" +``` + +### Task 10: Refactor AsyncPipelineOrchestrator + +**Files:** +- Modify: `src/orcapod/pipeline/async_orchestrator.py` +- Modify: `tests/test_pipeline/test_orchestrator.py` + +- [ ] **Step 1: Rewrite AsyncPipelineOrchestrator** + +Replace the contents of `src/orcapod/pipeline/async_orchestrator.py`: + +```python +"""Async pipeline orchestrator for push-based channel execution. 
+ +Walks a compiled pipeline's node graph and launches all nodes concurrently +via ``asyncio.TaskGroup``, wiring them together with bounded channels. +Uses TypeGuard dispatch with tightened per-type async_execute signatures. +""" + +from __future__ import annotations + +import asyncio +import logging +from collections import defaultdict +from typing import TYPE_CHECKING, Any + +from orcapod.channels import BroadcastChannel, Channel +from orcapod.pipeline.observer import NoOpObserver +from orcapod.pipeline.result import OrchestratorResult +from orcapod.protocols.node_protocols import ( + is_function_node, + is_operator_node, + is_source_node, +) + +if TYPE_CHECKING: + import networkx as nx + + from orcapod.pipeline.observer import ExecutionObserver + from orcapod.protocols.core_protocols import PacketProtocol, TagProtocol + +logger = logging.getLogger(__name__) + + +class AsyncPipelineOrchestrator: + """Execute a compiled pipeline asynchronously using channels. + + After compilation, the orchestrator: + + 1. Walks the node graph in topological order. + 2. Creates bounded channels (or broadcast channels for fan-out) + between connected nodes. + 3. Launches every node's ``async_execute`` concurrently via + ``asyncio.TaskGroup``, using TypeGuard dispatch for per-type + signatures. + + Args: + observer: Optional execution observer for hooks. + buffer_size: Channel buffer size. Defaults to 64. + """ + + def __init__( + self, + observer: "ExecutionObserver | None" = None, + buffer_size: int = 64, + ) -> None: + self._observer = observer + self._buffer_size = buffer_size + + def run( + self, + graph: "nx.DiGraph", + materialize_results: bool = True, + ) -> OrchestratorResult: + """Synchronous entry point — runs the async pipeline to completion. + + Args: + graph: A NetworkX DiGraph with GraphNode objects as vertices. + materialize_results: If True, collect all node outputs into + the result. If False, return empty node_outputs. 
+ + Returns: + OrchestratorResult with node outputs. + """ + return asyncio.run(self._run_async(graph, materialize_results)) + + async def run_async( + self, + graph: "nx.DiGraph", + materialize_results: bool = True, + ) -> OrchestratorResult: + """Async entry point for callers already inside an event loop. + + Args: + graph: A NetworkX DiGraph with GraphNode objects as vertices. + materialize_results: If True, collect all node outputs. + + Returns: + OrchestratorResult with node outputs. + """ + return await self._run_async(graph, materialize_results) + + async def _run_async( + self, + graph: "nx.DiGraph", + materialize_results: bool, + ) -> OrchestratorResult: + """Core async logic: wire channels, launch tasks, collect results.""" + import networkx as nx + + topo_order = list(nx.topological_sort(graph)) + buf = self._buffer_size + + # Build edge maps + out_edges: dict[Any, list[Any]] = defaultdict(list) + in_edges: dict[Any, list[Any]] = defaultdict(list) + for upstream_node, downstream_node in graph.edges(): + out_edges[upstream_node].append(downstream_node) + in_edges[downstream_node].append(upstream_node) + + # Create channels for each edge + node_output_channels: dict[Any, Channel | BroadcastChannel] = {} + edge_readers: dict[tuple[Any, Any], Any] = {} + + for node, downstreams in out_edges.items(): + if len(downstreams) == 1: + ch = Channel(buffer_size=buf) + node_output_channels[node] = ch + edge_readers[(node, downstreams[0])] = ch.reader + else: + bch = BroadcastChannel(buffer_size=buf) + node_output_channels[node] = bch + for ds in downstreams: + edge_readers[(node, ds)] = bch.add_reader() + + # Terminal nodes need sink channels + terminal_channels: list[Channel] = [] + for node in topo_order: + if node not in node_output_channels: + ch = Channel(buffer_size=buf) + node_output_channels[node] = ch + terminal_channels.append(ch) + + # Result collection: tap each node's output + collectors: dict[Any, list[tuple[TagProtocol, PacketProtocol]]] = {} + if 
materialize_results:
            for node in topo_order:
                collectors[node] = []

        # Launch all nodes concurrently
        async with asyncio.TaskGroup() as tg:
            for node in topo_order:
                writer = node_output_channels[node].writer

                if materialize_results:
                    # Wrap writer to collect items
                    collector = collectors[node]
                    writer = _CollectingWriter(writer, collector)

                if is_source_node(node):
                    tg.create_task(
                        node.async_execute(writer, observer=self._observer)
                    )
                elif is_function_node(node):
                    input_reader = edge_readers[
                        (list(in_edges[node])[0], node)
                    ]
                    tg.create_task(
                        node.async_execute(
                            input_reader, writer, observer=self._observer
                        )
                    )
                elif is_operator_node(node):
                    input_readers = [
                        edge_readers[(upstream, node)]
                        for upstream in in_edges.get(node, [])
                    ]
                    tg.create_task(
                        node.async_execute(
                            input_readers, writer, observer=self._observer
                        )
                    )
                else:
                    raise TypeError(
                        f"Unknown node type: {getattr(node, 'node_type', None)!r}"
                    )

            # Drain terminal channels inside the TaskGroup. Terminal nodes
            # write into bounded channels, so draining only after the
            # TaskGroup exits would deadlock as soon as a terminal node
            # produces more than buffer_size items (its send() blocks, the
            # task never finishes, and the drain is never reached).
            for ch in terminal_channels:
                tg.create_task(ch.reader.collect())

        return OrchestratorResult(
            node_outputs=collectors if materialize_results else {}
        )


class _CollectingWriter:
    """Wrapper that collects items while forwarding to real writer."""

    def __init__(self, writer: Any, collector: list) -> None:
        self._writer = writer
        self._collector = collector

    async def send(self, item: Any) -> None:
        self._collector.append(item)
        await self._writer.send(item)

    async def close(self) -> None:
        await self._writer.close()
```

- [ ] **Step 2: Update async orchestrator tests**

Update `tests/test_pipeline/test_orchestrator.py` with these mechanical changes
throughout the file:

**Signature changes (find and replace):**
- `orchestrator.run(pipeline)` → `pipeline.compile(); orchestrator.run(pipeline._node_graph); pipeline.flush()`
- `orchestrator.run(pipeline, config=config)` → `pipeline.compile(); 
AsyncPipelineOrchestrator(buffer_size=config.channel_buffer_size).run(pipeline._node_graph); pipeline.flush()` +- `await orchestrator.run_async(pipeline)` → `pipeline.compile(); await orchestrator.run_async(pipeline._node_graph); pipeline.flush()` +- `await node.async_execute([], output_ch.writer)` → `await node.async_execute(output_ch.writer)` +- `await node.async_execute([input_ch.reader], output_ch.writer)` → `await node.async_execute(input_ch.reader, output_ch.writer)` + +**Affected test classes and specific changes:** + +`TestSourceNodeAsyncExecute`: Change `await node.async_execute([], output_ch.writer)` to +`await node.async_execute(output_ch.writer)` in both test methods. + +`TestFunctionNodeAsyncExecute`: Change `await node.async_execute([input_ch.reader], output_ch.writer)` +to `await node.async_execute(input_ch.reader, output_ch.writer)`. + +`TestOrchestratorLinearPipeline`: Both tests — add `pipeline.compile()` before +and `pipeline.flush()` after `orchestrator.run(pipeline._node_graph)`. + +`TestOrchestratorOperatorPipeline`: Same compile/run/flush pattern. + +`TestOrchestratorDiamondDag`: Both tests — same pattern. + +`TestOrchestratorRunAsync`: Change `await orchestrator.run_async(pipeline)` to +`pipeline.compile(); await orchestrator.run_async(pipeline._node_graph); pipeline.flush()`. + +`TestPipelineConfigIntegration`: Replace: +```python +config = PipelineConfig(executor=ExecutorType.ASYNC_CHANNELS, channel_buffer_size=4) +orchestrator = AsyncPipelineOrchestrator() +orchestrator.run(pipeline, config=config) +``` +with: +```python +pipeline.compile() +orchestrator = AsyncPipelineOrchestrator(buffer_size=4) +orchestrator.run(pipeline._node_graph) +pipeline.flush() +``` + +- [ ] **Step 3: Run updated tests** + +Run: `uv run pytest tests/test_pipeline/test_orchestrator.py -v` +Expected: All PASS. 
+ +- [ ] **Step 4: Commit** + +```bash +git add src/orcapod/pipeline/async_orchestrator.py tests/test_pipeline/test_orchestrator.py +git commit -m "refactor(async-orchestrator): use node protocols, graph interface, OrchestratorResult (PLT-922)" +``` + +### Task 11: Update Pipeline.run() + +**Files:** +- Modify: `src/orcapod/pipeline/graph.py:359-474` + +- [ ] **Step 1: Update Pipeline.run() and remove _run_async()** + +In `src/orcapod/pipeline/graph.py`: + +1. In the `run()` method (line 412-429), change the async path to: + ```python + if use_async: + from orcapod.pipeline.async_orchestrator import AsyncPipelineOrchestrator + AsyncPipelineOrchestrator( + buffer_size=config.channel_buffer_size, + ).run(self._node_graph) + ``` + And change the explicit orchestrator path (line 413) to also pass the graph: + ```python + if orchestrator is not None: + orchestrator.run(self._node_graph) + ``` + +2. Delete the `_run_async` method (lines 469-474). + +- [ ] **Step 2: Run all pipeline tests** + +Run: `uv run pytest tests/test_pipeline/ -v` +Expected: All PASS. + +- [ ] **Step 3: Run full test suite** + +Run: `uv run pytest tests/ -x -q` +Expected: All PASS. + +- [ ] **Step 4: Commit** + +```bash +git add src/orcapod/pipeline/graph.py +git commit -m "refactor(pipeline): update run() to pass graph to orchestrators, remove _run_async (PLT-922)" +``` + +## Chunk 4: Parity Tests and Cleanup + +### Task 12: Update parity tests + +**Files:** +- Modify: `tests/test_pipeline/test_sync_orchestrator.py` + +- [ ] **Step 1: Update parity test signatures** + +In `tests/test_pipeline/test_sync_orchestrator.py`, class `TestSyncAsyncParity`: + +Update the async orchestrator calls to use the new interface: +```python +# Old: +AsyncPipelineOrchestrator().run(async_pipeline) + +# New: +async_pipeline.compile() +AsyncPipelineOrchestrator().run(async_pipeline._node_graph) +async_pipeline.flush() +``` + +Do this for both `test_linear_pipeline_parity` and `test_diamond_pipeline_parity`. 
+ +- [ ] **Step 2: Run parity tests** + +Run: `uv run pytest tests/test_pipeline/test_sync_orchestrator.py::TestSyncAsyncParity -v` +Expected: PASS + +- [ ] **Step 3: Add materialize_results tests** + +Append to `tests/test_pipeline/test_sync_orchestrator.py`: + +```python +class TestMaterializeResults: + def test_sync_materialize_false_returns_empty(self): + src = _make_source("key", "value", {"key": ["a", "b"], "value": [1, 2]}) + pf = PythonPacketFunction(double_value, output_keys="result") + pod = FunctionPod(pf) + + pipeline = Pipeline(name="mat", pipeline_database=InMemoryArrowDatabase()) + with pipeline: + pod(src, label="doubler") + + orch = SyncPipelineOrchestrator() + result = orch.run(pipeline._node_graph, materialize_results=False) + assert result.node_outputs == {} + + def test_async_materialize_true_collects_all(self): + src = _make_source("key", "value", {"key": ["a", "b"], "value": [1, 2]}) + pf = PythonPacketFunction(double_value, output_keys="result") + pod = FunctionPod(pf) + + pipeline = Pipeline(name="mat_async", pipeline_database=InMemoryArrowDatabase()) + with pipeline: + pod(src, label="doubler") + + pipeline.compile() + orch = AsyncPipelineOrchestrator() + result = orch.run(pipeline._node_graph, materialize_results=True) + assert len(result.node_outputs) > 0 + + def test_async_materialize_false_returns_empty(self): + src = _make_source("key", "value", {"key": ["a", "b"], "value": [1, 2]}) + pf = PythonPacketFunction(double_value, output_keys="result") + pod = FunctionPod(pf) + + pipeline = Pipeline(name="mat_async2", pipeline_database=InMemoryArrowDatabase()) + with pipeline: + pod(src, label="doubler") + + pipeline.compile() + orch = AsyncPipelineOrchestrator() + result = orch.run(pipeline._node_graph, materialize_results=False) + assert result.node_outputs == {} +``` + +- [ ] **Step 4: Run new tests** + +Run: `uv run pytest tests/test_pipeline/test_sync_orchestrator.py::TestMaterializeResults -v` +Expected: PASS + +- [ ] **Step 5: 
Commit** + +```bash +git add tests/test_pipeline/test_sync_orchestrator.py +git commit -m "test(orchestrator): update parity tests + add materialize_results tests (PLT-922)" +``` + +### Task 13: Add async-specific tests (fan-out, terminal, error) + +**Files:** +- Modify: `tests/test_pipeline/test_orchestrator.py` (extend) + +- [ ] **Step 1: Add fan-out, terminal node, and error propagation tests** + +Append to `tests/test_pipeline/test_orchestrator.py`: + +```python +class TestAsyncOrchestratorFanOut: + """One source fans out to multiple downstream nodes.""" + + def test_fan_out_source_to_two_functions(self): + src = _make_source("key", "value", {"key": ["a", "b"], "value": [1, 2]}) + pf1 = PythonPacketFunction(double_value, output_keys="result") + pod1 = FunctionPod(pf1) + pf2 = PythonPacketFunction(double_value, output_keys="result") + pod2 = FunctionPod(pf2) + + pipeline = Pipeline(name="fanout", pipeline_database=InMemoryArrowDatabase()) + with pipeline: + pod1(src, label="doubler1") + pod2(src, label="doubler2") + + pipeline.compile() + orch = AsyncPipelineOrchestrator() + result = orch.run(pipeline._node_graph, materialize_results=True) + pipeline.flush() + + fn_outputs = [ + v for k, v in result.node_outputs.items() if k.node_type == "function" + ] + assert len(fn_outputs) == 2 + for output in fn_outputs: + values = sorted([pkt.as_dict()["result"] for _, pkt in output]) + assert values == [2, 4] + + +class TestAsyncOrchestratorTerminalNode: + """Terminal nodes with no downstream should work correctly.""" + + def test_single_terminal_source(self): + """A pipeline with just a source (terminal) should work.""" + src = _make_source("key", "value", {"key": ["a"], "value": [1]}) + pipeline = Pipeline(name="terminal", pipeline_database=InMemoryArrowDatabase()) + with pipeline: + # Just register the source, no downstream + pass + + # Manually build a minimal graph with just a source node + import networkx as nx + from orcapod.core.nodes import SourceNode + + node = 
SourceNode(src) + G = nx.DiGraph() + G.add_node(node) + + orch = AsyncPipelineOrchestrator() + result = orch.run(G, materialize_results=True) + assert len(result.node_outputs) == 1 + + +class TestAsyncOrchestratorErrorPropagation: + """Node failures should propagate correctly.""" + + def test_node_failure_propagates(self): + def failing_fn(value: int) -> int: + raise ValueError("intentional failure") + + src = _make_source("key", "value", {"key": ["a"], "value": [1]}) + pf = PythonPacketFunction(failing_fn, output_keys="result") + pod = FunctionPod(pf) + + pipeline = Pipeline(name="error", pipeline_database=InMemoryArrowDatabase()) + with pipeline: + pod(src, label="failer") + + pipeline.compile() + orch = AsyncPipelineOrchestrator() + + with pytest.raises(ExceptionGroup): + orch.run(pipeline._node_graph) +``` + +- [ ] **Step 2: Run the new tests** + +Run: `uv run pytest tests/test_pipeline/test_orchestrator.py::TestAsyncOrchestratorFanOut tests/test_pipeline/test_orchestrator.py::TestAsyncOrchestratorTerminalNode tests/test_pipeline/test_orchestrator.py::TestAsyncOrchestratorErrorPropagation -v` +Expected: PASS + +- [ ] **Step 3: Commit** + +```bash +git add tests/test_pipeline/test_orchestrator.py +git commit -m "test(async-orchestrator): add fan-out, terminal, and error propagation tests (PLT-922)" +``` + +### Task 14: Full test suite verification and cleanup + +- [ ] **Step 1: Verify no references to removed protocol methods** + +Run these searches: +```bash +uv run grep -r "AsyncExecutableProtocol" src/ tests/ +uv run grep -r "SourceNodeProtocol.*iter_packets" src/ +uv run grep -r "FunctionNodeProtocol.*get_cached_results" src/ +uv run grep -r "FunctionNodeProtocol.*execute_packet" src/ +uv run grep -r "FunctionNodeProtocol.*compute_pipeline_entry_id" src/ +uv run grep -r "OperatorNodeProtocol.*get_cached_output" src/ +``` + +Expected: No matches. + +- [ ] **Step 2: Run full test suite one final time** + +Run: `uv run pytest tests/ -q` +Expected: All pass. 
+ +- [ ] **Step 3: Final commit if needed** + +```bash +git add -u +git commit -m "chore(cleanup): remove stale references to old protocol methods (PLT-922)" +``` From 653c993793d78db79641d1f175988912cc68e8ea Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 18:24:45 +0000 Subject: [PATCH 04/23] refactor(protocols): slim node protocols to execute + async_execute with observer (PLT-922) Replace old fine-grained protocol methods (iter_packets, get_cached_results, compute_pipeline_entry_id, execute_packet, get_cached_output) with uniform execute/async_execute signatures that accept an observer param. Add tests verifying the new shapes and that old-only implementations no longer satisfy the protocols. Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/protocols/node_protocols.py | 58 +++++++--- tests/test_pipeline/test_node_protocols.py | 126 +++++++++++++++++++++ 2 files changed, 167 insertions(+), 17 deletions(-) create mode 100644 tests/test_pipeline/test_node_protocols.py diff --git a/src/orcapod/protocols/node_protocols.py b/src/orcapod/protocols/node_protocols.py index cf32ec7b..95677412 100644 --- a/src/orcapod/protocols/node_protocols.py +++ b/src/orcapod/protocols/node_protocols.py @@ -3,15 +3,21 @@ Defines the three node protocols (Source, Function, Operator) that formalize the interface between orchestrators and graph nodes, plus TypeGuard dispatch functions for runtime type narrowing. + +Each protocol exposes ``execute`` (sync) and ``async_execute`` (async). +Nodes own their execution — caching, per-packet logic, and persistence +are internal. Orchestrators are topology schedulers. 
""" from __future__ import annotations -from collections.abc import Iterator +from collections.abc import Sequence from typing import TYPE_CHECKING, Protocol, TypeGuard, runtime_checkable if TYPE_CHECKING: + from orcapod.channels import ReadableChannel, WritableChannel from orcapod.core.nodes import GraphNode + from orcapod.pipeline.observer import ExecutionObserver from orcapod.protocols.core_protocols import ( PacketProtocol, StreamProtocol, @@ -25,7 +31,18 @@ class SourceNodeProtocol(Protocol): node_type: str - def iter_packets(self) -> Iterator[tuple["TagProtocol", "PacketProtocol"]]: ... + def execute( + self, + *, + observer: "ExecutionObserver | None" = None, + ) -> list[tuple["TagProtocol", "PacketProtocol"]]: ... + + async def async_execute( + self, + output: "WritableChannel[tuple[TagProtocol, PacketProtocol]]", + *, + observer: "ExecutionObserver | None" = None, + ) -> None: ... @runtime_checkable @@ -34,22 +51,21 @@ class FunctionNodeProtocol(Protocol): node_type: str - def get_cached_results( - self, entry_ids: list[str] - ) -> dict[str, tuple["TagProtocol", "PacketProtocol"]]: ... - - def compute_pipeline_entry_id( - self, tag: "TagProtocol", packet: "PacketProtocol" - ) -> str: ... - - def execute_packet( - self, tag: "TagProtocol", packet: "PacketProtocol" - ) -> tuple["TagProtocol", "PacketProtocol | None"]: ... - def execute( - self, input_stream: "StreamProtocol" + self, + input_stream: "StreamProtocol", + *, + observer: "ExecutionObserver | None" = None, ) -> list[tuple["TagProtocol", "PacketProtocol"]]: ... + async def async_execute( + self, + input_channel: "ReadableChannel[tuple[TagProtocol, PacketProtocol]]", + output: "WritableChannel[tuple[TagProtocol, PacketProtocol]]", + *, + observer: "ExecutionObserver | None" = None, + ) -> None: ... 
+ @runtime_checkable class OperatorNodeProtocol(Protocol): @@ -58,10 +74,18 @@ class OperatorNodeProtocol(Protocol): node_type: str def execute( - self, *input_streams: "StreamProtocol" + self, + *input_streams: "StreamProtocol", + observer: "ExecutionObserver | None" = None, ) -> list[tuple["TagProtocol", "PacketProtocol"]]: ... - def get_cached_output(self) -> "StreamProtocol | None": ... + async def async_execute( + self, + inputs: "Sequence[ReadableChannel[tuple[TagProtocol, PacketProtocol]]]", + output: "WritableChannel[tuple[TagProtocol, PacketProtocol]]", + *, + observer: "ExecutionObserver | None" = None, + ) -> None: ... def is_source_node(node: "GraphNode") -> TypeGuard[SourceNodeProtocol]: diff --git a/tests/test_pipeline/test_node_protocols.py b/tests/test_pipeline/test_node_protocols.py new file mode 100644 index 00000000..62b74b76 --- /dev/null +++ b/tests/test_pipeline/test_node_protocols.py @@ -0,0 +1,126 @@ +# tests/test_pipeline/test_node_protocols.py +"""Tests for revised node protocols.""" + +from __future__ import annotations + +import pytest +from unittest.mock import MagicMock, AsyncMock + +from orcapod.protocols.node_protocols import ( + SourceNodeProtocol, + FunctionNodeProtocol, + OperatorNodeProtocol, + is_source_node, + is_function_node, + is_operator_node, +) + + +class TestSourceNodeProtocol: + def test_requires_execute(self): + """SourceNodeProtocol requires execute method.""" + + class GoodSource: + node_type = "source" + + def execute(self, *, observer=None): + return [] + + async def async_execute(self, output, *, observer=None): + pass + + assert isinstance(GoodSource(), SourceNodeProtocol) + + def test_rejects_old_iter_packets_only(self): + """SourceNodeProtocol no longer accepts iter_packets alone.""" + + class OldSource: + node_type = "source" + + def iter_packets(self): + return iter([]) + + assert not isinstance(OldSource(), SourceNodeProtocol) + + +class TestFunctionNodeProtocol: + def 
test_requires_execute_and_async_execute(self): + class GoodFunction: + node_type = "function" + + def execute(self, input_stream, *, observer=None): + return [] + + async def async_execute(self, input_channel, output, *, observer=None): + pass + + assert isinstance(GoodFunction(), FunctionNodeProtocol) + + def test_rejects_old_protocol(self): + """Old protocol with get_cached_results etc. is not sufficient.""" + + class OldFunction: + node_type = "function" + + def get_cached_results(self, entry_ids): + return {} + + def compute_pipeline_entry_id(self, tag, packet): + return "" + + def execute_packet(self, tag, packet): + return (tag, None) + + def execute(self, input_stream): + return [] + + # Missing async_execute → not a valid FunctionNodeProtocol + assert not isinstance(OldFunction(), FunctionNodeProtocol) + + +class TestOperatorNodeProtocol: + def test_requires_execute_and_async_execute(self): + class GoodOperator: + node_type = "operator" + + def execute(self, *input_streams, observer=None): + return [] + + async def async_execute(self, inputs, output, *, observer=None): + pass + + assert isinstance(GoodOperator(), OperatorNodeProtocol) + + def test_rejects_old_protocol(self): + """Old protocol with get_cached_output is not sufficient.""" + + class OldOperator: + node_type = "operator" + + def execute(self, *input_streams): + return [] + + def get_cached_output(self): + return None + + # Missing async_execute → not valid + assert not isinstance(OldOperator(), OperatorNodeProtocol) + + +class TestTypeGuardDispatch: + def test_dispatch_source(self): + node = MagicMock() + node.node_type = "source" + assert is_source_node(node) + assert not is_function_node(node) + assert not is_operator_node(node) + + def test_dispatch_function(self): + node = MagicMock() + node.node_type = "function" + assert is_function_node(node) + + def test_dispatch_operator(self): + node = MagicMock() + node.node_type = "operator" + assert is_operator_node(node) From 
2653ee398f69e662c3ddc0a5fc7751d13f9cfe69 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 18:43:46 +0000 Subject: [PATCH 05/23] refactor(protocols): remove AsyncExecutableProtocol (PLT-922) Co-Authored-By: Claude Sonnet 4.6 --- .../protocols/core_protocols/__init__.py | 2 - .../core_protocols/async_executable.py | 31 ---------------- tests/test_channels/test_async_execute.py | 28 +------------- .../test_channels/test_node_async_execute.py | 37 +------------------ 4 files changed, 2 insertions(+), 96 deletions(-) delete mode 100644 src/orcapod/protocols/core_protocols/async_executable.py diff --git a/src/orcapod/protocols/core_protocols/__init__.py b/src/orcapod/protocols/core_protocols/__init__.py index 1f274033..c658e86f 100644 --- a/src/orcapod/protocols/core_protocols/__init__.py +++ b/src/orcapod/protocols/core_protocols/__init__.py @@ -1,7 +1,6 @@ from orcapod.types import ColumnConfig from orcapod.protocols.hashing_protocols import PipelineElementProtocol -from .async_executable import AsyncExecutableProtocol from .datagrams import DatagramProtocol, PacketProtocol, TagProtocol from .executor import PacketFunctionExecutorProtocol, PythonFunctionExecutorProtocol from .function_pod import FunctionPodProtocol @@ -13,7 +12,6 @@ from .trackers import TrackerProtocol, TrackerManagerProtocol __all__ = [ - "AsyncExecutableProtocol", "ColumnConfig", "DatagramProtocol", "TagProtocol", diff --git a/src/orcapod/protocols/core_protocols/async_executable.py b/src/orcapod/protocols/core_protocols/async_executable.py deleted file mode 100644 index caee303d..00000000 --- a/src/orcapod/protocols/core_protocols/async_executable.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Protocol for async channel-based pipeline execution.""" - -from __future__ import annotations - -from collections.abc import Sequence -from typing import Protocol, runtime_checkable - -from orcapod.channels import ReadableChannel, WritableChannel -from orcapod.protocols.core_protocols.datagrams 
import PacketProtocol, TagProtocol - - -@runtime_checkable -class AsyncExecutableProtocol(Protocol): - """Unified interface for all pipeline nodes in async channel mode. - - Every pipeline node — source, operator, or function pod — implements - this single method. The orchestrator wires up channels and launches - tasks without needing to know what kind of node it is. - """ - - async def async_execute( - self, - inputs: Sequence[ReadableChannel[tuple[TagProtocol, PacketProtocol]]], - output: WritableChannel[tuple[TagProtocol, PacketProtocol]], - ) -> None: - """Consume (tag, packet) pairs from input channels, produce to output channel. - - Implementations MUST call ``await output.close()`` when done to signal - completion to downstream consumers. - """ - ... diff --git a/tests/test_channels/test_async_execute.py b/tests/test_channels/test_async_execute.py index 325464a4..6aa4124d 100644 --- a/tests/test_channels/test_async_execute.py +++ b/tests/test_channels/test_async_execute.py @@ -2,7 +2,6 @@ Comprehensive tests for async_execute on operators and FunctionPod. Covers: -- AsyncExecutableProtocol conformance - StaticOutputPod._materialize_to_stream round-trip - UnaryOperator barrier-mode async_execute (Select, Drop, Map, Filter, Batch) - BinaryOperator barrier-mode async_execute (MergeJoin, SemiJoin) @@ -42,7 +41,6 @@ from orcapod.core.operators.static_output_pod import StaticOutputOperatorPod from orcapod.core.packet_function import PythonPacketFunction from orcapod.core.streams.arrow_table_stream import ArrowTableStream -from orcapod.protocols.core_protocols import AsyncExecutableProtocol from orcapod.types import NodeConfig, PipelineConfig # --------------------------------------------------------------------------- @@ -97,31 +95,7 @@ async def collect_output(ch: Channel) -> list[tuple]: # --------------------------------------------------------------------------- -# 1. 
AsyncExecutableProtocol conformance -# --------------------------------------------------------------------------- - - -class TestProtocolConformance: - def test_function_pod_satisfies_protocol(self): - def double(x: int) -> int: - return x * 2 - - pf = PythonPacketFunction(double, output_keys="result") - pod = FunctionPod(pf) - assert isinstance(pod, AsyncExecutableProtocol) - - def test_join_satisfies_protocol(self): - assert isinstance(Join(), AsyncExecutableProtocol) - - def test_select_tag_columns_satisfies_protocol(self): - assert isinstance(SelectTagColumns(["id"]), AsyncExecutableProtocol) - - def test_batch_satisfies_protocol(self): - assert isinstance(Batch(), AsyncExecutableProtocol) - - -# --------------------------------------------------------------------------- -# 2. _materialize_to_stream round-trip +# 1. _materialize_to_stream round-trip # --------------------------------------------------------------------------- diff --git a/tests/test_channels/test_node_async_execute.py b/tests/test_channels/test_node_async_execute.py index 8cb5de15..af238696 100644 --- a/tests/test_channels/test_node_async_execute.py +++ b/tests/test_channels/test_node_async_execute.py @@ -2,7 +2,6 @@ Tests for async_execute on Node classes. Covers: -- AsyncExecutableProtocol conformance for all four Node types - CachedPacketFunction.async_call with cache support - FunctionNode.async_execute basic streaming - FunctionNode.async_execute two-phase logic @@ -31,7 +30,6 @@ from orcapod.core.packet_function import CachedPacketFunction, PythonPacketFunction from orcapod.core.streams import ArrowTableStream from orcapod.databases import InMemoryArrowDatabase -from orcapod.protocols.core_protocols import AsyncExecutableProtocol from orcapod.types import CacheMode, NodeConfig @@ -78,40 +76,7 @@ def double(x: int) -> int: # --------------------------------------------------------------------------- -# 1. 
AsyncExecutableProtocol conformance -# --------------------------------------------------------------------------- - - -class TestProtocolConformance: - def test_function_node_satisfies_protocol(self): - _, pod = make_double_pod() - stream = make_stream(3) - node = FunctionNode(pod, stream) - assert isinstance(node, AsyncExecutableProtocol) - - def test_persistent_function_node_satisfies_protocol(self): - _, pod = make_double_pod() - stream = make_stream(3) - db = InMemoryArrowDatabase() - node = FunctionNode(pod, stream, pipeline_database=db) - assert isinstance(node, AsyncExecutableProtocol) - - def test_operator_node_satisfies_protocol(self): - op = SelectPacketColumns(["x"]) - stream = make_stream(3) - node = OperatorNode(op, [stream]) - assert isinstance(node, AsyncExecutableProtocol) - - def test_persistent_operator_node_satisfies_protocol(self): - op = SelectPacketColumns(["x"]) - stream = make_stream(3) - db = InMemoryArrowDatabase() - node = OperatorNode(op, [stream], pipeline_database=db) - assert isinstance(node, AsyncExecutableProtocol) - - -# --------------------------------------------------------------------------- -# 2. CachedPacketFunction.async_call +# 1. CachedPacketFunction.async_call # --------------------------------------------------------------------------- From 04372bd3fb720b35bc27fc0577582d44ecdb4d93 Mon Sep 17 00:00:00 2001 From: "Edgar Y. 
Walker" Date: Sun, 15 Mar 2026 18:44:43 +0000 Subject: [PATCH 06/23] feat(source-node): add execute() with observer injection (PLT-922) Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/core/nodes/source_node.py | 25 +++++++++++ tests/test_pipeline/test_node_protocols.py | 50 ++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/src/orcapod/core/nodes/source_node.py b/src/orcapod/core/nodes/source_node.py index 4c2f6e9c..c27417b7 100644 --- a/src/orcapod/core/nodes/source_node.py +++ b/src/orcapod/core/nodes/source_node.py @@ -234,6 +234,31 @@ def iter_packets(self) -> Iterator[tuple[cp.TagProtocol, cp.PacketProtocol]]: return iter(self._cached_results) return self.stream.iter_packets() + def execute( + self, + *, + observer: Any = None, + ) -> list[tuple[cp.TagProtocol, cp.PacketProtocol]]: + """Execute this source: materialize packets and return. + + Args: + observer: Optional execution observer for hooks. + + Returns: + List of (tag, packet) tuples. + """ + if self.stream is None: + raise RuntimeError( + "SourceNode in read-only mode has no stream data available" + ) + if observer is not None: + observer.on_node_start(self) + result = list(self.stream.iter_packets()) + self._cached_results = result + if observer is not None: + observer.on_node_end(self) + return result + def run(self) -> None: """No-op for source nodes — data is already available.""" diff --git a/tests/test_pipeline/test_node_protocols.py b/tests/test_pipeline/test_node_protocols.py index 62b74b76..12469393 100644 --- a/tests/test_pipeline/test_node_protocols.py +++ b/tests/test_pipeline/test_node_protocols.py @@ -124,3 +124,53 @@ def test_dispatch_operator(self): node = MagicMock() node.node_type = "operator" assert is_operator_node(node) + + +import pyarrow as pa +from orcapod.core.sources import ArrowTableSource +from orcapod.core.nodes import SourceNode + + +class TestSourceNodeExecute: + def _make_source_node(self): + table = pa.table({ + "key": pa.array(["a", "b", "c"], 
type=pa.large_string()), + "value": pa.array([1, 2, 3], type=pa.int64()), + }) + src = ArrowTableSource(table, tag_columns=["key"]) + return SourceNode(src) + + def test_execute_returns_list(self): + node = self._make_source_node() + result = node.execute() + assert isinstance(result, list) + assert len(result) == 3 + + def test_execute_populates_cached_results(self): + node = self._make_source_node() + node.execute() + assert node._cached_results is not None + assert len(node._cached_results) == 3 + + def test_execute_with_observer(self): + node = self._make_source_node() + events = [] + + class Obs: + def on_node_start(self, n): + events.append(("start", n.node_type)) + def on_node_end(self, n): + events.append(("end", n.node_type)) + def on_packet_start(self, n, t, p): + pass + def on_packet_end(self, n, t, ip, op, cached): + pass + + node.execute(observer=Obs()) + assert events == [("start", "source"), ("end", "source")] + + def test_execute_without_observer(self): + """execute() works fine with no observer.""" + node = self._make_source_node() + result = node.execute() + assert len(result) == 3 From 4cf351c2e346c98f726e573ac05b05e70a1fa7ba Mon Sep 17 00:00:00 2001 From: "Edgar Y. 
Walker" Date: Sun, 15 Mar 2026 18:48:05 +0000 Subject: [PATCH 07/23] refactor(source-node): tighten async_execute signature + observer (PLT-922) Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/core/nodes/source_node.py | 18 +++++++-- src/orcapod/pipeline/async_orchestrator.py | 17 ++++---- tests/test_pipeline/test_node_protocols.py | 45 ++++++++++++++++++++++ tests/test_pipeline/test_orchestrator.py | 4 +- 4 files changed, 71 insertions(+), 13 deletions(-) diff --git a/src/orcapod/core/nodes/source_node.py b/src/orcapod/core/nodes/source_node.py index c27417b7..61807801 100644 --- a/src/orcapod/core/nodes/source_node.py +++ b/src/orcapod/core/nodes/source_node.py @@ -2,11 +2,11 @@ from __future__ import annotations -from collections.abc import Iterator, Sequence +from collections.abc import Iterator from typing import TYPE_CHECKING, Any from orcapod import contexts -from orcapod.channels import ReadableChannel, WritableChannel +from orcapod.channels import WritableChannel from orcapod.config import Config, DEFAULT_CONFIG from orcapod.core.streams.base import StreamBase from orcapod.protocols import core_protocols as cp @@ -264,16 +264,26 @@ def run(self) -> None: async def async_execute( self, - inputs: Sequence[ReadableChannel[tuple[cp.TagProtocol, cp.PacketProtocol]]], output: WritableChannel[tuple[cp.TagProtocol, cp.PacketProtocol]], + *, + observer: Any = None, ) -> None: - """Push all (tag, packet) pairs from the wrapped stream to the output channel.""" + """Push all (tag, packet) pairs from the wrapped stream to the output channel. + + Args: + output: Channel to write results to. + observer: Optional execution observer for hooks. 
+ """ if self.stream is None: raise RuntimeError( "SourceNode in read-only mode has no stream data available" ) try: + if observer is not None: + observer.on_node_start(self) for tag, packet in self.stream.iter_packets(): await output.send((tag, packet)) + if observer is not None: + observer.on_node_end(self) finally: await output.close() diff --git a/src/orcapod/pipeline/async_orchestrator.py b/src/orcapod/pipeline/async_orchestrator.py index 61833e1d..198a0203 100644 --- a/src/orcapod/pipeline/async_orchestrator.py +++ b/src/orcapod/pipeline/async_orchestrator.py @@ -128,15 +128,18 @@ async def _run_async( # Launch all nodes concurrently async with asyncio.TaskGroup() as tg: for node in topo_order: - # Gather input readers from upstream edges - input_readers = [ - edge_readers[(upstream, node)] - for upstream in in_edges.get(node, []) - ] - writer = node_output_channels[node].writer - tg.create_task(node.async_execute(input_readers, writer)) + if getattr(node, "node_type", None) == "source": + # SourceNode.async_execute takes only output (no inputs) + tg.create_task(node.async_execute(writer)) + else: + # Gather input readers from upstream edges + input_readers = [ + edge_readers[(upstream, node)] + for upstream in in_edges.get(node, []) + ] + tg.create_task(node.async_execute(input_readers, writer)) # Drain terminal channels so nothing is left buffered for ch in terminal_channels: diff --git a/tests/test_pipeline/test_node_protocols.py b/tests/test_pipeline/test_node_protocols.py index 12469393..2ce4957c 100644 --- a/tests/test_pipeline/test_node_protocols.py +++ b/tests/test_pipeline/test_node_protocols.py @@ -174,3 +174,48 @@ def test_execute_without_observer(self): node = self._make_source_node() result = node.execute() assert len(result) == 3 + + +import pytest +from orcapod.channels import Channel + + +class TestSourceNodeAsyncExecuteProtocol: + @pytest.mark.asyncio + async def test_tightened_signature(self): + """async_execute takes output only, no 
inputs.""" + table = pa.table({ + "key": pa.array(["a", "b"], type=pa.large_string()), + "value": pa.array([1, 2], type=pa.int64()), + }) + src = ArrowTableSource(table, tag_columns=["key"]) + node = SourceNode(src) + + output_ch = Channel(buffer_size=16) + await node.async_execute(output_ch.writer, observer=None) + rows = await output_ch.reader.collect() + assert len(rows) == 2 + + @pytest.mark.asyncio + async def test_async_execute_with_observer(self): + table = pa.table({ + "key": pa.array(["a"], type=pa.large_string()), + "value": pa.array([1], type=pa.int64()), + }) + src = ArrowTableSource(table, tag_columns=["key"]) + node = SourceNode(src) + events = [] + + class Obs: + def on_node_start(self, n): + events.append("start") + def on_node_end(self, n): + events.append("end") + def on_packet_start(self, n, t, p): + pass + def on_packet_end(self, n, t, ip, op, cached): + pass + + output_ch = Channel(buffer_size=16) + await node.async_execute(output_ch.writer, observer=Obs()) + assert events == ["start", "end"] diff --git a/tests/test_pipeline/test_orchestrator.py b/tests/test_pipeline/test_orchestrator.py index 2ef210ed..76036f30 100644 --- a/tests/test_pipeline/test_orchestrator.py +++ b/tests/test_pipeline/test_orchestrator.py @@ -79,7 +79,7 @@ async def test_pushes_all_rows_to_output(self): node = SourceNode(src) output_ch = Channel(buffer_size=16) - await node.async_execute([], output_ch.writer) + await node.async_execute(output_ch.writer) rows = await output_ch.reader.collect() assert len(rows) == 3 @@ -90,7 +90,7 @@ async def test_closes_channel_on_completion(self): node = SourceNode(src) output_ch = Channel(buffer_size=4) - await node.async_execute([], output_ch.writer) + await node.async_execute(output_ch.writer) rows = await output_ch.reader.collect() assert len(rows) == 1 From ff064f05212a91306254f57e95a756a19c360f38 Mon Sep 17 00:00:00 2001 From: "Edgar Y. 
Walker" Date: Sun, 15 Mar 2026 18:50:45 +0000 Subject: [PATCH 08/23] feat(function-node): add observer injection to execute() (PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/nodes/function_node.py | 44 ++++++++++++----- tests/test_pipeline/test_node_protocols.py | 56 ++++++++++++++++++++++ 2 files changed, 89 insertions(+), 11 deletions(-) diff --git a/src/orcapod/core/nodes/function_node.py b/src/orcapod/core/nodes/function_node.py index e5f453b1..f20b325f 100644 --- a/src/orcapod/core/nodes/function_node.py +++ b/src/orcapod/core/nodes/function_node.py @@ -486,29 +486,51 @@ def execute_packet( return self._process_packet_internal(tag, packet) def execute( - self, input_stream: StreamProtocol + self, + input_stream: StreamProtocol, + *, + observer: Any = None, ) -> list[tuple[TagProtocol, PacketProtocol]]: """Execute all packets from a stream: compute, persist, and cache. - Internal method for orchestrators. The caller must guarantee that - the input stream's identity (content hash, schema) matches - ``self._input_stream``. No validation is performed. - - More efficient than calling ``execute_packet`` per-packet when - observer hooks aren't needed. - Args: input_stream: The input stream to process. + observer: Optional execution observer for hooks. Returns: Materialized list of (tag, output_packet) pairs, excluding None outputs. 
""" + if observer is not None: + observer.on_node_start(self) + + # Gather entry IDs and check cache + upstream_entries = [ + (tag, packet, self.compute_pipeline_entry_id(tag, packet)) + for tag, packet in input_stream.iter_packets() + ] + entry_ids = [eid for _, _, eid in upstream_entries] + cached = self.get_cached_results(entry_ids=entry_ids) + output: list[tuple[TagProtocol, PacketProtocol]] = [] - for tag, packet in input_stream.iter_packets(): - tag_out, result = self._process_packet_internal(tag, packet) - if result is not None: + for tag, packet, entry_id in upstream_entries: + if observer is not None: + observer.on_packet_start(self, tag, packet) + + if entry_id in cached: + tag_out, result = cached[entry_id] + if observer is not None: + observer.on_packet_end(self, tag, packet, result, cached=True) output.append((tag_out, result)) + else: + tag_out, result = self._process_packet_internal(tag, packet) + if observer is not None: + observer.on_packet_end(self, tag, packet, result, cached=False) + if result is not None: + output.append((tag_out, result)) + + if observer is not None: + observer.on_node_end(self) return output def _process_packet_internal( diff --git a/tests/test_pipeline/test_node_protocols.py b/tests/test_pipeline/test_node_protocols.py index 2ce4957c..87379259 100644 --- a/tests/test_pipeline/test_node_protocols.py +++ b/tests/test_pipeline/test_node_protocols.py @@ -219,3 +219,59 @@ def on_packet_end(self, n, t, ip, op, cached): output_ch = Channel(buffer_size=16) await node.async_execute(output_ch.writer, observer=Obs()) assert events == ["start", "end"] + + +# =========================================================================== +# FunctionNode.execute() with observer +# =========================================================================== + +from orcapod.core.function_pod import FunctionPod +from orcapod.core.packet_function import PythonPacketFunction +from orcapod.core.nodes import FunctionNode + + +def double_value(value: 
int) -> int: + return value * 2 + + +class TestFunctionNodeExecute: + def _make_function_node(self): + table = pa.table({ + "key": pa.array(["a", "b"], type=pa.large_string()), + "value": pa.array([1, 2], type=pa.int64()), + }) + src = ArrowTableSource(table, tag_columns=["key"]) + pf = PythonPacketFunction(double_value, output_keys="result") + pod = FunctionPod(pf) + return FunctionNode(pod, src) + + def test_execute_with_observer(self): + node = self._make_function_node() + events = [] + + class Obs: + def on_node_start(self, n): + events.append(("node_start", n.node_type)) + def on_node_end(self, n): + events.append(("node_end", n.node_type)) + def on_packet_start(self, n, t, p): + events.append(("packet_start",)) + def on_packet_end(self, n, t, ip, op, cached): + events.append(("packet_end", cached)) + + input_stream = node._input_stream + result = node.execute(input_stream, observer=Obs()) + + assert len(result) == 2 + assert events[0] == ("node_start", "function") + assert events[-1] == ("node_end", "function") + packet_events = [e for e in events if e[0].startswith("packet")] + assert len(packet_events) == 4 # 2 start + 2 end + + def test_execute_without_observer(self): + node = self._make_function_node() + input_stream = node._input_stream + result = node.execute(input_stream) + assert len(result) == 2 + values = sorted([pkt.as_dict()["result"] for _, pkt in result]) + assert values == [2, 4] From 56488441370eea62661ece6262da64b2b0fc8759 Mon Sep 17 00:00:00 2001 From: "Edgar Y. 
Walker" Date: Sun, 15 Mar 2026 18:56:36 +0000 Subject: [PATCH 09/23] refactor(function-node): tighten async_execute signature + observer (PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/nodes/function_node.py | 104 +++++++++------------ src/orcapod/pipeline/async_orchestrator.py | 11 ++- tests/test_pipeline/test_node_protocols.py | 65 +++++++++++++ tests/test_pipeline/test_orchestrator.py | 2 +- 4 files changed, 119 insertions(+), 63 deletions(-) diff --git a/src/orcapod/core/nodes/function_node.py b/src/orcapod/core/nodes/function_node.py index f20b325f..79af083f 100644 --- a/src/orcapod/core/nodes/function_node.py +++ b/src/orcapod/core/nodes/function_node.py @@ -4,7 +4,7 @@ import asyncio import logging -from collections.abc import Iterator, Sequence +from collections.abc import Iterator from typing import TYPE_CHECKING, Any, cast from orcapod import contexts @@ -28,10 +28,7 @@ from orcapod.types import ( ColumnConfig, ContentHash, - NodeConfig, - PipelineConfig, Schema, - resolve_concurrency, ) from orcapod.utils import arrow_utils, schema_utils from orcapod.utils.lazy_module import LazyModule @@ -1163,21 +1160,25 @@ def as_table( async def async_execute( self, - inputs: Sequence[ReadableChannel[tuple[TagProtocol, PacketProtocol]]], + input_channel: ReadableChannel[tuple[TagProtocol, PacketProtocol]], output: WritableChannel[tuple[TagProtocol, PacketProtocol]], - pipeline_config: PipelineConfig | None = None, + *, + observer: Any = None, ) -> None: """Streaming async execution for FunctionNode. When a database is attached, uses two-phase execution: replay cached results first, then compute missing packets concurrently. Otherwise, routes each packet through ``async_process_packet`` directly. + + Args: + input_channel: Single readable channel of (tag, packet) pairs. + output: Writable channel for output (tag, packet) pairs. + observer: Optional execution observer for hooks. 
""" try: - pipeline_config = pipeline_config or PipelineConfig() - # TODO: revisit this logic as use of accidental property is not desirable - node_config = getattr(self._function_pod, "node_config", NodeConfig()) - max_concurrency = resolve_concurrency(node_config, pipeline_config) + if observer is not None: + observer.on_node_start(self) if self._cached_function_pod is not None: # Two-phase async execution with DB backing @@ -1227,60 +1228,41 @@ async def async_execute( for tag, packet in existing_stream.iter_packets(): await output.send((tag, packet)) - # Phase 2: process new packets concurrently - sem = ( - asyncio.Semaphore(max_concurrency) - if max_concurrency is not None - else None - ) - - async def process_one_db( - tag: TagProtocol, packet: PacketProtocol - ) -> None: - try: - ( - tag_out, - result_packet, - ) = await self._async_process_packet_internal(tag, packet) - if result_packet is not None: - await output.send((tag_out, result_packet)) - finally: - if sem is not None: - sem.release() - - async with asyncio.TaskGroup() as tg: - async for tag, packet in inputs[0]: - entry_id = self.compute_pipeline_entry_id(tag, packet) - if entry_id in existing_entry_ids: - continue - if sem is not None: - await sem.acquire() - tg.create_task(process_one_db(tag, packet)) + # Phase 2: process new packets + async for tag, packet in input_channel: + entry_id = self.compute_pipeline_entry_id(tag, packet) + if entry_id in existing_entry_ids: + continue + if observer is not None: + observer.on_packet_start(self, tag, packet) + ( + tag_out, + result_packet, + ) = await self._async_process_packet_internal(tag, packet) + if observer is not None: + observer.on_packet_end( + self, tag, packet, result_packet, cached=False + ) + if result_packet is not None: + await output.send((tag_out, result_packet)) else: # Simple async execution without DB - sem = ( - asyncio.Semaphore(max_concurrency) - if max_concurrency is not None - else None - ) + async for tag, packet in 
input_channel: + if observer is not None: + observer.on_packet_start(self, tag, packet) + ( + tag_out, + result_packet, + ) = await self._async_process_packet_internal(tag, packet) + if observer is not None: + observer.on_packet_end( + self, tag, packet, result_packet, cached=False + ) + if result_packet is not None: + await output.send((tag_out, result_packet)) - async def process_one(tag: TagProtocol, packet: PacketProtocol) -> None: - try: - ( - tag_out, - result_packet, - ) = await self._async_process_packet_internal(tag, packet) - if result_packet is not None: - await output.send((tag_out, result_packet)) - finally: - if sem is not None: - sem.release() - - async with asyncio.TaskGroup() as tg: - async for tag, packet in inputs[0]: - if sem is not None: - await sem.acquire() - tg.create_task(process_one(tag, packet)) + if observer is not None: + observer.on_node_end(self) finally: await output.close() diff --git a/src/orcapod/pipeline/async_orchestrator.py b/src/orcapod/pipeline/async_orchestrator.py index 198a0203..ae928c23 100644 --- a/src/orcapod/pipeline/async_orchestrator.py +++ b/src/orcapod/pipeline/async_orchestrator.py @@ -133,8 +133,17 @@ async def _run_async( if getattr(node, "node_type", None) == "source": # SourceNode.async_execute takes only output (no inputs) tg.create_task(node.async_execute(writer)) + elif getattr(node, "node_type", None) == "function": + # FunctionNode.async_execute takes single input channel + input_readers = [ + edge_readers[(upstream, node)] + for upstream in in_edges.get(node, []) + ] + tg.create_task( + node.async_execute(input_readers[0], writer) + ) else: - # Gather input readers from upstream edges + # OperatorNode.async_execute takes list of input channels input_readers = [ edge_readers[(upstream, node)] for upstream in in_edges.get(node, []) diff --git a/tests/test_pipeline/test_node_protocols.py b/tests/test_pipeline/test_node_protocols.py index 87379259..eb82526c 100644 --- 
a/tests/test_pipeline/test_node_protocols.py +++ b/tests/test_pipeline/test_node_protocols.py @@ -275,3 +275,68 @@ def test_execute_without_observer(self): assert len(result) == 2 values = sorted([pkt.as_dict()["result"] for _, pkt in result]) assert values == [2, 4] + + +# =========================================================================== +# FunctionNode.async_execute() with tightened signature +# =========================================================================== + + +class TestFunctionNodeAsyncExecute: + @pytest.mark.asyncio + async def test_tightened_signature(self): + """async_execute takes single input_channel, not Sequence.""" + table = pa.table({ + "key": pa.array(["a", "b"], type=pa.large_string()), + "value": pa.array([1, 2], type=pa.int64()), + }) + src = ArrowTableSource(table, tag_columns=["key"]) + pf = PythonPacketFunction(double_value, output_keys="result") + pod = FunctionPod(pf) + node = FunctionNode(pod, src) + + input_ch = Channel(buffer_size=16) + output_ch = Channel(buffer_size=16) + + for tag, packet in src.iter_packets(): + await input_ch.writer.send((tag, packet)) + await input_ch.writer.close() + + await node.async_execute(input_ch.reader, output_ch.writer) + rows = await output_ch.reader.collect() + assert len(rows) == 2 + values = sorted([pkt.as_dict()["result"] for _, pkt in rows]) + assert values == [2, 4] + + @pytest.mark.asyncio + async def test_async_execute_with_observer(self): + table = pa.table({ + "key": pa.array(["a"], type=pa.large_string()), + "value": pa.array([1], type=pa.int64()), + }) + src = ArrowTableSource(table, tag_columns=["key"]) + pf = PythonPacketFunction(double_value, output_keys="result") + pod = FunctionPod(pf) + node = FunctionNode(pod, src) + + events = [] + + class Obs: + def on_node_start(self, n): + events.append("node_start") + def on_node_end(self, n): + events.append("node_end") + def on_packet_start(self, n, t, p): + events.append("pkt_start") + def on_packet_end(self, n, t, ip, op, 
cached): + events.append("pkt_end") + + input_ch = Channel(buffer_size=16) + output_ch = Channel(buffer_size=16) + for tag, packet in src.iter_packets(): + await input_ch.writer.send((tag, packet)) + await input_ch.writer.close() + + await node.async_execute(input_ch.reader, output_ch.writer, observer=Obs()) + assert "node_start" in events + assert "node_end" in events diff --git a/tests/test_pipeline/test_orchestrator.py b/tests/test_pipeline/test_orchestrator.py index 76036f30..4256aefd 100644 --- a/tests/test_pipeline/test_orchestrator.py +++ b/tests/test_pipeline/test_orchestrator.py @@ -141,7 +141,7 @@ async def test_processes_packets(self): await input_ch.writer.send((tag, packet)) await input_ch.writer.close() - await node.async_execute([input_ch.reader], output_ch.writer) + await node.async_execute(input_ch.reader, output_ch.writer) rows = await output_ch.reader.collect() assert len(rows) == 2 From c5adcb94f21df330de78d397ab083b81c5904a96 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 18:59:33 +0000 Subject: [PATCH 10/23] feat(operator-node): add observer + cache check to execute() (PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/nodes/operator_node.py | 18 +++++++-- tests/test_pipeline/test_node_protocols.py | 46 ++++++++++++++++++++++ 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/orcapod/core/nodes/operator_node.py b/src/orcapod/core/nodes/operator_node.py index be54ad5c..59dc8ce9 100644 --- a/src/orcapod/core/nodes/operator_node.py +++ b/src/orcapod/core/nodes/operator_node.py @@ -432,19 +432,27 @@ def get_cached_output(self) -> "StreamProtocol | None": def execute( self, *input_streams: StreamProtocol, + observer: Any = None, ) -> list[tuple[TagProtocol, PacketProtocol]]: """Execute input streams: compute, persist, and cache. - Internal method for orchestrators. The caller must guarantee that - the input streams' identities (content hash, schema) match - ``self._input_streams``. 
No validation is performed. - Args: *input_streams: Input streams to execute. + observer: Optional execution observer for hooks. Returns: Materialized list of (tag, packet) pairs. """ + if observer is not None: + observer.on_node_start(self) + + # Check REPLAY cache first + cached_output = self.get_cached_output() + if cached_output is not None: + output = list(cached_output.iter_packets()) + if observer is not None: + observer.on_node_end(self) + return output # Compute result_stream = self._operator.process(*input_streams) @@ -470,6 +478,8 @@ def execute( ): self._store_output_stream(self._cached_output_stream) + if observer is not None: + observer.on_node_end(self) return output def _compute_and_store(self) -> None: diff --git a/tests/test_pipeline/test_node_protocols.py b/tests/test_pipeline/test_node_protocols.py index eb82526c..47eb622f 100644 --- a/tests/test_pipeline/test_node_protocols.py +++ b/tests/test_pipeline/test_node_protocols.py @@ -340,3 +340,49 @@ def on_packet_end(self, n, t, ip, op, cached): await node.async_execute(input_ch.reader, output_ch.writer, observer=Obs()) assert "node_start" in events assert "node_end" in events + + +# =========================================================================== +# OperatorNode.execute() with observer + cache check +# =========================================================================== + +from orcapod.core.nodes import OperatorNode +from orcapod.core.operators.join import Join + + +class TestOperatorNodeExecute: + def _make_join_node(self): + table_a = pa.table({ + "key": pa.array(["a", "b"], type=pa.large_string()), + "value": pa.array([10, 20], type=pa.int64()), + }) + table_b = pa.table({ + "key": pa.array(["a", "b"], type=pa.large_string()), + "score": pa.array([100, 200], type=pa.int64()), + }) + src_a = ArrowTableSource(table_a, tag_columns=["key"]) + src_b = ArrowTableSource(table_b, tag_columns=["key"]) + return OperatorNode(Join(), input_streams=[src_a, src_b]) + + def 
test_execute_with_observer(self): + node = self._make_join_node() + events = [] + + class Obs: + def on_node_start(self, n): + events.append(("node_start", n.node_type)) + def on_node_end(self, n): + events.append(("node_end", n.node_type)) + def on_packet_start(self, n, t, p): + pass + def on_packet_end(self, n, t, ip, op, cached): + pass + + result = node.execute(*node._input_streams, observer=Obs()) + assert len(result) == 2 + assert events == [("node_start", "operator"), ("node_end", "operator")] + + def test_execute_without_observer(self): + node = self._make_join_node() + result = node.execute(*node._input_streams) + assert len(result) == 2 From f9708e51aedd8939679f8896b08a6724dc573e74 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 19:00:37 +0000 Subject: [PATCH 11/23] feat(operator-node): add observer to async_execute() (PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/nodes/operator_node.py | 17 +++++++++ tests/test_pipeline/test_node_protocols.py | 43 ++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/src/orcapod/core/nodes/operator_node.py b/src/orcapod/core/nodes/operator_node.py index 59dc8ce9..f2e46dbd 100644 --- a/src/orcapod/core/nodes/operator_node.py +++ b/src/orcapod/core/nodes/operator_node.py @@ -638,6 +638,8 @@ async def async_execute( self, inputs: Sequence[ReadableChannel[tuple[TagProtocol, PacketProtocol]]], output: WritableChannel[tuple[TagProtocol, PacketProtocol]], + *, + observer: Any = None, ) -> None: """Async execution with cache mode handling when DB is attached. @@ -648,16 +650,28 @@ async def async_execute( - REPLAY: emit from DB, close output. - OFF: delegate to operator, forward results. - LOG: delegate to operator, forward + collect results, then store in DB. + + Args: + inputs: Sequence of readable channels from upstream nodes. + output: Writable channel for output (tag, packet) pairs. + observer: Optional execution observer for hooks. 
""" if self._pipeline_database is None: # Simple delegation without DB + if observer is not None: + observer.on_node_start(self) hashes = [s.pipeline_hash() for s in self._input_streams] await self._operator.async_execute( inputs, output, input_pipeline_hashes=hashes ) + if observer is not None: + observer.on_node_end(self) return try: + if observer is not None: + observer.on_node_start(self) + if self._cache_mode == CacheMode.REPLAY: self._replay_from_cache() assert self._cached_output_stream is not None @@ -694,6 +708,9 @@ async def forward() -> None: self._store_output_stream(stream) self._update_modified_time() + + if observer is not None: + observer.on_node_end(self) finally: await output.close() diff --git a/tests/test_pipeline/test_node_protocols.py b/tests/test_pipeline/test_node_protocols.py index 47eb622f..2ecdfbcb 100644 --- a/tests/test_pipeline/test_node_protocols.py +++ b/tests/test_pipeline/test_node_protocols.py @@ -386,3 +386,46 @@ def test_execute_without_observer(self): node = self._make_join_node() result = node.execute(*node._input_streams) assert len(result) == 2 + + +# =========================================================================== +# OperatorNode.async_execute() with observer +# =========================================================================== + +from orcapod.core.operators import SelectPacketColumns + + +class TestOperatorNodeAsyncExecute: + @pytest.mark.asyncio + async def test_async_execute_with_observer(self): + table_a = pa.table({ + "key": pa.array(["a", "b"], type=pa.large_string()), + "value": pa.array([10, 20], type=pa.int64()), + }) + src_a = ArrowTableSource(table_a, tag_columns=["key"]) + op = SelectPacketColumns(columns=["value"]) + op_node = OperatorNode(op, input_streams=[src_a]) + + events = [] + + class Obs: + def on_node_start(self, n): + events.append("start") + def on_node_end(self, n): + events.append("end") + def on_packet_start(self, n, t, p): + pass + def on_packet_end(self, n, t, ip, op, 
cached): + pass + + input_ch = Channel(buffer_size=16) + output_ch = Channel(buffer_size=16) + for tag, packet in src_a.iter_packets(): + await input_ch.writer.send((tag, packet)) + await input_ch.writer.close() + + await op_node.async_execute( + [input_ch.reader], output_ch.writer, observer=Obs() + ) + assert "start" in events + assert "end" in events From caadfb40b9e116985741d8c1023e0b48ee4c1717 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 19:05:16 +0000 Subject: [PATCH 12/23] fix(tests): update callers for tightened FunctionNode.async_execute signature (PLT-922) Update all test files that pass a list to FunctionNode.async_execute to pass a single reader instead. Update concurrency tests to reflect sequential execution (concurrency deferred to PLT-930). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../test_copilot_review_issues.py | 31 ++++++++++--------- .../test_channels/test_node_async_execute.py | 30 +++++++++--------- 2 files changed, 31 insertions(+), 30 deletions(-) diff --git a/tests/test_channels/test_copilot_review_issues.py b/tests/test_channels/test_copilot_review_issues.py index 6357caa4..3c786f09 100644 --- a/tests/test_channels/test_copilot_review_issues.py +++ b/tests/test_channels/test_copilot_review_issues.py @@ -90,16 +90,16 @@ async def tracked_double(x: int) -> int: output_ch = Channel(buffer_size=16) await feed_stream_to_channel(make_stream(5), input_ch) - await node.async_execute([input_ch.reader], output_ch.writer) + await node.async_execute(input_ch.reader, output_ch.writer) results = await output_ch.reader.collect() assert len(results) == 5 values = sorted(pkt.as_dict()["result"] for _, pkt in results) assert values == [0, 2, 4, 6, 8] - # If tasks ran concurrently, peak should equal max_concurrency (5). - # If sequential, peak would be 1. - assert peak == 5, f"Expected 5 concurrent tasks but peak was {peak}" + # Concurrency limiting was removed in PLT-922 (deferred to PLT-930). 
+ # Packets are now processed sequentially, so peak should be 1. + assert peak == 1, f"Expected sequential execution (peak=1) but peak was {peak}" # --------------------------------------------------------------------------- @@ -265,11 +265,12 @@ def test_resolve_concurrency_accepts_none(self): assert resolve_concurrency(node_config, pipeline_config) is None @pytest.mark.asyncio - async def test_semaphore_zero_causes_deadlock(self): - """Demonstrate that Semaphore(0) actually deadlocks. + async def test_max_concurrency_zero_no_deadlock(self): + """max_concurrency=0 no longer causes deadlock after PLT-922. - This is a demonstration of the bug — if resolve_concurrency returns 0, - the pipeline hangs forever. We use a timeout to detect the deadlock. + Semaphore/concurrency limiting was removed from async_execute + (deferred to PLT-930). Packets are processed sequentially regardless + of max_concurrency settings. """ async def double(x: int) -> int: @@ -285,13 +286,13 @@ async def double(x: int) -> int: await feed_stream_to_channel(make_stream(1), input_ch) - # This should deadlock because Semaphore(0) never allows acquisition. - # We use a timeout to detect the deadlock instead of hanging forever. - with pytest.raises((asyncio.TimeoutError, ValueError)): - await asyncio.wait_for( - node.async_execute([input_ch.reader], output_ch.writer), - timeout=0.5, - ) + # With concurrency removed, this completes without deadlock. 
+ await asyncio.wait_for( + node.async_execute(input_ch.reader, output_ch.writer), + timeout=2.0, + ) + results = await output_ch.reader.collect() + assert len(results) == 1 # --------------------------------------------------------------------------- diff --git a/tests/test_channels/test_node_async_execute.py b/tests/test_channels/test_node_async_execute.py index af238696..8eca0c80 100644 --- a/tests/test_channels/test_node_async_execute.py +++ b/tests/test_channels/test_node_async_execute.py @@ -186,7 +186,7 @@ async def test_basic_streaming_matches_sync(self): output_ch = Channel(buffer_size=16) await feed_stream_to_channel(make_stream(5), input_ch) - await node_async.async_execute([input_ch.reader], output_ch.writer) + await node_async.async_execute(input_ch.reader, output_ch.writer) async_results = await output_ch.reader.collect() async_values = sorted(pkt.as_dict()["result"] for _, pkt in async_results) @@ -201,7 +201,7 @@ async def test_empty_input_closes_cleanly(self): output_ch = Channel(buffer_size=4) await input_ch.writer.close() - await node.async_execute([input_ch.reader], output_ch.writer) + await node.async_execute(input_ch.reader, output_ch.writer) results = await output_ch.reader.collect() assert results == [] @@ -216,7 +216,7 @@ async def test_tags_preserved(self): output_ch = Channel(buffer_size=16) await feed_stream_to_channel(make_stream(3), input_ch) - await node.async_execute([input_ch.reader], output_ch.writer) + await node.async_execute(input_ch.reader, output_ch.writer) results = await output_ch.reader.collect() ids = sorted(tag.as_dict()["id"] for tag, _ in results) @@ -241,7 +241,7 @@ async def test_no_cache_processes_all_inputs(self): output_ch = Channel(buffer_size=16) await feed_stream_to_channel(make_stream(3), input_ch) - await node.async_execute([input_ch.reader], output_ch.writer) + await node.async_execute(input_ch.reader, output_ch.writer) results = await output_ch.reader.collect() assert len(results) == 3 @@ -267,7 +267,7 @@ 
async def test_sync_run_then_async_emits_from_cache(self): # Close input immediately — no new packets await input_ch.writer.close() - await node2.async_execute([input_ch.reader], output_ch.writer) + await node2.async_execute(input_ch.reader, output_ch.writer) results = await output_ch.reader.collect() assert len(results) == 3 @@ -291,7 +291,7 @@ async def test_two_phase_cached_and_new(self): output_ch = Channel(buffer_size=16) await feed_stream_to_channel(make_stream(5), input_ch) - await node2.async_execute([input_ch.reader], output_ch.writer) + await node2.async_execute(input_ch.reader, output_ch.writer) results = await output_ch.reader.collect() values = sorted(pkt.as_dict()["result"] for _, pkt in results) @@ -319,7 +319,7 @@ async def slow_double(x: int) -> int: await feed_stream_to_channel(make_stream(5), input_ch) t0 = time.perf_counter() - await node.async_execute([input_ch.reader], output_ch.writer) + await node.async_execute(input_ch.reader, output_ch.writer) elapsed = time.perf_counter() - t0 results = await output_ch.reader.collect() @@ -327,9 +327,9 @@ async def slow_double(x: int) -> int: values = sorted(pkt.as_dict()["result"] for _, pkt in results) assert values == [0, 2, 4, 6, 8] - # With 5 packets at 0.2s each and max_concurrency=5, - # concurrent execution should complete in ~0.2s, not ~1.0s - assert elapsed < 0.6, f"Expected concurrent execution but took {elapsed:.2f}s" + # Concurrency limiting removed in PLT-922 (deferred to PLT-930). + # Packets are now processed sequentially. 
+ assert elapsed >= 0.9, f"Expected sequential execution but took {elapsed:.2f}s" @pytest.mark.asyncio async def test_db_records_created(self): @@ -343,7 +343,7 @@ async def test_db_records_created(self): output_ch = Channel(buffer_size=16) await feed_stream_to_channel(make_stream(3), input_ch) - await node.async_execute([input_ch.reader], output_ch.writer) + await node.async_execute(input_ch.reader, output_ch.writer) await output_ch.reader.collect() # Verify records in DB @@ -619,7 +619,7 @@ async def patched(tag, packet): output_ch = Channel(buffer_size=16) await feed_stream_to_channel(make_stream(3), input_ch) - await node.async_execute([input_ch.reader], output_ch.writer) + await node.async_execute(input_ch.reader, output_ch.writer) await output_ch.reader.collect() assert len(call_log) == 3 @@ -653,7 +653,7 @@ async def source(): async with asyncio.TaskGroup() as tg: tg.create_task(source()) - tg.create_task(node.async_execute([ch1.reader], ch2.writer)) + tg.create_task(node.async_execute(ch1.reader, ch2.writer)) results = await ch2.reader.collect() assert len(results) == 4 @@ -727,7 +727,7 @@ async def source_producer(): async with asyncio.TaskGroup() as tg: tg.create_task(source_producer()) - tg.create_task(node.async_execute([input_ch.reader], output_ch.writer)) + tg.create_task(node.async_execute(input_ch.reader, output_ch.writer)) async_results = await output_ch.reader.collect() async_values = sorted(pkt.as_dict()["result"] for _, pkt in async_results) @@ -846,7 +846,7 @@ async def source_producer(): async with asyncio.TaskGroup() as tg: tg.create_task(source_producer()) - tg.create_task(fn_node.async_execute([ch_source.reader], ch_mid.writer)) + tg.create_task(fn_node.async_execute(ch_source.reader, ch_mid.writer)) tg.create_task(op_node.async_execute([ch_mid.reader], ch_out.writer)) final_results = await ch_out.reader.collect() From f051e6d8fb2d153f028eb49598f85943fdcaa1a2 Mon Sep 17 00:00:00 2001 From: "Edgar Y. 
Walker" Date: Sun, 15 Mar 2026 19:08:58 +0000 Subject: [PATCH 13/23] refactor(sync-orchestrator): delegate to node.execute(), remove per-packet logic (PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/pipeline/sync_orchestrator.py | 96 +++++------------------ 1 file changed, 20 insertions(+), 76 deletions(-) diff --git a/src/orcapod/pipeline/sync_orchestrator.py b/src/orcapod/pipeline/sync_orchestrator.py index 21c408ff..193c355a 100644 --- a/src/orcapod/pipeline/sync_orchestrator.py +++ b/src/orcapod/pipeline/sync_orchestrator.py @@ -1,7 +1,7 @@ """Synchronous pipeline orchestrator. -Walks a compiled pipeline's node graph topologically, executing each node -with materialized buffers and per-packet observer hooks for function nodes. +Walks a compiled pipeline's node graph topologically, delegating to each +node's ``execute()`` method with observer injection. """ from __future__ import annotations @@ -9,7 +9,6 @@ import logging from typing import TYPE_CHECKING, Any -from orcapod.pipeline.observer import NoOpObserver from orcapod.pipeline.result import OrchestratorResult from orcapod.protocols.node_protocols import ( is_function_node, @@ -27,26 +26,21 @@ class SyncPipelineOrchestrator: - """Execute a compiled pipeline synchronously with observer hooks. + """Execute a compiled pipeline synchronously via node ``execute()`` methods. - Walks the node graph in topological order. For each node: + Walks the node graph in topological order. For each node, delegates + to ``node.execute(observer=...)`` which owns all per-packet logic, + cache lookups, and observer hooks internally. - - **SourceNode**: materializes ``iter_packets()`` into an - orchestrator-owned buffer. The source node itself does not cache. - - **FunctionNode**: per-packet execution with cache lookup and - observer hooks. ``execute_packet`` handles computation + - function-level memoization. - - **OperatorNode**: bulk execution via ``node.execute()``. 
- - The orchestrator returns an ``OrchestratorResult`` with all node outputs. + The orchestrator is responsible only for topological ordering, + buffer management, and stream materialization between nodes. Args: - observer: Optional execution observer for hooks. Defaults to - ``NoOpObserver``. + observer: Optional execution observer forwarded to nodes. """ def __init__(self, observer: "ExecutionObserver | None" = None) -> None: - self._observer = observer or NoOpObserver() + self._observer = observer def run( self, @@ -72,13 +66,19 @@ def run( for node in topo_order: if is_source_node(node): - buffers[node] = self._execute_source(node) + buffers[node] = node.execute(observer=self._observer) elif is_function_node(node): - upstream_buffer = self._gather_upstream(node, graph, buffers) - buffers[node] = self._execute_function(node, upstream_buffer) + upstream_buf = self._gather_upstream(node, graph, buffers) + upstream_node = list(graph.predecessors(node))[0] + input_stream = self._materialize_as_stream(upstream_buf, upstream_node) + buffers[node] = node.execute(input_stream, observer=self._observer) elif is_operator_node(node): upstream_buffers = self._gather_upstream_multi(node, graph, buffers) - buffers[node] = self._execute_operator(node, upstream_buffers) + input_streams = [ + self._materialize_as_stream(buf, upstream_node) + for buf, upstream_node in upstream_buffers + ] + buffers[node] = node.execute(*input_streams, observer=self._observer) else: raise TypeError( f"Unknown node type: {getattr(node, 'node_type', None)!r}" @@ -91,62 +91,6 @@ def run( return OrchestratorResult(node_outputs=buffers) - def _execute_source(self, node: Any) -> list[tuple[Any, Any]]: - """Execute a source node: materialize its packets.""" - self._observer.on_node_start(node) - output = list(node.iter_packets()) - self._observer.on_node_end(node) - return output - - def _execute_function( - self, node: Any, upstream_buffer: list[tuple[Any, Any]] - ) -> list[tuple[Any, Any]]: - """Execute 
a function node with per-packet hooks.""" - self._observer.on_node_start(node) - - upstream_entries = [ - (tag, packet, node.compute_pipeline_entry_id(tag, packet)) - for tag, packet in upstream_buffer - ] - entry_ids = [eid for _, _, eid in upstream_entries] - - cached = node.get_cached_results(entry_ids=entry_ids) - - output: list[tuple[Any, Any]] = [] - for tag, packet, entry_id in upstream_entries: - self._observer.on_packet_start(node, tag, packet) - if entry_id in cached: - tag_out, result = cached[entry_id] - self._observer.on_packet_end(node, tag, packet, result, cached=True) - output.append((tag_out, result)) - else: - tag_out, result = node.execute_packet(tag, packet) - self._observer.on_packet_end(node, tag, packet, result, cached=False) - if result is not None: - output.append((tag_out, result)) - - self._observer.on_node_end(node) - return output - - def _execute_operator( - self, node: Any, upstream_buffers: list[tuple[list[tuple[Any, Any]], Any]] - ) -> list[tuple[Any, Any]]: - """Execute an operator node: bulk stream processing.""" - self._observer.on_node_start(node) - - cached = node.get_cached_output() - if cached is not None: - output = list(cached.iter_packets()) - else: - input_streams = [ - self._materialize_as_stream(buf, upstream_node) - for buf, upstream_node in upstream_buffers - ] - output = node.execute(*input_streams) - - self._observer.on_node_end(node) - return output - @staticmethod def _gather_upstream( node: Any, graph: "nx.DiGraph", buffers: dict[Any, list[tuple[Any, Any]]] From 33e40e6eff8f7cbc37309e4e4c200d001eafae7f Mon Sep 17 00:00:00 2001 From: "Edgar Y. 
Walker" Date: Sun, 15 Mar 2026 19:10:42 +0000 Subject: [PATCH 14/23] refactor(async-orchestrator): use node protocols, graph interface, OrchestratorResult (PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/pipeline/async_orchestrator.py | 173 +++++++++++++-------- tests/test_pipeline/test_orchestrator.py | 67 ++++---- 2 files changed, 141 insertions(+), 99 deletions(-) diff --git a/src/orcapod/pipeline/async_orchestrator.py b/src/orcapod/pipeline/async_orchestrator.py index ae928c23..4fb83c0d 100644 --- a/src/orcapod/pipeline/async_orchestrator.py +++ b/src/orcapod/pipeline/async_orchestrator.py @@ -1,10 +1,8 @@ """Async pipeline orchestrator for push-based channel execution. -Walks a compiled ``Pipeline``'s persistent node graph and launches all -nodes concurrently via ``asyncio.TaskGroup``, wiring them together with -bounded channels. After execution, results are available in the -pipeline databases via the usual ``get_all_records()`` / ``as_source()`` -accessors on each persistent node. +Walks a compiled pipeline's node graph and launches all nodes concurrently +via ``asyncio.TaskGroup``, wiring them together with bounded channels. +Uses TypeGuard dispatch with tightened per-type async_execute signatures. """ from __future__ import annotations @@ -15,93 +13,100 @@ from typing import TYPE_CHECKING, Any from orcapod.channels import BroadcastChannel, Channel -from orcapod.types import PipelineConfig +from orcapod.pipeline.result import OrchestratorResult +from orcapod.protocols.node_protocols import ( + is_function_node, + is_operator_node, + is_source_node, +) if TYPE_CHECKING: import networkx as nx - from orcapod.pipeline.graph import Pipeline + from orcapod.pipeline.observer import ExecutionObserver + from orcapod.protocols.core_protocols import PacketProtocol, TagProtocol logger = logging.getLogger(__name__) class AsyncPipelineOrchestrator: - """Execute a compiled ``Pipeline`` asynchronously using channels. 
+ """Execute a compiled pipeline asynchronously using channels. - After ``Pipeline.compile()``, the orchestrator: + After compilation, the orchestrator: - 1. Walks ``Pipeline._node_graph`` (persistent nodes) in topological - order. + 1. Walks the node graph in topological order. 2. Creates bounded channels (or broadcast channels for fan-out) between connected nodes. 3. Launches every node's ``async_execute`` concurrently via - ``asyncio.TaskGroup``. + ``asyncio.TaskGroup``, using TypeGuard dispatch for per-type + signatures. - Results are written to the pipeline databases by the persistent - nodes themselves (``FunctionNode``, ``OperatorNode`` - in LOG mode, etc.). After ``run()`` returns, callers retrieve data - via ``pipeline.