Merged
23 commits
44491f6
docs(specs): add async orchestrator refactor design spec (PLT-922)
eywalker Mar 15, 2026
8117a7e
docs(specs): address spec review feedback for PLT-922
eywalker Mar 15, 2026
7916d90
docs(plans): add async orchestrator refactor implementation plan (PLT…
eywalker Mar 15, 2026
653c993
refactor(protocols): slim node protocols to execute + async_execute w…
eywalker Mar 15, 2026
2653ee3
refactor(protocols): remove AsyncExecutableProtocol (PLT-922)
eywalker Mar 15, 2026
04372bd
feat(source-node): add execute() with observer injection (PLT-922)
eywalker Mar 15, 2026
4cf351c
refactor(source-node): tighten async_execute signature + observer (PL…
eywalker Mar 15, 2026
ff064f0
feat(function-node): add observer injection to execute() (PLT-922)
eywalker Mar 15, 2026
5648844
refactor(function-node): tighten async_execute signature + observer (…
eywalker Mar 15, 2026
c5adcb9
feat(operator-node): add observer + cache check to execute() (PLT-922)
eywalker Mar 15, 2026
f9708e5
feat(operator-node): add observer to async_execute() (PLT-922)
eywalker Mar 15, 2026
caadfb4
fix(tests): update callers for tightened FunctionNode.async_execute s…
eywalker Mar 15, 2026
f051e6d
refactor(sync-orchestrator): delegate to node.execute(), remove per-p…
eywalker Mar 15, 2026
33e40e6
refactor(async-orchestrator): use node protocols, graph interface, Or…
eywalker Mar 15, 2026
77dd47f
refactor(pipeline): update run() to pass graph to orchestrators, remo…
eywalker Mar 15, 2026
9e716f4
test(orchestrator): add materialize_results tests (PLT-922)
eywalker Mar 15, 2026
a185a04
test(async-orchestrator): add fan-out, terminal, and error propagatio…
eywalker Mar 15, 2026
886a167
fix(review): type-safe observer, robust writer, terminal drain, opera…
eywalker Mar 15, 2026
0927a52
test(observer): add observer injection tests for both orchestrators (…
eywalker Mar 15, 2026
083f1ae
fix(review): address Copilot PR review feedback (PLT-922)
eywalker Mar 15, 2026
a2ab693
docs: add PR review response workflow to CLAUDE.md and .zed/rules
eywalker Mar 15, 2026
fa67ff9
docs: require Linear issue linking for all work
eywalker Mar 15, 2026
862f07a
docs: add branch naming and multi-issue PR tracking guidelines
eywalker Mar 15, 2026
35 changes: 32 additions & 3 deletions .zed/rules
@@ -51,7 +51,16 @@ import path doesn't work, create a proper re-export package with an __init__.py
Use Google style (https://google.github.io/styleguide/pyguide.html#38-comments-and-docstrings)
Python docstrings everywhere.

## Linear issues
## Linear issue tracking

All work must be linked to a Linear issue. Before starting any feature, bug fix, or
refactor:

1. Check for an existing issue — search Linear for a corresponding issue.
2. If none exists — ask the developer whether to create one. Do not proceed without
either a linked issue or explicit approval to skip.
3. When a new issue is discovered during development (bug, design problem, deferred
work), create a corresponding Linear issue using the template below.

When creating Linear issues, always use this template for the description:

@@ -82,8 +91,28 @@ When creating Linear issues, always use this template for the description:

Remove any optional sections that don't apply rather than leaving them empty.

When working on a Linear issue, create and checkout a git branch using the gitBranchName
returned by Linear (e.g. eywalker/plt-911-add-documentation-for-orcapod-python).
### Branches and PRs

When working on a feature, create and checkout a git branch using the gitBranchName
returned by the primary Linear issue (e.g. eywalker/plt-911-add-documentation-for-orcapod-python).

If a feature branch / PR corresponds to multiple Linear issues, list all of them in the
PR description body so that Linear's GitHub integration auto-tracks the PR against each
issue. Use the format "Fixes PLT-123" or "Closes PLT-123" (GitHub magic words) for issues
that the PR fully resolves, and simply mention "PLT-456" for issues that are related but
not fully resolved by the PR.

## Responding to PR reviews

When asked to respond to PR reviewer comments:

1. **Fetch and present** — Read all review comments, then present a response plan as a table:
each comment, its severity, whether to fix or explain, and the proposed action.
2. **Wait for approval** — Let the user approve the plan before making changes.
3. **Fix, then reply** — Make all fixes in a single commit, then post replies to each
reviewer comment explaining what was done (or why it was declined).

Never make fixes silently or skip the plan step.

## Git commits

35 changes: 32 additions & 3 deletions CLAUDE.md
@@ -55,7 +55,16 @@ import path doesn't work, create a proper re-export package with an `__init__.py
Use [Google style](https://google.github.io/styleguide/pyguide.html#38-comments-and-docstrings)
Python docstrings everywhere.

## Linear issues
## Linear issue tracking

All work must be linked to a Linear issue. Before starting any feature, bug fix, or
refactor:

1. **Check for an existing issue** — search Linear for a corresponding issue.
2. **If none exists** — ask the developer whether to create one. Do not proceed without
either a linked issue or explicit approval to skip.
3. **When a new issue is discovered** during development (bug, design problem, deferred
work), create a corresponding Linear issue using the template below.

When creating Linear issues, always use this template for the description:

@@ -88,8 +97,28 @@ Out of scope:

Remove any optional sections that don't apply rather than leaving them empty.

When working on a Linear issue, create and checkout a git branch using the `gitBranchName`
returned by Linear (e.g. `eywalker/plt-911-add-documentation-for-orcapod-python`).
### Branches and PRs

When working on a feature, create and checkout a git branch using the `gitBranchName`
returned by the primary Linear issue (e.g. `eywalker/plt-911-add-documentation-for-orcapod-python`).

If a feature branch / PR corresponds to multiple Linear issues, list all of them in the
PR description body so that Linear's GitHub integration auto-tracks the PR against each
issue. Use the format `Fixes PLT-123` or `Closes PLT-123` (GitHub magic words) for issues
that the PR fully resolves, and simply mention `PLT-456` for issues that are related but
not fully resolved by the PR.

## Responding to PR reviews

When asked to respond to PR reviewer comments:

1. **Fetch and present** — Read all review comments, then present a response plan as a table:
each comment, its severity, whether to fix or explain, and the proposed action.
2. **Wait for approval** — Let the user approve the plan before making changes.
3. **Fix, then reply** — Make all fixes in a single commit, then post replies to each
reviewer comment explaining what was done (or why it was declined).

Never make fixes silently or skip the plan step.

## Git commits

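The multi-issue linking guideline added to CLAUDE.md above can be illustrated with a hypothetical PR description body (the issue numbers are made up):

```text
## Summary
Refactor the async orchestrator to drive execution through node protocols.

Fixes PLT-922
Closes PLT-925
Related: PLT-930 (concurrency limiting deferred)
```

With this format, Linear's GitHub integration auto-closes PLT-922 and PLT-925 when the PR merges, while PLT-930 is only linked, matching the "related but not fully resolved" case described in the guideline.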
170 changes: 93 additions & 77 deletions src/orcapod/core/nodes/function_node.py
@@ -4,7 +4,7 @@

import asyncio
import logging
from collections.abc import Iterator, Sequence
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any, cast

from orcapod import contexts
@@ -28,10 +28,7 @@
from orcapod.types import (
ColumnConfig,
ContentHash,
NodeConfig,
PipelineConfig,
Schema,
resolve_concurrency,
)
from orcapod.utils import arrow_utils, schema_utils
from orcapod.utils.lazy_module import LazyModule
@@ -41,6 +38,8 @@
if TYPE_CHECKING:
import polars as pl
import pyarrow as pa

from orcapod.pipeline.observer import ExecutionObserver
else:
pa = LazyModule("pyarrow")
pl = LazyModule("polars")
@@ -486,29 +485,51 @@ def execute_packet(
return self._process_packet_internal(tag, packet)

def execute(
self, input_stream: StreamProtocol
self,
input_stream: StreamProtocol,
*,
observer: "ExecutionObserver | None" = None,
) -> list[tuple[TagProtocol, PacketProtocol]]:
"""Execute all packets from a stream: compute, persist, and cache.

Internal method for orchestrators. The caller must guarantee that
the input stream's identity (content hash, schema) matches
``self._input_stream``. No validation is performed.

More efficient than calling ``execute_packet`` per-packet when
observer hooks aren't needed.

Args:
input_stream: The input stream to process.
observer: Optional execution observer for hooks.

Returns:
Materialized list of (tag, output_packet) pairs, excluding
None outputs.
"""
if observer is not None:
observer.on_node_start(self)

# Gather entry IDs and check cache
upstream_entries = [
(tag, packet, self.compute_pipeline_entry_id(tag, packet))
for tag, packet in input_stream.iter_packets()
]
entry_ids = [eid for _, _, eid in upstream_entries]
cached = self.get_cached_results(entry_ids=entry_ids)

output: list[tuple[TagProtocol, PacketProtocol]] = []
for tag, packet in input_stream.iter_packets():
tag_out, result = self._process_packet_internal(tag, packet)
if result is not None:
for tag, packet, entry_id in upstream_entries:
if observer is not None:
observer.on_packet_start(self, tag, packet)

if entry_id in cached:
tag_out, result = cached[entry_id]
if observer is not None:
observer.on_packet_end(self, tag, packet, result, cached=True)
output.append((tag_out, result))
else:
tag_out, result = self._process_packet_internal(tag, packet)
if observer is not None:
observer.on_packet_end(self, tag, packet, result, cached=False)
if result is not None:
output.append((tag_out, result))

if observer is not None:
observer.on_node_end(self)
return output

def _process_packet_internal(
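The observer hooks threaded through `execute()` above can be modeled with a small protocol sketch. The hook names mirror the calls visible in the diff (`on_node_start`, `on_packet_start`, `on_packet_end` with a keyword-only `cached` flag, `on_node_end`); the real interface lives in `orcapod.pipeline.observer` and may differ in its parameter types.

```python
from typing import Any, Protocol


class ExecutionObserver(Protocol):
    """Hypothetical shape of the observer injected into execute()."""

    def on_node_start(self, node: Any) -> None: ...
    def on_packet_start(self, node: Any, tag: Any, packet: Any) -> None: ...
    def on_packet_end(
        self, node: Any, tag: Any, packet: Any, result: Any, *, cached: bool
    ) -> None: ...
    def on_node_end(self, node: Any) -> None: ...


class CacheHitCounter:
    """Example observer: tallies cached replays vs. fresh computations."""

    def __init__(self) -> None:
        self.cached = 0
        self.computed = 0

    def on_node_start(self, node: Any) -> None:
        pass

    def on_packet_start(self, node: Any, tag: Any, packet: Any) -> None:
        pass

    def on_packet_end(
        self, node: Any, tag: Any, packet: Any, result: Any, *, cached: bool
    ) -> None:
        # The cached flag distinguishes the DB-replay branch from the
        # compute branch in execute().
        if cached:
            self.cached += 1
        else:
            self.computed += 1

    def on_node_end(self, node: Any) -> None:
        pass


# Simulate the call pattern execute() uses: one cached hit, one fresh compute.
obs = CacheHitCounter()
obs.on_node_start(None)
obs.on_packet_end(None, None, None, None, cached=True)
obs.on_packet_end(None, None, None, None, cached=False)
obs.on_node_end(None)
```

Making `cached` keyword-only (as the diff's call sites do) keeps call sites self-documenting and lets the protocol evolve without breaking positional callers.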
@@ -1141,27 +1162,35 @@ def as_table(

async def async_execute(
self,
inputs: Sequence[ReadableChannel[tuple[TagProtocol, PacketProtocol]]],
input_channel: ReadableChannel[tuple[TagProtocol, PacketProtocol]],
output: WritableChannel[tuple[TagProtocol, PacketProtocol]],
pipeline_config: PipelineConfig | None = None,
*,
observer: "ExecutionObserver | None" = None,
) -> None:
"""Streaming async execution for FunctionNode.

When a database is attached, uses two-phase execution: replay cached
results first, then compute missing packets concurrently. Otherwise,
routes each packet through ``async_process_packet`` directly.

Args:
input_channel: Single readable channel of (tag, packet) pairs.
output: Writable channel for output (tag, packet) pairs.
observer: Optional execution observer for hooks.
"""
# TODO(PLT-930): Restore concurrency limiting (semaphore) via node-level config.
# Currently all packets are processed sequentially in async_execute.
try:
pipeline_config = pipeline_config or PipelineConfig()
# TODO: revisit this logic as use of accidental property is not desirable
node_config = getattr(self._function_pod, "node_config", NodeConfig())
max_concurrency = resolve_concurrency(node_config, pipeline_config)
if observer is not None:
observer.on_node_start(self)

if self._cached_function_pod is not None:
# Two-phase async execution with DB backing
# Phase 1: emit existing results from DB
# DB-backed async execution:
# Phase 1: build cache lookup from pipeline DB
PIPELINE_ENTRY_ID_COL = "__pipeline_entry_id"
existing_entry_ids: set[str] = set()
cached_by_entry_id: dict[
str, tuple[TagProtocol, PacketProtocol]
] = {}

taginfo = self._pipeline_database.get_all_records(
self.pipeline_path,
@@ -1184,12 +1213,9 @@ async def async_execute(
)
if joined.num_rows > 0:
tag_keys = self._input_stream.keys()[0]
existing_entry_ids = set(
cast(
list[str],
joined.column(PIPELINE_ENTRY_ID_COL).to_pylist(),
)
)
entry_ids_col = joined.column(
PIPELINE_ENTRY_ID_COL
).to_pylist()
drop_cols = [
c
for c in joined.column_names
@@ -1202,63 +1228,53 @@ async def async_execute(
existing_stream = ArrowTableStream(
data_table, tag_columns=tag_keys
)
for tag, packet in existing_stream.iter_packets():
await output.send((tag, packet))

# Phase 2: process new packets concurrently
sem = (
asyncio.Semaphore(max_concurrency)
if max_concurrency is not None
else None
)
for eid, (tag_out, pkt_out) in zip(
entry_ids_col, existing_stream.iter_packets()
):
cached_by_entry_id[eid] = (tag_out, pkt_out)

async def process_one_db(
tag: TagProtocol, packet: PacketProtocol
) -> None:
try:
# Phase 2: drive output from input channel — cached or compute
async for tag, packet in input_channel:
entry_id = self.compute_pipeline_entry_id(tag, packet)
if entry_id in cached_by_entry_id:
tag_out, result_packet = cached_by_entry_id[entry_id]
if observer is not None:
observer.on_packet_start(self, tag, packet)
observer.on_packet_end(
self, tag, packet, result_packet, cached=True
)
await output.send((tag_out, result_packet))
else:
if observer is not None:
observer.on_packet_start(self, tag, packet)
(
tag_out,
result_packet,
) = await self._async_process_packet_internal(tag, packet)
if observer is not None:
observer.on_packet_end(
self, tag, packet, result_packet, cached=False
)
if result_packet is not None:
await output.send((tag_out, result_packet))
finally:
if sem is not None:
sem.release()

async with asyncio.TaskGroup() as tg:
async for tag, packet in inputs[0]:
entry_id = self.compute_pipeline_entry_id(tag, packet)
if entry_id in existing_entry_ids:
continue
if sem is not None:
await sem.acquire()
tg.create_task(process_one_db(tag, packet))
else:
# Simple async execution without DB
sem = (
asyncio.Semaphore(max_concurrency)
if max_concurrency is not None
else None
)
async for tag, packet in input_channel:
if observer is not None:
observer.on_packet_start(self, tag, packet)
(
tag_out,
result_packet,
) = await self._async_process_packet_internal(tag, packet)
if observer is not None:
observer.on_packet_end(
self, tag, packet, result_packet, cached=False
)
if result_packet is not None:
await output.send((tag_out, result_packet))

async def process_one(tag: TagProtocol, packet: PacketProtocol) -> None:
try:
(
tag_out,
result_packet,
) = await self._async_process_packet_internal(tag, packet)
if result_packet is not None:
await output.send((tag_out, result_packet))
finally:
if sem is not None:
sem.release()

async with asyncio.TaskGroup() as tg:
async for tag, packet in inputs[0]:
if sem is not None:
await sem.acquire()
tg.create_task(process_one(tag, packet))
if observer is not None:
observer.on_node_end(self)
finally:
await output.close()

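Stripped of the Arrow and channel machinery, the DB-backed path of `async_execute` follows a simple two-phase shape: build a cache map keyed by entry id, then drain the input, replaying cached results and computing only the misses. The sketch below models that flow with plain dicts and lists; `two_phase` and its parameters are illustrative stand-ins, not orcapod APIs.

```python
import asyncio


async def two_phase(input_items, cache, compute, out):
    """Phase 2 of the cached/compute pattern: drive output from the input.

    `input_items` yields (entry_id, value) pairs; entry_id stands in for
    compute_pipeline_entry_id(tag, packet), and `cache` stands in for the
    map built in Phase 1 from the pipeline database.
    """
    for entry_id, value in input_items:
        if entry_id in cache:
            out.append((entry_id, cache[entry_id], True))  # cached replay
        else:
            result = await compute(value)
            out.append((entry_id, result, False))  # fresh compute


async def main():
    cache = {"a": 1}  # Phase 1 result: entry "a" already persisted
    out = []

    async def compute(v):
        await asyncio.sleep(0)
        return v * 10

    await two_phase([("a", 1), ("b", 2)], cache, compute, out)
    return out


out = asyncio.run(main())
print(out)
```

Keying the replay on entry ids rather than emitting all cached rows up front (as the pre-refactor code did) preserves input order and lets the observer see every packet exactly once, cached or not.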