From 88ba5ba51099165be485b0d29dd8b73d97f73eac Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 21:37:20 +0000 Subject: [PATCH 01/12] docs(spec): add PLT-931 design spec for PacketFunctionProxy Introduces PacketFunctionProxy to fix read-only pipeline loading when the original packet function is unavailable. The proxy satisfies PacketFunctionProtocol, enabling full FunctionNode construction with CachedFunctionPod and DB access for cached data retrieval. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../2026-03-15-stub-packet-function-design.md | 458 ++++++++++++++++++ 1 file changed, 458 insertions(+) create mode 100644 superpowers/specs/2026-03-15-stub-packet-function-design.md diff --git a/superpowers/specs/2026-03-15-stub-packet-function-design.md b/superpowers/specs/2026-03-15-stub-packet-function-design.md new file mode 100644 index 00000000..9992d463 --- /dev/null +++ b/superpowers/specs/2026-03-15-stub-packet-function-design.md @@ -0,0 +1,458 @@ +# PLT-931: PacketFunctionProxy for Read-Only Pipeline Loading + +## Problem + +When a pipeline is saved to JSON and loaded in an environment where the original +packet function (Python callable) is unavailable, `FunctionPod.from_config()` fails +and the loading code falls back to constructing a `FunctionNode` in read-only mode +with `function_pod=None`. This causes: + +1. **`iter_packets()`** crashes — calls `_input_stream.iter_packets()` on `None` +2. **`get_all_records()`** returns `None` — short-circuits because + `_cached_function_pod is None`, even though the DB and record paths are known +3. **`as_table()`** crashes — delegates to `iter_packets()` +4. **Downstream nodes** are marked read-only because upstream `load_status != FULL`, + blocking all downstream computation even for operators and function pods whose + functions *are* available + +The root cause: `FunctionNode.from_descriptor()` bypasses `__init__` entirely when +`function_pod=None`, leaving all internal state as `None`. Every code path that +touches the function pod, packet function, input stream, or cached function pod +fails. + +## Solution: PacketFunctionProxy + +Introduce a `PacketFunctionProxy` (consistent with the existing `SourceProxy` +naming pattern) that carries all metadata of the original packet function +(schemas, identity, URI) but raises a clear error when invoked. This allows the +full `FunctionNode.__init__` → `attach_databases()` → `CachedFunctionPod` path +to work, so cached data access flows through existing code paths unchanged. + +The proxy also supports **late binding**: a real packet function can be bound +to the proxy after loading via `bind()`. The proxy wraps (not replaces) the +bound function, preserving the fact that the function was not loaded originally. + +### Design principles + +- **No special-casing in FunctionNode**: The proxy satisfies + `PacketFunctionProtocol`, so `FunctionNode.__init__`, `CachedFunctionPod`, + `output_schema()`, `keys()`, `pipeline_path`, and `get_all_records()` all + work without modification. +- **Clear failure at point of invocation**: `call()` / `async_call()` / + `direct_call()` / `direct_async_call()` raise `PacketFunctionUnavailableError` + with a message naming the function and explaining only cached results are + available — unless a real function has been bound via `bind()`. +- **Correct identity**: The proxy reproduces the same `content_hash()` and + `pipeline_hash()` as the original packet function, so DB path scoping and + record lookups work correctly. +- **Downstream nodes can compute**: Because the `FunctionNode` with a proxy is + fully constructed (with DB-backed `CachedFunctionPod`), it can serve cached + data via `iter_packets()` Phase 1. Downstream operators and function pods + (whose functions are available) can run in full mode on that data. +- **Late binding preserves provenance**: `bind()` wraps the bound function + inside the proxy rather than replacing the proxy, so the proxy remains in the + call chain. This preserves the information that the function was not loaded + from the serialized config — useful for provenance tracking and debugging. +- **Binding validates identity**: `bind()` verifies that the provided packet + function matches the proxy's stored identity (URI, schemas, content hash) + before accepting it. If any identifying information mismatches, `bind()` + raises `ValueError` with a clear message describing which fields differ. + This prevents silently substituting a different function for the original. + +## Components + +### 1. `PacketFunctionUnavailableError` (in `errors.py`) + +```python +class PacketFunctionUnavailableError(RuntimeError): + """Raised when a packet function proxy is invoked without a bound function. + + This occurs when a pipeline is loaded in an environment where the + original packet function is not available. Only cached results can + be accessed. + """ +``` + +Subclasses `RuntimeError` so existing `except RuntimeError` handlers still catch +it, while allowing callers to catch this specific condition. + +### 2. `PacketFunctionProxy` (new file: `src/orcapod/core/packet_function_proxy.py`) + +A concrete implementation of `PacketFunctionProtocol` that stands in for an +unavailable packet function, preserving all identifying metadata and supporting +optional late binding of a real function. + +#### Metadata and identity + +- **Stores metadata from the serialized config**: `packet_function_type_id`, + `canonical_function_name`, `major_version`, `minor_version_string`, + `input_packet_schema`, `output_packet_schema` +- **Stores the original `uri` tuple**: Extracted from + `config["uri"]` (the `FunctionPod.to_config()` output includes it). + The `uri` property is overridden to return this stored value directly, + avoiding any recomputation via `semantic_hasher.hash_object()`. This is + critical because the schema round-trip (`Python type → Arrow string → + Python type`) may not produce identical hash inputs, which would cause + `CachedFunctionPod` to look in the wrong DB path. +- **Stores pre-computed hashes**: `content_hash` and `pipeline_hash` strings + from the pipeline descriptor, overriding `content_hash()` and + `pipeline_hash()` to return them directly (no recomputation needed) +- **`to_config()`** returns the original config dict (round-trip preservation), + regardless of whether a function has been bound +- **`from_config()`** class method constructs from the same config dict that + `PythonPacketFunction.to_config()` produces, extracting schemas and metadata + without importing the callable + +#### Invocation behavior + +When **unbound** (no real function has been bound via `bind()`): +- `call()`, `async_call()`, `direct_call()`, `direct_async_call()` all raise + `PacketFunctionUnavailableError` with a message like: + *"Packet function 'my_func' is not available in this environment. + Only cached results can be accessed."* +- `get_function_variation_data()` and `get_execution_data()` return empty dicts. + Note: these are only called by `CachedFunctionPod.process_packet()` when + *storing* a new result. Since the proxy raises + `PacketFunctionUnavailableError` before the store call is reached, the empty + dicts are never actually persisted. +- `executor` property returns `None`; setter is a **no-op** (silently ignored). + This avoids conflicts with code paths like `Pipeline._apply_execution_engine` + that iterate all function nodes and set executors — raising here would break + pipeline loading unnecessarily. + +When **bound** (a real function has been attached via `bind()`): +- `call()`, `async_call()`, `direct_call()`, `direct_async_call()` delegate + to the bound function +- `get_function_variation_data()` and `get_execution_data()` delegate to the + bound function +- `executor` property and setter delegate to the bound function + +#### Constructor signature + +```python +class PacketFunctionProxy(PacketFunctionBase): + def __init__( + self, + config: dict[str, Any], + content_hash_str: str | None = None, + pipeline_hash_str: str | None = None, + ) -> None: +``` + +The `config` parameter is the `packet_function` config dict from the serialized +pipeline (same format as `PythonPacketFunction.to_config()` output). The hash +strings come from the node descriptor. + +**Version string handling**: `PacketFunctionBase.__init__` requires a `version` +string matching the regex `\D*(\d+)\.(.*)`. The proxy extracts this from +`config["config"]["version"]` (the same field `PythonPacketFunction.to_config()` +writes) and passes it to `super().__init__(version=...)`. + +#### Schema reconstruction + +The config contains `input_packet_schema` and `output_packet_schema` as +`dict[str, str]` (Arrow type strings). The proxy uses +`deserialize_schema()` from `pipeline.serialization` to reconstruct them +as `Schema` objects with Python types. **This must happen eagerly during +`__init__`** (not lazily), because `CachedFunctionPod` accesses +`output_packet_schema` immediately during its initialization chain via +`WrappedFunctionPod.uri → _FunctionPodBase.uri → semantic_hasher.hash_object( +self.packet_function.output_packet_schema)`. + +However, since the proxy **overrides the `uri` property** to return the stored +original URI, the `semantic_hasher.hash_object()` call on the schema never +actually happens for the proxy. The schemas are still reconstructed eagerly +because `FunctionNode.__init__` calls `schema_utils.check_schema_compatibility()` +on input schema validation. + +#### `bind()` — late binding of a real packet function + +```python +def bind(self, packet_function: PacketFunctionProtocol) -> None: + """Bind a real packet function to this proxy. + + Validates that the provided function matches the proxy's stored + identity before accepting it. After binding, invocation methods + delegate to the bound function. + + Args: + packet_function: The real packet function to bind. + + Raises: + ValueError: If the provided function's identifying information + does not match the proxy's stored identity. + """ +``` + +**Validation checks** (all must pass): +- `canonical_function_name` matches +- `major_version` matches +- `packet_function_type_id` matches +- `input_packet_schema` matches +- `output_packet_schema` matches +- `uri` matches +- `content_hash()` matches (if the proxy has a stored content hash) + +If any check fails, `bind()` raises `ValueError` with a message listing +the mismatched fields and their expected vs actual values. + +The bound function is stored as `self._bound_function` (initially `None`). +The `is_bound` property exposes whether a function has been bound. + +#### `unbind()` — reverting to proxy-only mode + +```python +def unbind(self) -> None: + """Remove the bound packet function, reverting to proxy-only mode.""" +``` + +Sets `self._bound_function = None`. After unbinding, invocation methods +raise `PacketFunctionUnavailableError` again. Included for API consistency +with `SourceProxy`. + +**Why wrap, not replace**: The proxy remains in the call chain even after +binding. This preserves the fact that the function was not loaded from the +serialized config — useful for provenance tracking, debugging, and +distinguishing "loaded from config" vs "bound after load" in downstream +logic. + +### 3. Changes to `resolve_packet_function_from_config` (in `pipeline/serialization.py`) + +Add a `fallback_to_proxy` parameter (default `False`): + +```python +def resolve_packet_function_from_config( + config: dict[str, Any], + *, + fallback_to_proxy: bool = False, +) -> PacketFunctionProtocol: +``` + +When `fallback_to_proxy=True` and the normal resolution fails (ImportError, +AttributeError, etc.), return a `PacketFunctionProxy` constructed from the config +instead of re-raising. + +### 4. Changes to `FunctionPod.from_config` (in `function_pod.py`) + +Add a `fallback_to_proxy` parameter that passes through to +`resolve_packet_function_from_config`: + +```python +@classmethod +def from_config( + cls, + config: dict[str, Any], + *, + fallback_to_proxy: bool = False, +) -> "FunctionPod": +``` + +When the packet function resolves to a proxy, `FunctionPod` is constructed +normally — the proxy satisfies `PacketFunctionProtocol`. + +### 5. Changes to `Pipeline._load_function_node` (in `pipeline/graph.py`) + +The current logic: + +```python +if mode == "full": + if not upstream_usable: + # fall back to read-only + else: + try: + pod = FunctionPod.from_config(descriptor["function_pod"]) + return FunctionNode.from_descriptor( + descriptor, function_pod=pod, input_stream=upstream_node, + databases=databases, + ) + except Exception: + # fall back to read-only +``` + +Changes: +1. Call `FunctionPod.from_config(..., fallback_to_proxy=True)` so it never + raises — it returns a FunctionPod with either a live or proxy packet function. +2. Determine `load_status` based on whether the packet function is a proxy. +3. Always construct `FunctionNode` via the normal `from_descriptor` full-mode + path (with `function_pod` and `input_stream`), so `__init__` runs and + `CachedFunctionPod` is created. + +### 6. Changes to upstream usability check (in `Pipeline.load`) + +Currently in `Pipeline.load()`, the upstream usability check requires +`LoadStatus.FULL`: + +```python +upstream_usable = ( + upstream_node is not None + and hasattr(upstream_node, "load_status") + and upstream_node.load_status == LoadStatus.FULL +) +``` + +Change to also accept `READ_ONLY`: + +```python +upstream_usable = ( + upstream_node is not None + and hasattr(upstream_node, "load_status") + and upstream_node.load_status in (LoadStatus.FULL, LoadStatus.READ_ONLY) +) +``` + +This allows downstream operators and function pods to compute on cached data +from upstream proxy-backed nodes. + +The same change applies to the `all_upstreams_usable` check for operator nodes. + +### 7. `FunctionNode.from_descriptor` and `load_status` + +With the proxy approach, `from_descriptor` receives a real `function_pod` +(containing a proxy) and a real `input_stream` (upstream node). It takes +the normal full-mode `__init__` path. + +After construction, `load_status` is set based on whether the packet function +is a proxy: + +```python +from orcapod.core.packet_function_proxy import PacketFunctionProxy + +if isinstance(pod.packet_function, PacketFunctionProxy): + node._load_status = LoadStatus.READ_ONLY +else: + node._load_status = LoadStatus.FULL +``` + +This is set in `_load_function_node` after the node is constructed. + +### 8. `FunctionNode.iter_packets()` behavior with proxy + +With a proxy-backed node: + +- **Phase 1** (cached results from DB): Works unchanged — `CachedFunctionPod` + reads from the result DB, pipeline DB join produces (tag, packet) pairs. +- **Phase 2** (compute missing packets): If any input packets are not in the + cache, `_process_packet_internal` calls `CachedFunctionPod.process_packet()`, + which calls `self._function_pod.process_packet(tag, packet)`, which calls + `proxy.call(packet)` → raises `PacketFunctionUnavailableError`. + +This is the correct behavior: cached data flows, new computation fails clearly. + +### 9. Simplification of `FunctionNode.from_descriptor` read-only path + +The current read-only path (lines 251–307) with `cls.__new__` and manual +attribute assignment becomes dead code for pipeline loading, since all loaded +function nodes now go through the normal `__init__` path (with either a live or +proxy packet function). + +However, we should keep the read-only path for the case where the DB is also +unavailable (e.g., in-memory DB from original run). In that case, `load_status` +remains `UNAVAILABLE` and the node truly cannot serve any data. + +The read-only `__new__` path is only used when `pipeline_db` is `None` or when +the DB type is in-memory (which warns and provides no cached data). + +## Data flow summary + +``` +Pipeline.load() + │ + ├─ Source node: reconstructed or proxy (unchanged) + │ + ├─ Function node (function available): + │ └─ FunctionPod(PythonPacketFunction) → FunctionNode (FULL) + │ + ├─ Function node (function unavailable, DB available): + │ └─ FunctionPod(PacketFunctionProxy) → FunctionNode (READ_ONLY) + │ ├─ iter_packets() Phase 1: serves cached data ✓ + │ ├─ iter_packets() Phase 2: raises PacketFunctionUnavailableError ✗ + │ ├─ get_all_records(): works ✓ + │ └─ as_table(): works (from Phase 1 cache) ✓ + │ + ├─ Function node (function unavailable, DB unavailable): + │ └─ from_descriptor read-only path → FunctionNode (UNAVAILABLE) + │ + └─ Downstream operator/function pod: + ├─ upstream FULL or READ_ONLY → construct in FULL mode + └─ upstream UNAVAILABLE → construct in READ_ONLY/UNAVAILABLE mode +``` + +## Files changed + +| File | Change | +|------|--------| +| `src/orcapod/errors.py` | Add `PacketFunctionUnavailableError` | +| `src/orcapod/core/packet_function_proxy.py` | New: `PacketFunctionProxy` class | +| `src/orcapod/pipeline/serialization.py` | `resolve_packet_function_from_config`: add `fallback_to_proxy` param | +| `src/orcapod/core/function_pod.py` | `FunctionPod.from_config`: add `fallback_to_proxy` param | +| `src/orcapod/pipeline/graph.py` | `_load_function_node`: use proxy fallback; `load()`: relax upstream usability check | +| `src/orcapod/core/nodes/function_node.py` | `from_descriptor`: set `load_status` based on proxy detection | + +## Test plan + +1. **PacketFunctionProxy unit tests** + - Construction from a `PythonPacketFunction.to_config()` output + - Correct `input_packet_schema`, `output_packet_schema` + - `uri` returns stored original value (not recomputed) + - `content_hash()` and `pipeline_hash()` return stored values + - `call()` raises `PacketFunctionUnavailableError` + - `async_call()` raises `PacketFunctionUnavailableError` + - `to_config()` round-trips (returns original config) + - `executor` returns `None`; setter is no-op (no error) + +2. **FunctionPod with proxy** + - `FunctionPod(PacketFunctionProxy(...))` constructs without error + - `output_schema()` returns correct schemas + - `process_packet()` raises `PacketFunctionUnavailableError` + +3. **Pipeline load with unavailable function** + - Save a pipeline with a function pod, load in environment where function + is not importable + - Function node has `load_status == READ_ONLY` + - `get_all_records()` returns cached data + - `iter_packets()` yields cached results (Phase 1) + - `as_table()` returns cached data as table + - Attempting to process a new (uncached) packet raises + `PacketFunctionUnavailableError` + +4. **Downstream computation from cached data** + - Save a pipeline: source → function_pod → operator + - Load with function unavailable but operator available + - Function node is `READ_ONLY`, operator node is `FULL` + - Operator can process cached data from function node + - End-to-end: `operator_node.as_table()` returns correct results + +5. **Downstream function pod with available function** + - Save a pipeline: source → function_pod_A → function_pod_B + - Load with function_A unavailable, function_B available + - function_pod_A node is `READ_ONLY`, function_pod_B node is `FULL` + - function_pod_B can compute on cached output from function_pod_A + +6. **`bind()` — successful late binding** + - Create a `PacketFunctionProxy` from a config, then `bind()` the + matching `PythonPacketFunction` + - `is_bound` returns `True` + - `call()` delegates to the bound function (no error) + - `executor` getter/setter delegate to the bound function + +7. **`bind()` — identity mismatch rejection** + - Create a `PacketFunctionProxy`, attempt `bind()` with a function + that has a different `canonical_function_name` → `ValueError` + - Same for mismatched `major_version`, `input_packet_schema`, + `output_packet_schema`, `uri`, `content_hash` + - Error message lists which fields differ + +8. **`unbind()` — reverting to proxy-only mode** + - Bind a function, then `unbind()` + - `is_bound` returns `False` + - `call()` raises `PacketFunctionUnavailableError` again + +9. **Schema round-trip fidelity** + - Verify that `deserialize_schema(serialize_schema(schema))` produces + schemas that are functionally equivalent for all supported types + - This guards against type-converter round-trip divergence + +10. **Fully unavailable (no DB)** + - Load pipeline with in-memory DB → function node is `UNAVAILABLE` + - `iter_packets()` / `as_table()` raise `RuntimeError` + - Downstream nodes are also `UNAVAILABLE` or `READ_ONLY` From dcbf57c4e9a82a19f3859733835ec5aee8638891 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 21:58:52 +0000 Subject: [PATCH 02/12] docs(plan): add PLT-931 implementation plan for PacketFunctionProxy Also logs RC1 design issue for ResultCache.record_path not being overridable via public API. Co-Authored-By: Claude Opus 4.6 (1M context) --- DESIGN_ISSUES.md | 20 + .../2026-03-15-packet-function-proxy-plan.md | 1507 +++++++++++++++++ 2 files changed, 1527 insertions(+) create mode 100644 superpowers/plans/2026-03-15-packet-function-proxy-plan.md diff --git a/DESIGN_ISSUES.md b/DESIGN_ISSUES.md index 9dcab19b..f429ff46 100644 --- a/DESIGN_ISSUES.md +++ b/DESIGN_ISSUES.md @@ -892,6 +892,26 @@ For derived sources (e.g., `DerivedSource`), the stream may not have a meaningfu --- +## `src/orcapod/core/cached_function_pod.py` / `src/orcapod/core/result_cache.py` + +### RC1 — `ResultCache.record_path` not overridable via public API +**Status:** open +**Severity:** medium + +When loading a pipeline with a `PacketFunctionProxy` (PLT-931), the +`CachedFunctionPod`'s `record_path` may differ from the original because the +pod-level URI is recomputed from a deserialized schema (whose semantic hash +may diverge from the original due to the `Python type → Arrow string → Python type` +round-trip). The workaround reaches into private fields: +`node._cached_function_pod._cache._record_path = stored_path`. + +**Fix:** Add a public `override_record_path(path)` method to `CachedFunctionPod` +(or `ResultCache`) that validates and sets the record path. Alternatively, accept +an optional `record_path_override` in `CachedFunctionPod.__init__` that takes +precedence over the computed path when provided. + +--- + ## `src/orcapod/core/nodes/` — Config and context delegation chain ### T2 — `orcapod_config` not on any protocol; delegation chain needs review diff --git a/superpowers/plans/2026-03-15-packet-function-proxy-plan.md b/superpowers/plans/2026-03-15-packet-function-proxy-plan.md new file mode 100644 index 00000000..967d3c01 --- /dev/null +++ b/superpowers/plans/2026-03-15-packet-function-proxy-plan.md @@ -0,0 +1,1507 @@ +# PacketFunctionProxy Implementation Plan + +> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Enable cached data access from loaded pipelines when the original packet function is unavailable, by introducing `PacketFunctionProxy`. + +**Architecture:** `PacketFunctionProxy` implements `PacketFunctionProtocol` using stored metadata from the serialized config. It plugs into the existing `FunctionPod` → `FunctionNode` → `CachedFunctionPod` chain unchanged, so cached DB reads work. Invocation raises `PacketFunctionUnavailableError`. Late binding via `bind()` allows attaching a real function with identity validation. + +**Tech Stack:** Python, PyArrow, orcapod internals (PacketFunctionBase, FunctionPod, FunctionNode, CachedFunctionPod, Pipeline serialization) + +**Spec:** `superpowers/specs/2026-03-15-stub-packet-function-design.md` + +--- + +## File Structure + +| File | Action | Responsibility | +|------|--------|----------------| +| `src/orcapod/errors.py` | Modify | Add `PacketFunctionUnavailableError` | +| `src/orcapod/core/packet_function_proxy.py` | Create | `PacketFunctionProxy` class | +| `src/orcapod/pipeline/serialization.py` | Modify | `fallback_to_proxy` in `resolve_packet_function_from_config` | +| `src/orcapod/core/function_pod.py` | Modify | `fallback_to_proxy` in `FunctionPod.from_config` | +| `src/orcapod/pipeline/graph.py` | Modify | `_load_function_node` proxy support; upstream usability relaxation | +| `tests/test_core/packet_function/test_packet_function_proxy.py` | Create | Proxy unit tests | +| `tests/test_pipeline/test_serialization.py` | Modify | Pipeline load integration tests | + +--- + +## Chunk 1: PacketFunctionProxy Core + +### Task 1: Add `PacketFunctionUnavailableError` + +**Files:** +- Modify: `src/orcapod/errors.py` + +- [ ] **Step 1: Add the error class** + +Add at the end of `src/orcapod/errors.py`: + +```python +class PacketFunctionUnavailableError(RuntimeError): + """Raised when a packet function proxy is invoked without a bound function. + + This occurs when a pipeline is loaded in an environment where the + original packet function is not available. Only cached results can + be accessed. + """ +``` + +- [ ] **Step 2: Verify import works** + +Run: `uv run python -c "from orcapod.errors import PacketFunctionUnavailableError; print('OK')"` +Expected: `OK` + +- [ ] **Step 3: Commit** + +```bash +git add src/orcapod/errors.py +git commit -m "feat(errors): add PacketFunctionUnavailableError" +``` + +--- + +### Task 2: Create `PacketFunctionProxy` — construction and metadata + +**Files:** +- Create: `src/orcapod/core/packet_function_proxy.py` +- Create: `tests/test_core/packet_function/test_packet_function_proxy.py` + +**Context:** `PacketFunctionProxy` subclasses `PacketFunctionBase` (from `src/orcapod/core/packet_function.py`). It needs to: +- Accept a config dict (same format as `PythonPacketFunction.to_config()` output) +- Extract version from `config["config"]["version"]` and pass to `super().__init__(version=...)` +- Eagerly deserialize schemas via `deserialize_schema()` from `pipeline.serialization` +- Store the original config, pre-computed hashes, and a `_uri` override +- Override `uri` property to return stored value (avoiding semantic hasher recomputation) +- Override `content_hash()` and `pipeline_hash()` to return stored values + +The config format from `PythonPacketFunction.to_config()` is: +```python +{ + "packet_function_type_id": "python.function.v0", + "config": { + "module_path": "some.module", + "callable_name": "my_func", + "version": "v0.1", + "input_packet_schema": {"age": "int64", "name": "large_string"}, + "output_packet_schema": {"result": "float64"}, + "output_keys": ["result"], + }, +} +``` + +The `uri` comes from the parent `FunctionPod.to_config()` which wraps the above: +```python +{ + "uri": ["my_func", "", "v0", "python.function.v0"], + "packet_function": { ... the above ... }, + "node_config": None, +} +``` + +So the proxy receives the `packet_function` sub-dict as `config`, and receives +`uri` separately (or from the parent `function_pod` config in `_load_function_node`). + +- [ ] **Step 1: Write failing tests for construction and metadata** + +Create `tests/test_core/packet_function/test_packet_function_proxy.py`: + +```python +"""Tests for PacketFunctionProxy.""" + +from __future__ import annotations + +import pytest + +from orcapod.core.packet_function import PythonPacketFunction +from orcapod.core.packet_function_proxy import PacketFunctionProxy +from orcapod.errors import PacketFunctionUnavailableError +from orcapod.types import ContentHash, Schema + + +def _make_sample_function(): + """Create a simple packet function for testing.""" + + def double_age(age: int) -> int: + return age * 2 + + return PythonPacketFunction( + double_age, + output_keys="doubled_age", + version="v1.0", + ) + + +def _make_proxy_from_function(pf: PythonPacketFunction) -> PacketFunctionProxy: + """Create a proxy from a live packet function's config.""" + config = pf.to_config() + return PacketFunctionProxy( + config=config, + uri=tuple(pf.uri), + content_hash_str=pf.content_hash().to_string(), + pipeline_hash_str=pf.pipeline_hash().to_string(), + ) + + +class TestPacketFunctionProxyConstruction: + def test_construction_from_config(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy is not None + + def test_canonical_function_name(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.canonical_function_name == pf.canonical_function_name + + def test_major_version(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.major_version == pf.major_version + + def test_minor_version_string(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.minor_version_string == pf.minor_version_string + + def test_packet_function_type_id(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.packet_function_type_id == pf.packet_function_type_id + + def test_input_packet_schema(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert dict(proxy.input_packet_schema) == dict(pf.input_packet_schema) + + def test_output_packet_schema(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert dict(proxy.output_packet_schema) == dict(pf.output_packet_schema) + + def test_uri_returns_stored_value(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.uri == pf.uri + + def test_content_hash_returns_stored_value(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.content_hash().to_string() == pf.content_hash().to_string() + + def test_pipeline_hash_returns_stored_value(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.pipeline_hash().to_string() == pf.pipeline_hash().to_string() + + def test_to_config_returns_original(self): + pf = _make_sample_function() + config = pf.to_config() + proxy = PacketFunctionProxy( + config=config, + uri=tuple(pf.uri), + content_hash_str=pf.content_hash().to_string(), + pipeline_hash_str=pf.pipeline_hash().to_string(), + ) + assert proxy.to_config() == config + + def test_executor_returns_none(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.executor is None + + def test_executor_setter_is_noop(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.executor = None # should not raise + assert proxy.executor is None + + def test_is_bound_initially_false(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.is_bound is False +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py -v` +Expected: All FAIL (ModuleNotFoundError for `packet_function_proxy`) + +- [ ] **Step 3: Implement `PacketFunctionProxy`** + +Create `src/orcapod/core/packet_function_proxy.py`: + +```python +"""PacketFunctionProxy — stands in for an unavailable packet function.""" + +from __future__ import annotations + +from typing import Any + +from orcapod.core.packet_function import PacketFunctionBase +from orcapod.errors import PacketFunctionUnavailableError +from orcapod.protocols.core_protocols import ( + PacketFunctionExecutorProtocol, + PacketFunctionProtocol, + PacketProtocol, +) +from orcapod.types import ContentHash, Schema + + +class PacketFunctionProxy(PacketFunctionBase): + """Proxy for a packet function that is not available in this environment. + + Carries all identifying metadata (schemas, URI, hashes) from the + serialized config so that ``FunctionPod``, ``FunctionNode``, and + ``CachedFunctionPod`` can be fully constructed. Invocation methods + raise ``PacketFunctionUnavailableError`` unless a real function has + been bound via ``bind()``. + + Args: + config: The packet function config dict as produced by + ``PythonPacketFunction.to_config()``. + uri: The original URI tuple from the ``FunctionPod.to_config()`` + output. Stored directly and returned from the ``uri`` + property to avoid semantic hasher recomputation. + content_hash_str: Pre-computed content hash string from the + pipeline descriptor. + pipeline_hash_str: Pre-computed pipeline hash string from the + pipeline descriptor. + """ + + def __init__( + self, + config: dict[str, Any], + uri: tuple[str, ...], + content_hash_str: str | None = None, + pipeline_hash_str: str | None = None, + ) -> None: + self._original_config = config + inner = config.get("config", config) + + # Extract version for PacketFunctionBase.__init__ + version = inner.get("version", "v0.0") + + super().__init__(version=version) + + # Store identity metadata + self._proxy_type_id = config.get( + "packet_function_type_id", "python.function.v0" + ) + self._proxy_function_name = inner.get("callable_name", "unknown") + self._stored_uri = uri + self._stored_content_hash = content_hash_str + self._stored_pipeline_hash = pipeline_hash_str + + # Eagerly deserialize schemas + from orcapod.pipeline.serialization import deserialize_schema + + raw_input = inner.get("input_packet_schema", {}) + raw_output = inner.get("output_packet_schema", {}) + self._input_schema = Schema(deserialize_schema(raw_input)) + self._output_schema = Schema(deserialize_schema(raw_output)) + + # Late binding state + self._bound_function: PacketFunctionProtocol | None = None + + # ==================== Identity & Metadata ==================== + + @property + def packet_function_type_id(self) -> str: + return self._proxy_type_id + + @property + def canonical_function_name(self) -> str: + return self._proxy_function_name + + @property + def input_packet_schema(self) -> Schema: + return self._input_schema + + @property + def output_packet_schema(self) -> Schema: + return self._output_schema + + @property + def uri(self) -> tuple[str, ...]: + """Return the stored original URI, avoiding recomputation.""" + return self._stored_uri + + # ==================== Hashing ==================== + + def content_hash(self, hasher=None) -> ContentHash: + """Return the stored content hash.""" + if self._stored_content_hash is not None: + return ContentHash.from_string(self._stored_content_hash) + return super().content_hash(hasher) + + def pipeline_hash(self, hasher=None) -> ContentHash: + """Return the stored pipeline hash.""" + if self._stored_pipeline_hash is not None: + return ContentHash.from_string(self._stored_pipeline_hash) + return super().pipeline_hash(hasher) + + # ==================== Variation / Execution Data ==================== + + def get_function_variation_data(self) -> dict[str, Any]: + if self._bound_function is not None: + return self._bound_function.get_function_variation_data() + return {} + + def get_execution_data(self) -> dict[str, Any]: + if self._bound_function is not None: + return self._bound_function.get_execution_data() + return {} + + # ==================== Executor ==================== + + @property + def executor(self) -> PacketFunctionExecutorProtocol | None: + if self._bound_function is not None: + return self._bound_function.executor + return None + + @executor.setter + def executor(self, executor: PacketFunctionExecutorProtocol | None) -> None: + if self._bound_function is not None: + self._bound_function.executor = executor + # No-op when unbound — avoids breaking _apply_execution_engine + + # ==================== Invocation ==================== + + def _raise_unavailable(self) -> None: + raise PacketFunctionUnavailableError( + f"Packet function '{self._proxy_function_name}' is not available " + f"in this environment. Only cached results can be accessed." + ) + + def call(self, packet: PacketProtocol) -> PacketProtocol | None: + if self._bound_function is not None: + return self._bound_function.call(packet) + self._raise_unavailable() + + async def async_call(self, packet: PacketProtocol) -> PacketProtocol | None: + if self._bound_function is not None: + return await self._bound_function.async_call(packet) + self._raise_unavailable() + + def direct_call(self, packet: PacketProtocol) -> PacketProtocol | None: + if self._bound_function is not None: + return self._bound_function.direct_call(packet) + self._raise_unavailable() + + async def direct_async_call( + self, packet: PacketProtocol + ) -> PacketProtocol | None: + if self._bound_function is not None: + return await self._bound_function.direct_async_call(packet) + self._raise_unavailable() + + # ==================== Serialization ==================== + + def to_config(self) -> dict[str, Any]: + """Return the original config dict (round-trip preservation).""" + return self._original_config + + @classmethod + def from_config(cls, config: dict[str, Any]) -> "PacketFunctionProxy": + """Construct a proxy from a packet function config dict. + + Args: + config: A dict as produced by ``PythonPacketFunction.to_config()``. + + Returns: + A new ``PacketFunctionProxy`` instance. + """ + # URI not available from packet_function config alone — caller + # must provide it separately via the constructor. from_config + # builds a proxy without URI/hash overrides. + inner = config.get("config", config) + # Build a placeholder URI from available metadata + type_id = config.get("packet_function_type_id", "python.function.v0") + name = inner.get("callable_name", "unknown") + version = inner.get("version", "v0.0") + import re + + match = re.match(r"\D*(\d+)\.(.*)", version) + major = int(match.group(1)) if match else 0 + uri = (name, "", f"v{major}", type_id) + + return cls(config=config, uri=uri) + + # ==================== Binding ==================== + + @property + def is_bound(self) -> bool: + """Return whether a real packet function has been bound.""" + return self._bound_function is not None + + def bind(self, packet_function: PacketFunctionProtocol) -> None: + """Bind a real packet function to this proxy. + + Validates that the provided function matches the proxy's stored + identity before accepting it. + + Args: + packet_function: The real packet function to bind. + + Raises: + ValueError: If the provided function's identifying information + does not match the proxy's stored identity. + """ + mismatches: list[str] = [] + + if packet_function.canonical_function_name != self.canonical_function_name: + mismatches.append( + f"canonical_function_name: expected {self.canonical_function_name!r}, " + f"got {packet_function.canonical_function_name!r}" + ) + if packet_function.major_version != self.major_version: + mismatches.append( + f"major_version: expected {self.major_version}, " + f"got {packet_function.major_version}" + ) + if packet_function.packet_function_type_id != self.packet_function_type_id: + mismatches.append( + f"packet_function_type_id: expected {self.packet_function_type_id!r}, " + f"got {packet_function.packet_function_type_id!r}" + ) + if dict(packet_function.input_packet_schema) != dict(self.input_packet_schema): + mismatches.append( + f"input_packet_schema: expected {dict(self.input_packet_schema)}, " + f"got {dict(packet_function.input_packet_schema)}" + ) + if dict(packet_function.output_packet_schema) != dict( + self.output_packet_schema + ): + mismatches.append( + f"output_packet_schema: expected {dict(self.output_packet_schema)}, " + f"got {dict(packet_function.output_packet_schema)}" + ) + if packet_function.uri != self.uri: + mismatches.append( + f"uri: expected {self.uri}, got {packet_function.uri}" + ) + if ( + self._stored_content_hash is not None + and packet_function.content_hash().to_string() + != self._stored_content_hash + ): + mismatches.append( + f"content_hash: expected {self._stored_content_hash}, " + f"got {packet_function.content_hash().to_string()}" + ) + + if mismatches: + raise ValueError( + f"Cannot bind packet function: identity mismatch.\n" + + "\n".join(f" - {m}" for m in mismatches) + ) + + self._bound_function = packet_function + + def unbind(self) -> None: + """Remove the bound packet function, reverting to proxy-only mode.""" + self._bound_function = None +``` + +**Important note about `call()` and `async_call()`:** These are inherited from +`PacketFunctionBase` which routes through `direct_call()` / `direct_async_call()` +(via the executor if set, or directly). Since the proxy overrides `direct_call()` +and `direct_async_call()` to raise `PacketFunctionUnavailableError` (when unbound) +or delegate (when bound), the inherited `call()` / `async_call()` will do the +right thing automatically. Check `PacketFunctionBase.call()` implementation to +confirm this routing — if it calls `self.direct_call()`, we're good. If it has +its own logic, we may need to override `call()` too. + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py -v` +Expected: All PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/core/packet_function_proxy.py tests/test_core/packet_function/test_packet_function_proxy.py +git commit -m "feat(core): add PacketFunctionProxy with metadata and identity" +``` + +--- + +### Task 3: Test invocation behavior (unbound raises, bound delegates) + +**Files:** +- Modify: `tests/test_core/packet_function/test_packet_function_proxy.py` + +- [ ] **Step 1: Write failing tests for invocation** + +Append to the test file: + +```python +class TestPacketFunctionProxyInvocation: + def test_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + # Create a minimal packet to pass + from orcapod.core.datagrams.tag_packet import Packet + + packet = Packet({"age": 25}) + with pytest.raises(PacketFunctionUnavailableError, match="double_age"): + proxy.call(packet) + + @pytest.mark.asyncio + async def test_async_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + from orcapod.core.datagrams.tag_packet import Packet + + packet = Packet({"age": 25}) + with pytest.raises(PacketFunctionUnavailableError, match="double_age"): + await proxy.async_call(packet) + + def test_direct_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + from orcapod.core.datagrams.tag_packet import Packet + + packet = Packet({"age": 25}) + with pytest.raises(PacketFunctionUnavailableError): + proxy.direct_call(packet) + + def test_variation_data_empty_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.get_function_variation_data() == {} + + def test_execution_data_empty_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.get_execution_data() == {} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py::TestPacketFunctionProxyInvocation -v` +Expected: FAIL (depending on whether `call()` routes through `direct_call()`) + +- [ ] **Step 3: Fix any test issues** + +The `call()`, `async_call()`, `direct_call()`, and `direct_async_call()` +overrides are already in the Task 2 implementation. They raise +`PacketFunctionUnavailableError` when unbound and delegate when bound. +If tests fail, check that the error message includes the function name. + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py -v` +Expected: All PASS + +- [ ] **Step 5: Commit** + +```bash +git add tests/test_core/packet_function/test_packet_function_proxy.py src/orcapod/core/packet_function_proxy.py +git commit -m "test(proxy): add invocation behavior tests for PacketFunctionProxy" +``` + +--- + +### Task 4: Test `bind()`, `unbind()`, and identity mismatch rejection + +**Files:** +- Modify: `tests/test_core/packet_function/test_packet_function_proxy.py` + +- [ ] **Step 1: Write failing tests for bind/unbind** + +Append to the test file: + +```python +class TestPacketFunctionProxyBinding: + def test_bind_succeeds_with_matching_function(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + assert proxy.is_bound is True + + def test_call_delegates_after_bind(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + from orcapod.core.datagrams.tag_packet import Packet + + packet = Packet({"age": 25}) + result = proxy.call(packet) + assert result is not None + assert result.as_dict()["doubled_age"] == 50 + + def test_variation_data_delegates_after_bind(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + assert proxy.get_function_variation_data() == pf.get_function_variation_data() + + def test_execution_data_delegates_after_bind(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + assert proxy.get_execution_data() == pf.get_execution_data() + + def test_unbind_reverts_to_proxy_mode(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + assert proxy.is_bound is True + proxy.unbind() + assert proxy.is_bound is False + from orcapod.core.datagrams.tag_packet import Packet + + packet = Packet({"age": 25}) + with pytest.raises(PacketFunctionUnavailableError): + proxy.call(packet) + + def test_bind_rejects_mismatched_function_name(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def triple_age(age: int) -> int: + return age * 3 + + other_pf = PythonPacketFunction( + triple_age, output_keys="tripled_age", version="v1.0" + ) + with pytest.raises(ValueError, match="canonical_function_name"): + proxy.bind(other_pf) + + def test_bind_rejects_mismatched_version(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def double_age(age: int) -> int: + return age * 2 + + other_pf = PythonPacketFunction( + double_age, output_keys="doubled_age", version="v2.0" + ) + with pytest.raises(ValueError, match="major_version"): + proxy.bind(other_pf) + + def test_bind_rejects_mismatched_output_schema(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def double_age(age: int) -> str: + return str(age * 2) + + other_pf = PythonPacketFunction( + double_age, output_keys="doubled_age", version="v1.0" + ) + with pytest.raises(ValueError, match="output_packet_schema"): + proxy.bind(other_pf) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py::TestPacketFunctionProxyBinding -v` +Expected: FAIL + +- [ ] **Step 3: Fix any issues in implementation** + +The `bind()` and `unbind()` implementations are already in the Task 2 code. +If `call()` delegation doesn't work, ensure the bound function's `call()` is +used (not `direct_call()`). + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py -v` +Expected: All PASS + +- [ ] **Step 5: Commit** + +```bash +git add tests/test_core/packet_function/test_packet_function_proxy.py src/orcapod/core/packet_function_proxy.py +git commit -m "test(proxy): add bind/unbind and identity validation tests" +``` + +--- + +### Task 5: Test `FunctionPod` with proxy + +**Files:** +- Modify: `tests/test_core/packet_function/test_packet_function_proxy.py` + +- [ ] **Step 1: Write failing tests for FunctionPod integration** + +Append to the test file: + +```python +class TestFunctionPodWithProxy: + def test_function_pod_constructs_with_proxy(self): + from orcapod.core.function_pod import FunctionPod + + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + pod = FunctionPod(packet_function=proxy) + assert pod is not None + assert pod.packet_function is proxy + + def test_function_pod_output_schema(self): + from orcapod.core.function_pod import FunctionPod + from orcapod.core.sources.dict_source import DictSource + + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + pod = FunctionPod(packet_function=proxy) + source = DictSource({"age": [10, 20, 30]}) + tag_schema, packet_schema = pod.output_schema(source) + assert "doubled_age" in packet_schema + + def test_function_pod_process_packet_raises(self): + from orcapod.core.datagrams.tag_packet import Packet, Tag + from orcapod.core.function_pod import FunctionPod + + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + pod = FunctionPod(packet_function=proxy) + tag = Tag({}) + packet = Packet({"age": 25}) + with pytest.raises(PacketFunctionUnavailableError): + pod.process_packet(tag, packet) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py::TestFunctionPodWithProxy -v` +Expected: FAIL + +- [ ] **Step 3: Fix any issues** + +The `FunctionPod` constructor calls `_FunctionPodBase.uri` which accesses +`self.packet_function.output_packet_schema`. Since the proxy's `uri` is +overridden, we need to check if `_FunctionPodBase.uri` is even called during +construction. If `_output_schema_hash` is set lazily (only when `uri` is +accessed), construction should work. If `CachedFunctionPod` init triggers +`uri` access, we need to ensure the proxy's override takes precedence. + +**Key insight:** `_FunctionPodBase.uri` is on the *pod*, not the packet +function. The pod's `uri` calls `self.packet_function.canonical_function_name`, +`self.data_context.semantic_hasher.hash_object(self.packet_function.output_packet_schema)`, +etc. But the *proxy's* `uri` property (on `PacketFunctionBase`) is a different +property. The pod accesses `self.packet_function.uri` only in +`identity_structure()` and `pipeline_identity_structure()`, which return +`self.packet_function.identity_structure()` → `self.uri` on the proxy. So the +proxy's `uri` override IS used for identity/hashing, which is correct. + +The pod's own `uri` (on `_FunctionPodBase`) is used for `CachedFunctionPod` +record path. This pod-level `uri` computes `semantic_hasher.hash_object( +self.packet_function.output_packet_schema)`. Since the proxy's +`output_packet_schema` is eagerly deserialized, this should work — but the +resulting hash may differ from the original. This is why the spec says to store +the URI and use it. However, the pod-level `uri` is separate from the proxy's +`uri`. + +If the pod-level `uri` hash differs, `CachedFunctionPod` will look in the wrong +DB path. **Solution**: In `_load_function_node`, after constructing the +`FunctionNode`, override `_cached_function_pod._cache.record_path` with the +stored `result_record_path` from the descriptor. OR store the +`_output_schema_hash` on the FunctionPod from the stored URI. + +This is a subtlety the implementation will need to handle. The test will reveal +if it's a problem. + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py -v` +Expected: All PASS + +- [ ] **Step 5: Commit** + +```bash +git add tests/test_core/packet_function/test_packet_function_proxy.py src/orcapod/core/packet_function_proxy.py +git commit -m "test(proxy): add FunctionPod integration tests" +``` + +--- + +## Chunk 2: Serialization and Pipeline Loading + +### Task 6: Add `fallback_to_proxy` to serialization resolver + +**Files:** +- Modify: `src/orcapod/pipeline/serialization.py` +- Modify: `tests/test_pipeline/test_serialization_helpers.py` + +- [ ] **Step 1: Write failing test for fallback** + +Add to `tests/test_pipeline/test_serialization_helpers.py`: + +```python +def test_resolve_packet_function_fallback_to_proxy(): + """When the module can't be imported, fallback_to_proxy returns a proxy.""" + from orcapod.core.packet_function_proxy import PacketFunctionProxy + from orcapod.pipeline.serialization import resolve_packet_function_from_config + + config = { + "packet_function_type_id": "python.function.v0", + "config": { + "module_path": "nonexistent.module.that.does.not.exist", + "callable_name": "some_func", + "version": "v1.0", + "input_packet_schema": {"x": "int64"}, + "output_packet_schema": {"y": "float64"}, + "output_keys": ["y"], + }, + } + + # Without fallback, should raise + with pytest.raises(Exception): + resolve_packet_function_from_config(config) + + # With fallback, should return proxy + result = resolve_packet_function_from_config(config, fallback_to_proxy=True) + assert isinstance(result, PacketFunctionProxy) + assert result.canonical_function_name == "some_func" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `uv run pytest tests/test_pipeline/test_serialization_helpers.py::test_resolve_packet_function_fallback_to_proxy -v` +Expected: FAIL (TypeError — `fallback_to_proxy` not recognized) + +- [ ] **Step 3: Implement fallback in `resolve_packet_function_from_config`** + +In `src/orcapod/pipeline/serialization.py`, modify `resolve_packet_function_from_config`: + +```python +def resolve_packet_function_from_config( + config: dict[str, Any], + *, + fallback_to_proxy: bool = False, +) -> Any: + """Reconstruct a packet function from a config dict. + + Args: + config: Dict with at least a ``"packet_function_type_id"`` key matching + a registered packet function type. + fallback_to_proxy: If ``True`` and reconstruction fails, return a + ``PacketFunctionProxy`` preserving identity from the config. + + Returns: + A new packet function instance constructed from the config. + + Raises: + ValueError: If the type ID is missing or unknown (and fallback is off). + """ + _ensure_registries() + type_id = config.get("packet_function_type_id") + if type_id not in PACKET_FUNCTION_REGISTRY: + if fallback_to_proxy: + return _packet_function_proxy_from_config(config) + raise ValueError( + f"Unknown packet function type: {type_id!r}. " + f"Known types: {sorted(PACKET_FUNCTION_REGISTRY.keys())}" + ) + cls = PACKET_FUNCTION_REGISTRY[type_id] + try: + return cls.from_config(config) + except Exception: + if fallback_to_proxy: + logger.warning( + "Could not reconstruct packet function from config; " + "returning PacketFunctionProxy." + ) + return _packet_function_proxy_from_config(config) + raise + + +def _packet_function_proxy_from_config(config: dict[str, Any]) -> Any: + """Create a ``PacketFunctionProxy`` from a packet function config. + + Args: + config: Packet function config dict. + + Returns: + A ``PacketFunctionProxy`` preserving the original identity. + """ + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + return PacketFunctionProxy.from_config(config) +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `uv run pytest tests/test_pipeline/test_serialization_helpers.py::test_resolve_packet_function_fallback_to_proxy -v` +Expected: PASS + +- [ ] **Step 5: Run full serialization helper tests** + +Run: `uv run pytest tests/test_pipeline/test_serialization_helpers.py -v` +Expected: All PASS (no regressions) + +- [ ] **Step 6: Commit** + +```bash +git add src/orcapod/pipeline/serialization.py tests/test_pipeline/test_serialization_helpers.py +git commit -m "feat(serialization): add fallback_to_proxy for packet function resolution" +``` + +--- + +### Task 7: Add `fallback_to_proxy` to `FunctionPod.from_config` + +**Files:** +- Modify: `src/orcapod/core/function_pod.py` + +- [ ] **Step 1: Write failing test** + +Add to `tests/test_pipeline/test_serialization_helpers.py`: + +```python +def test_function_pod_from_config_fallback_to_proxy(): + """FunctionPod.from_config with fallback_to_proxy returns pod with proxy.""" + from orcapod.core.function_pod import FunctionPod + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + config = { + "uri": ["some_func", "hash123", "v1", "python.function.v0"], + "packet_function": { + "packet_function_type_id": "python.function.v0", + "config": { + "module_path": "nonexistent.module", + "callable_name": "some_func", + "version": "v1.0", + "input_packet_schema": {"x": "int64"}, + "output_packet_schema": {"y": "float64"}, + "output_keys": ["y"], + }, + }, + "node_config": None, + } + + # Without fallback, should raise + with pytest.raises(Exception): + FunctionPod.from_config(config) + + # With fallback, should return FunctionPod with proxy + pod = FunctionPod.from_config(config, fallback_to_proxy=True) + assert isinstance(pod.packet_function, PacketFunctionProxy) + assert pod.packet_function.canonical_function_name == "some_func" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `uv run pytest tests/test_pipeline/test_serialization_helpers.py::test_function_pod_from_config_fallback_to_proxy -v` +Expected: FAIL + +- [ ] **Step 3: Modify `FunctionPod.from_config`** + +In `src/orcapod/core/function_pod.py`, change `FunctionPod.from_config` (around line 297): + +```python +@classmethod +def from_config( + cls, + config: dict[str, Any], + *, + fallback_to_proxy: bool = False, +) -> "FunctionPod": + """Reconstruct a FunctionPod from a config dict. + + Args: + config: A dict as produced by :meth:`to_config`. + fallback_to_proxy: If ``True`` and the packet function cannot be + reconstructed, use a ``PacketFunctionProxy`` instead. + + Returns: + A new ``FunctionPod`` instance. + """ + from orcapod.pipeline.serialization import resolve_packet_function_from_config + + pf_config = config["packet_function"] + + if fallback_to_proxy: + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + packet_function = resolve_packet_function_from_config( + pf_config, fallback_to_proxy=True + ) + # If we got a proxy, inject the URI from the parent config + if isinstance(packet_function, PacketFunctionProxy): + uri_list = config.get("uri") + if uri_list is not None: + packet_function._stored_uri = tuple(uri_list) + else: + packet_function = resolve_packet_function_from_config(pf_config) + + node_config = None + if config.get("node_config") is not None: + node_config = NodeConfig(**config["node_config"]) + + return cls(packet_function=packet_function, node_config=node_config) +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `uv run pytest tests/test_pipeline/test_serialization_helpers.py::test_function_pod_from_config_fallback_to_proxy -v` +Expected: PASS + +- [ ] **Step 5: Run full test suite for regressions** + +Run: `uv run pytest tests/test_pipeline/test_serialization_helpers.py -v` +Expected: All PASS + +- [ ] **Step 6: Commit** + +```bash +git add src/orcapod/core/function_pod.py tests/test_pipeline/test_serialization_helpers.py +git commit -m "feat(function_pod): add fallback_to_proxy to FunctionPod.from_config" +``` + +--- + +### Task 8: Update `Pipeline.load` — upstream usability and function node loading + +**Files:** +- Modify: `src/orcapod/pipeline/graph.py` + +This is the integration point. Two changes: + +1. Relax upstream usability check to accept `READ_ONLY` +2. Modify `_load_function_node` to use `fallback_to_proxy=True` and set `load_status` + +- [ ] **Step 1: Modify upstream usability check in `Pipeline.load`** + +In `src/orcapod/pipeline/graph.py`, find the upstream usability check for function +nodes (around line 722-726): + +Change: +```python +upstream_usable = ( + upstream_node is not None + and hasattr(upstream_node, "load_status") + and upstream_node.load_status == LoadStatus.FULL +) +``` + +To: +```python +upstream_usable = ( + upstream_node is not None + and hasattr(upstream_node, "load_status") + and upstream_node.load_status + in (LoadStatus.FULL, LoadStatus.READ_ONLY) +) +``` + +Do the same for the operator node `all_upstreams_usable` check (around line 744-751): + +Change: +```python +all_upstreams_usable = ( + all( + hasattr(n, "load_status") and n.load_status == LoadStatus.FULL + for n in upstream_nodes + ) + if upstream_nodes + else False +) +``` + +To: +```python +all_upstreams_usable = ( + all( + hasattr(n, "load_status") + and n.load_status in (LoadStatus.FULL, LoadStatus.READ_ONLY) + for n in upstream_nodes + ) + if upstream_nodes + else False +) +``` + +- [ ] **Step 2: Modify `_load_function_node` to use proxy fallback** + +In `src/orcapod/pipeline/graph.py`, replace `_load_function_node` (around line 851-900): + +```python +@staticmethod +def _load_function_node( + descriptor: dict[str, Any], + mode: str, + upstream_node: Any | None, + upstream_usable: bool, + databases: dict[str, Any], +) -> FunctionNode: + """Reconstruct a FunctionNode from a descriptor. + + Args: + descriptor: The serialized node descriptor. + mode: Load mode. + upstream_node: The reconstructed upstream node, or ``None``. + upstream_usable: Whether the upstream can provide data. + databases: Database role mapping. + + Returns: + A ``FunctionNode`` instance. + """ + from orcapod.core.function_pod import FunctionPod + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + if mode != "read_only" and upstream_usable: + try: + pod = FunctionPod.from_config( + descriptor["function_pod"], fallback_to_proxy=True + ) + node = FunctionNode.from_descriptor( + descriptor, + function_pod=pod, + input_stream=upstream_node, + databases=databases, + ) + # Set load_status based on whether packet function is a proxy + if isinstance(pod.packet_function, PacketFunctionProxy): + node._load_status = LoadStatus.READ_ONLY + # Override CachedFunctionPod's record_path with the stored + # value from the descriptor. The pod-level URI computation + # (which hashes output_packet_schema via semantic_hasher) + # may produce a different hash after schema round-trip, + # causing DB lookups to miss. The descriptor stores the + # original record_path that was used when data was written. + stored_result_path = tuple( + descriptor.get("result_record_path", ()) + ) + if stored_result_path and node._cached_function_pod is not None: + # NOTE: This reaches into private fields. Logged as a + # design issue — CachedFunctionPod / ResultCache should + # expose a public API for record_path override. + node._cached_function_pod._cache._record_path = ( + stored_result_path + ) + else: + node._load_status = LoadStatus.FULL + return node + except Exception: + logger.warning( + "Failed to reconstruct function node %r, " + "falling back to read-only.", + descriptor.get("label"), + ) + + # Fall through: upstream not usable, read_only mode, or reconstruction failed + if not upstream_usable and mode != "read_only": + logger.warning( + "Upstream for function node %r is not usable, " + "falling back to read-only.", + descriptor.get("label"), + ) + + return FunctionNode.from_descriptor( + descriptor, + function_pod=None, + input_stream=None, + databases=databases, + ) +``` + +- [ ] **Step 3: Run existing serialization tests to check for regressions** + +Run: `uv run pytest tests/test_pipeline/test_serialization.py -v` +Expected: All PASS (existing behavior preserved) + +- [ ] **Step 4: Commit** + +```bash +git add src/orcapod/pipeline/graph.py +git commit -m "feat(pipeline): use PacketFunctionProxy in load, relax upstream usability" +``` + +--- + +### Task 9: Integration test — pipeline load with unavailable function + +**Files:** +- Modify: `tests/test_pipeline/test_serialization.py` + +- [ ] **Step 1: Write integration test** + +Add to `tests/test_pipeline/test_serialization.py`: + +```python +class TestPipelineLoadWithUnavailableFunction: + """Tests for loading a pipeline when the packet function is not importable.""" + + def _build_and_save_pipeline(self, tmp_path): + """Build a pipeline with a function pod, run it, and save to JSON.""" + from orcapod.core.function_pod import FunctionPod, function_pod + from orcapod.core.sources.dict_source import DictSource + from orcapod.databases.delta_lake_databases import DeltaTableDatabase + from orcapod.pipeline import Pipeline + + db_path = str(tmp_path / "pipeline_db") + db = DeltaTableDatabase(db_path) + + source = DictSource( + {"name": ["alice", "bob"], "age": [30, 25]}, + tag_columns=["name"], + ) + + @function_pod(output_keys="doubled_age", version="v1.0") + def double_age(age: int) -> int: + return age * 2 + + with Pipeline("test_pipeline", pipeline_database=db) as p: + stream = source + stream = double_age.pod(stream) + + p.run() + + save_path = str(tmp_path / "pipeline.json") + p.save(save_path) + return save_path, db_path + + def test_load_with_unavailable_function_has_read_only_status(self, tmp_path): + """Function node should be READ_ONLY when function can't be loaded.""" + import json + + from orcapod.pipeline import LoadStatus, Pipeline + + save_path, _ = self._build_and_save_pipeline(tmp_path) + + # Corrupt the module path to simulate unavailable function + with open(save_path) as f: + data = json.load(f) + for node in data["nodes"].values(): + if node.get("node_type") == "function": + pf_config = node["function_pod"]["packet_function"]["config"] + pf_config["module_path"] = "nonexistent.module.path" + with open(save_path, "w") as f: + json.dump(data, f) + + loaded = Pipeline.load(save_path) + + # Find the function node + fn_node = None + for node in loaded.compiled_nodes.values(): + if node.node_type == "function": + fn_node = node + break + + assert fn_node is not None + assert fn_node.load_status == LoadStatus.READ_ONLY + + def test_load_with_unavailable_function_can_get_all_records(self, tmp_path): + """get_all_records() should return cached data.""" + import json + + from orcapod.pipeline import Pipeline + + save_path, _ = self._build_and_save_pipeline(tmp_path) + + with open(save_path) as f: + data = json.load(f) + for node in data["nodes"].values(): + if node.get("node_type") == "function": + pf_config = node["function_pod"]["packet_function"]["config"] + pf_config["module_path"] = "nonexistent.module.path" + with open(save_path, "w") as f: + json.dump(data, f) + + loaded = Pipeline.load(save_path) + fn_node = None + for node in loaded.compiled_nodes.values(): + if node.node_type == "function": + fn_node = node + break + + records = fn_node.get_all_records() + assert records is not None + assert records.num_rows == 2 + + def test_load_with_unavailable_function_iter_packets_yields_cached( + self, tmp_path + ): + """iter_packets() should yield cached results from Phase 1.""" + import json + + from orcapod.pipeline import Pipeline + + save_path, _ = self._build_and_save_pipeline(tmp_path) + + with open(save_path) as f: + data = json.load(f) + for node in data["nodes"].values(): + if node.get("node_type") == "function": + pf_config = node["function_pod"]["packet_function"]["config"] + pf_config["module_path"] = "nonexistent.module.path" + with open(save_path, "w") as f: + json.dump(data, f) + + loaded = Pipeline.load(save_path) + fn_node = None + for node in loaded.compiled_nodes.values(): + if node.node_type == "function": + fn_node = node + break + + packets = list(fn_node.iter_packets()) + assert len(packets) == 2 + # Verify data content + values = sorted(p.as_dict()["doubled_age"] for _, p in packets) + assert values == [50, 60] +``` + +- [ ] **Step 2: Run integration tests** + +Run: `uv run pytest tests/test_pipeline/test_serialization.py::TestPipelineLoadWithUnavailableFunction -v` +Expected: Tests may FAIL if there are issues with the pod-level `uri` hash +divergence (see Task 5 notes). Debug and fix as needed. + +- [ ] **Step 3: Fix any issues discovered** + +The most likely issue: `CachedFunctionPod.record_path` computed from +`_FunctionPodBase.uri` (which hashes `output_packet_schema`) may differ from +the original `result_record_path` stored in the descriptor. If so, after +constructing the `FunctionNode` in `_load_function_node`, override the record +path: + +```python +# In _load_function_node, after constructing the node with proxy: +if isinstance(pod.packet_function, PacketFunctionProxy): + stored_result_path = tuple(descriptor.get("result_record_path", ())) + if stored_result_path and node._cached_function_pod is not None: + node._cached_function_pod._cache._record_path = stored_result_path +``` + +Check if `ResultCache` has `_record_path` as a settable attribute or if +`record_path` is a property. Adjust accordingly. + +- [ ] **Step 4: Run integration tests again** + +Run: `uv run pytest tests/test_pipeline/test_serialization.py::TestPipelineLoadWithUnavailableFunction -v` +Expected: All PASS + +- [ ] **Step 5: Run full serialization test suite** + +Run: `uv run pytest tests/test_pipeline/test_serialization.py -v` +Expected: All PASS (no regressions) + +- [ ] **Step 6: Commit** + +```bash +git add tests/test_pipeline/test_serialization.py src/orcapod/pipeline/graph.py +git commit -m "test(pipeline): add integration tests for loading with unavailable function" +``` + +--- + +### Task 10: Integration test — downstream computation from cached data + +**Files:** +- Modify: `tests/test_pipeline/test_serialization.py` + +- [ ] **Step 1: Write downstream operator test** + +Add to `TestPipelineLoadWithUnavailableFunction`: + +```python + def test_downstream_operator_computes_from_cached(self, tmp_path): + """An operator downstream of a proxy-backed node should work.""" + import json + + from orcapod.core.operators import SelectPacketColumns + from orcapod.core.function_pod import function_pod + from orcapod.core.sources.dict_source import DictSource + from orcapod.databases.delta_lake_databases import DeltaTableDatabase + from orcapod.pipeline import LoadStatus, Pipeline + + db_path = str(tmp_path / "pipeline_db") + db = DeltaTableDatabase(db_path) + + source = DictSource( + {"name": ["alice", "bob"], "age": [30, 25]}, + tag_columns=["name"], + ) + + @function_pod(output_keys=("doubled_age", "original_age"), version="v1.0") + def transform(age: int) -> tuple[int, int]: + return age * 2, age + + select_op = SelectPacketColumns(columns=["doubled_age"]) + + with Pipeline("test_downstream", pipeline_database=db) as p: + stream = source + stream = transform.pod(stream) + stream = select_op(stream) + + p.run() + + save_path = str(tmp_path / "pipeline.json") + p.save(save_path) + + # Corrupt function module path + with open(save_path) as f: + data = json.load(f) + for node in data["nodes"].values(): + if node.get("node_type") == "function": + pf_config = node["function_pod"]["packet_function"]["config"] + pf_config["module_path"] = "nonexistent.module.path" + with open(save_path, "w") as f: + json.dump(data, f) + + loaded = Pipeline.load(save_path) + + # Find the operator node + op_node = None + fn_node = None + for node in loaded.compiled_nodes.values(): + if node.node_type == "operator": + op_node = node + elif node.node_type == "function": + fn_node = node + + assert fn_node is not None + assert fn_node.load_status == LoadStatus.READ_ONLY + + assert op_node is not None + assert op_node.load_status == LoadStatus.FULL + + # The operator should be able to compute from cached data + table = op_node.as_table() + assert table is not None + assert "doubled_age" in table.column_names + assert table.num_rows == 2 +``` + +- [ ] **Step 2: Run test** + +Run: `uv run pytest tests/test_pipeline/test_serialization.py::TestPipelineLoadWithUnavailableFunction::test_downstream_operator_computes_from_cached -v` +Expected: PASS + +- [ ] **Step 3: Commit** + +```bash +git add tests/test_pipeline/test_serialization.py +git commit -m "test(pipeline): add downstream operator test for cached data flow" +``` + +--- + +### Task 11: Run full test suite and final cleanup + +**Files:** +- All modified files + +- [ ] **Step 1: Run the full test suite** + +Run: `uv run pytest tests/ -v --timeout=120` +Expected: All PASS + +- [ ] **Step 2: Fix any failures** + +Address any test failures discovered. + +- [ ] **Step 3: Final commit if any fixes were needed** + +```bash +git add -u +git commit -m "fix: address test failures from PacketFunctionProxy integration" +``` From 8d37f8347f7b8ea06ae2ec0f70c8e24a5c7bceea Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 22:26:26 +0000 Subject: [PATCH 03/12] feat(core): add PacketFunctionProxy and PacketFunctionUnavailableError Add PacketFunctionProxy that stands in for unavailable packet functions in deserialized pipelines, enabling cached data access without the original function. Add PacketFunctionUnavailableError for invocation attempts on unbound proxies. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/packet_function_proxy.py | 340 ++++++++++++++++++++++ src/orcapod/errors.py | 9 + 2 files changed, 349 insertions(+) create mode 100644 src/orcapod/core/packet_function_proxy.py diff --git a/src/orcapod/core/packet_function_proxy.py b/src/orcapod/core/packet_function_proxy.py new file mode 100644 index 00000000..362bd841 --- /dev/null +++ b/src/orcapod/core/packet_function_proxy.py @@ -0,0 +1,340 @@ +"""Proxy for unavailable packet functions in deserialized pipelines. + +When a pipeline is loaded in an environment where the original packet +function cannot be imported, ``PacketFunctionProxy`` stands in so that +``FunctionPod``, ``FunctionNode``, and ``CachedFunctionPod`` can still be +constructed and cached data can be accessed. Invoking the proxy raises +``PacketFunctionUnavailableError`` unless a real function has been bound +via :meth:`bind`. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from orcapod.core.packet_function import PacketFunctionBase +from orcapod.errors import PacketFunctionUnavailableError +from orcapod.protocols.core_protocols import PacketFunctionProtocol +from orcapod.types import ContentHash, Schema + +if TYPE_CHECKING: + from orcapod.protocols.core_protocols import PacketProtocol + from orcapod.protocols.core_protocols.executor import ( + PacketFunctionExecutorProtocol, + ) + + +_BUILTIN_TYPE_MAP: dict[str, type] = { + "": int, + "": float, + "": str, + "": bool, + "": bytes, +} + + +def _deserialize_schema_from_config(schema_dict: dict[str, str]) -> Schema: + """Reconstruct a Schema from a to_config() schema dict. + + Handles both ``str(python_type)`` format (e.g. ``""``), + used by ``PythonPacketFunction.to_config()``, and Arrow type string + format (e.g. ``"int64"``), used by the serialization module. + + Args: + schema_dict: Dict mapping field names to type strings. + + Returns: + A Schema with Python types reconstructed from the strings. + """ + from orcapod.pipeline.serialization import deserialize_schema + + result: dict[str, Any] = {} + needs_arrow_fallback: list[str] = [] + + for name, type_str in schema_dict.items(): + if type_str in _BUILTIN_TYPE_MAP: + result[name] = _BUILTIN_TYPE_MAP[type_str] + else: + needs_arrow_fallback.append(name) + + if needs_arrow_fallback: + arrow_result = deserialize_schema( + {k: schema_dict[k] for k in needs_arrow_fallback} + ) + result.update(arrow_result) + + return Schema(result) + + +class PacketFunctionProxy(PacketFunctionBase): + """Stand-in for an unavailable packet function. + + Satisfies ``PacketFunctionProtocol`` so pipeline construction succeeds. + All execution methods raise ``PacketFunctionUnavailableError`` until a + real function is attached via :meth:`bind`. + + Args: + config: Serialized packet function config dict (as produced by + ``PythonPacketFunction.to_config()``). + uri: The original URI tuple from ``FunctionPod.to_config()["uri"]``. + content_hash_str: Optional stored content hash string. + pipeline_hash_str: Optional stored pipeline hash string. + """ + + def __init__( + self, + config: dict[str, Any], + uri: tuple[str, ...], + content_hash_str: str | None = None, + pipeline_hash_str: str | None = None, + ) -> None: + inner = config["config"] + self._original_config = config + self._packet_function_type_id = config["packet_function_type_id"] + self._canonical_function_name = inner["callable_name"] + + # Eagerly deserialize schemas. + # to_config() stores types as str(python_type) e.g. "". + # We reconstruct proper Schema objects by going through the type + # converter (Python type -> Arrow -> Python) using the Arrow string + # representation. We also try deserialize_schema which handles + # Arrow-format strings. As a fallback we keep the raw config dicts + # for serialized-form comparison in bind(). + self._raw_input_schema_dict = inner["input_packet_schema"] + self._raw_output_schema_dict = inner["output_packet_schema"] + self._input_packet_schema = _deserialize_schema_from_config( + self._raw_input_schema_dict + ) + self._output_packet_schema = _deserialize_schema_from_config( + self._raw_output_schema_dict + ) + + # Stored identity hashes + self._stored_uri = uri + self._stored_content_hash = content_hash_str + self._stored_pipeline_hash = pipeline_hash_str + + # Late-binding slot + self._bound_function: PacketFunctionProtocol | None = None + + version = inner["version"] + super().__init__(version=version) + + # ==================== Identity properties ==================== + + @property + def packet_function_type_id(self) -> str: + """Unique function type identifier.""" + return self._packet_function_type_id + + @property + def canonical_function_name(self) -> str: + """Human-readable function identifier.""" + return self._canonical_function_name + + @property + def input_packet_schema(self) -> Schema: + """Schema describing the input packets this function accepts.""" + return self._input_packet_schema + + @property + def output_packet_schema(self) -> Schema: + """Schema describing the output packets this function produces.""" + return self._output_packet_schema + + @property + def uri(self) -> tuple[str, ...]: + """Return the stored URI, avoiding recomputation via semantic_hasher.""" + return self._stored_uri + + # ==================== Hash overrides ==================== + + def content_hash(self, hasher=None) -> ContentHash: + """Return the stored content hash.""" + if self._stored_content_hash is not None: + return ContentHash.from_string(self._stored_content_hash) + return super().content_hash(hasher) + + def pipeline_hash(self, hasher=None) -> ContentHash: + """Return the stored pipeline hash.""" + if self._stored_pipeline_hash is not None: + return ContentHash.from_string(self._stored_pipeline_hash) + return super().pipeline_hash(hasher) + + # ==================== Execution ==================== + + def _raise_unavailable(self) -> None: + """Raise ``PacketFunctionUnavailableError`` with context.""" + raise PacketFunctionUnavailableError( + f"Packet function '{self._canonical_function_name}' is not available. " + f"Use bind() to attach a real function, or access cached results only." + ) + + def call(self, packet: PacketProtocol) -> PacketProtocol | None: + """Process a single packet; delegates to bound function or raises.""" + if self._bound_function is not None: + return self._bound_function.call(packet) + self._raise_unavailable() + + async def async_call(self, packet: PacketProtocol) -> PacketProtocol | None: + """Async counterpart of ``call``.""" + if self._bound_function is not None: + return await self._bound_function.async_call(packet) + self._raise_unavailable() + + def direct_call(self, packet: PacketProtocol) -> PacketProtocol | None: + """Direct execution; delegates to bound function or raises.""" + if self._bound_function is not None: + return self._bound_function.direct_call(packet) + self._raise_unavailable() + + async def direct_async_call(self, packet: PacketProtocol) -> PacketProtocol | None: + """Async direct execution; delegates to bound function or raises.""" + if self._bound_function is not None: + return await self._bound_function.direct_async_call(packet) + self._raise_unavailable() + + # ==================== Variation / execution data ==================== + + def get_function_variation_data(self) -> dict[str, Any]: + """Return function variation data, or empty dict when unbound.""" + if self._bound_function is not None: + return self._bound_function.get_function_variation_data() + return {} + + def get_execution_data(self) -> dict[str, Any]: + """Return execution data, or empty dict when unbound.""" + if self._bound_function is not None: + return self._bound_function.get_execution_data() + return {} + + # ==================== Executor ==================== + + @property + def executor(self) -> PacketFunctionExecutorProtocol | None: + """Return executor from bound function, or None.""" + if self._bound_function is not None: + return self._bound_function.executor + return None + + @executor.setter + def executor(self, executor: PacketFunctionExecutorProtocol | None) -> None: + """Set executor on bound function; no-op when unbound.""" + if self._bound_function is not None: + self._bound_function.executor = executor + + # ==================== Serialization ==================== + + def to_config(self) -> dict[str, Any]: + """Return the original config dict.""" + return self._original_config + + @classmethod + def from_config(cls, config: dict[str, Any]) -> PacketFunctionProxy: + """Construct a proxy from a serialized config dict. + + Builds a placeholder URI from the config fields (callable_name, + version, packet_function_type_id with an empty schema hash). + + Args: + config: A dict as produced by ``PythonPacketFunction.to_config()``. + + Returns: + A new ``PacketFunctionProxy`` instance. + """ + inner = config.get("config", config) + name = inner["callable_name"] + version = inner.get("version", "v0.0") + type_id = config.get("packet_function_type_id", "unknown") + # Build placeholder URI: (name, empty_schema_hash, version, type_id) + uri = (name, "", version, type_id) + return cls(config=config, uri=uri) + + # ==================== Bind / unbind ==================== + + @property + def is_bound(self) -> bool: + """Return whether a real function is currently bound.""" + return self._bound_function is not None + + def bind(self, packet_function: PacketFunctionProtocol) -> None: + """Attach a real packet function, validating compatibility. + + Checks that the bound function matches this proxy on: + ``canonical_function_name``, ``major_version``, + ``packet_function_type_id``, ``input_packet_schema``, + ``output_packet_schema``, ``uri``, and ``content_hash()`` + (if a stored hash exists). + + Args: + packet_function: The real function to bind. + + Raises: + ValueError: If any compatibility check fails, listing all + mismatches. + """ + mismatches: list[str] = [] + + if packet_function.canonical_function_name != self._canonical_function_name: + mismatches.append( + f"canonical_function_name: expected {self._canonical_function_name!r}, " + f"got {packet_function.canonical_function_name!r}" + ) + + if packet_function.major_version != self.major_version: + mismatches.append( + f"major_version: expected {self.major_version!r}, " + f"got {packet_function.major_version!r}" + ) + + if packet_function.packet_function_type_id != self._packet_function_type_id: + mismatches.append( + f"packet_function_type_id: expected {self._packet_function_type_id!r}, " + f"got {packet_function.packet_function_type_id!r}" + ) + + if packet_function.input_packet_schema != self._input_packet_schema: + # Also compare via serialized form in case type repr differs + bound_input = { + k: str(v) for k, v in packet_function.input_packet_schema.items() + } + if bound_input != self._raw_input_schema_dict: + mismatches.append( + f"input_packet_schema: expected {self._input_packet_schema!r}, " + f"got {packet_function.input_packet_schema!r}" + ) + + if packet_function.output_packet_schema != self._output_packet_schema: + bound_output = { + k: str(v) for k, v in packet_function.output_packet_schema.items() + } + if bound_output != self._raw_output_schema_dict: + mismatches.append( + f"output_packet_schema: expected {self._output_packet_schema!r}, " + f"got {packet_function.output_packet_schema!r}" + ) + + if tuple(packet_function.uri) != tuple(self._stored_uri): + mismatches.append( + f"uri: expected {self._stored_uri!r}, " + f"got {tuple(packet_function.uri)!r}" + ) + + if self._stored_content_hash is not None: + bound_hash = packet_function.content_hash().to_string() + if bound_hash != self._stored_content_hash: + mismatches.append( + f"content_hash: expected {self._stored_content_hash!r}, " + f"got {bound_hash!r}" + ) + + if mismatches: + raise ValueError( + f"Cannot bind packet function: {', '.join(mismatches)}" + ) + + self._bound_function = packet_function + + def unbind(self) -> None: + """Detach the bound function, reverting to proxy mode.""" + self._bound_function = None diff --git a/src/orcapod/errors.py b/src/orcapod/errors.py index 9d1c05cf..c7a0e3d3 100644 --- a/src/orcapod/errors.py +++ b/src/orcapod/errors.py @@ -11,6 +11,15 @@ class DuplicateTagError(ValueError): pass +class PacketFunctionUnavailableError(RuntimeError): + """Raised when a packet function proxy is invoked without a bound function. + + This occurs when a pipeline is loaded in an environment where the + original packet function is not available. Only cached results can + be accessed. + """ + + class FieldNotResolvableError(LookupError): """ Raised when a source cannot resolve a field value for a given record ID. From a20c4808760d6478615e41b6ecabda6c693c8b50 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 22:26:32 +0000 Subject: [PATCH 04/12] test(proxy): add PacketFunctionProxy unit tests Cover invocation behavior (unbound raises, empty variation/execution data), bind/unbind lifecycle, and identity mismatch detection for function name, version, and output schema. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../test_packet_function_proxy.py | 160 ++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 tests/test_core/packet_function/test_packet_function_proxy.py diff --git a/tests/test_core/packet_function/test_packet_function_proxy.py b/tests/test_core/packet_function/test_packet_function_proxy.py new file mode 100644 index 00000000..19d25318 --- /dev/null +++ b/tests/test_core/packet_function/test_packet_function_proxy.py @@ -0,0 +1,160 @@ +"""Tests for PacketFunctionProxy invocation, bind, and unbind behavior.""" + +import pytest + +from orcapod.core.datagrams.tag_packet import Packet +from orcapod.core.packet_function import PythonPacketFunction +from orcapod.core.packet_function_proxy import PacketFunctionProxy +from orcapod.errors import PacketFunctionUnavailableError + + +# ==================== Helpers ==================== + + +def _make_sample_function() -> PythonPacketFunction: + def double_age(age: int) -> int: + return age * 2 + + return PythonPacketFunction(double_age, output_keys="doubled_age", version="v1.0") + + +def _make_proxy_from_function(pf: PythonPacketFunction) -> PacketFunctionProxy: + config = pf.to_config() + return PacketFunctionProxy( + config=config, + uri=tuple(pf.uri), + content_hash_str=pf.content_hash().to_string(), + pipeline_hash_str=pf.pipeline_hash().to_string(), + ) + + +# ==================== Task 3: Invocation tests ==================== + + +class TestPacketFunctionProxyInvocation: + """Tests for proxy behavior when no function is bound.""" + + def test_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + packet = Packet({"age": 25}) + with pytest.raises( + PacketFunctionUnavailableError, match="double_age" + ): + proxy.call(packet) + + @pytest.mark.asyncio + async def test_async_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + packet = Packet({"age": 25}) + with pytest.raises( + PacketFunctionUnavailableError, match="double_age" + ): + await proxy.async_call(packet) + + def test_direct_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + packet = Packet({"age": 25}) + with pytest.raises( + PacketFunctionUnavailableError, match="double_age" + ): + proxy.direct_call(packet) + + def test_variation_data_empty_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.get_function_variation_data() == {} + + def test_execution_data_empty_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.get_execution_data() == {} + + +# ==================== Task 4: Bind/unbind tests ==================== + + +class TestPacketFunctionProxyBinding: + """Tests for bind/unbind and identity mismatch detection.""" + + def test_bind_succeeds_with_matching_function(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + assert proxy.is_bound + + def test_call_delegates_after_bind(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + packet = Packet({"age": 25}) + result = proxy.call(packet) + assert result is not None + assert result.as_dict()["doubled_age"] == 50 + + def test_variation_data_delegates_after_bind(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + data = proxy.get_function_variation_data() + assert "function_name" in data + assert data["function_name"] == "double_age" + + def test_execution_data_delegates_after_bind(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + data = proxy.get_execution_data() + assert "python_version" in data + + def test_unbind_reverts_to_proxy_mode(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + assert proxy.is_bound + proxy.unbind() + assert not proxy.is_bound + packet = Packet({"age": 25}) + with pytest.raises(PacketFunctionUnavailableError): + proxy.call(packet) + + def test_bind_rejects_mismatched_function_name(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def other_func(age: int) -> int: + return age + 1 + + other_pf = PythonPacketFunction( + other_func, output_keys="doubled_age", version="v1.0" + ) + with pytest.raises(ValueError, match="canonical_function_name"): + proxy.bind(other_pf) + + def test_bind_rejects_mismatched_version(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def double_age(age: int) -> int: + return age * 2 + + other_pf = PythonPacketFunction( + double_age, output_keys="doubled_age", version="v2.0" + ) + with pytest.raises(ValueError, match="major_version"): + proxy.bind(other_pf) + + def test_bind_rejects_mismatched_output_schema(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def double_age(age: int) -> str: + return str(age * 2) + + other_pf = PythonPacketFunction( + double_age, output_keys="doubled_age", version="v1.0" + ) + with pytest.raises(ValueError, match="output_packet_schema"): + proxy.bind(other_pf) From 2ce88b776472331b7525279b07dd5aa2cf96fc94 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 22:29:06 +0000 Subject: [PATCH 05/12] test(proxy): add missing executor, direct_async_call, and bind mismatch tests Co-Authored-By: Claude Sonnet 4.6 --- .../test_packet_function_proxy.py | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/test_core/packet_function/test_packet_function_proxy.py b/tests/test_core/packet_function/test_packet_function_proxy.py index 19d25318..82c62235 100644 --- a/tests/test_core/packet_function/test_packet_function_proxy.py +++ b/tests/test_core/packet_function/test_packet_function_proxy.py @@ -8,6 +8,23 @@ from orcapod.errors import PacketFunctionUnavailableError +# ==================== Task 2: Construction tests ==================== + + +class TestPacketFunctionProxyConstruction: + """Tests for proxy construction and executor property.""" + + def test_executor_returns_none(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.executor is None + + def test_executor_setter_is_noop(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.executor = None # should not raise + + # ==================== Helpers ==================== @@ -62,6 +79,16 @@ def test_direct_call_raises_when_unbound(self): ): proxy.direct_call(packet) + @pytest.mark.asyncio + async def test_direct_async_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + packet = Packet({"age": 25}) + with pytest.raises( + PacketFunctionUnavailableError, match="double_age" + ): + await proxy.direct_async_call(packet) + def test_variation_data_empty_when_unbound(self): pf = _make_sample_function() proxy = _make_proxy_from_function(pf) @@ -158,3 +185,16 @@ def double_age(age: int) -> str: ) with pytest.raises(ValueError, match="output_packet_schema"): proxy.bind(other_pf) + + def test_bind_rejects_mismatched_input_schema(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def double_age(name: str) -> int: + return len(name) * 2 + + other_pf = PythonPacketFunction( + double_age, output_keys="doubled_age", version="v1.0" + ) + with pytest.raises(ValueError, match="input_packet_schema"): + proxy.bind(other_pf) From 3f5e08f8b6bc31e9895bc5fb7d2193b49068544b Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 22:31:19 +0000 Subject: [PATCH 06/12] test(proxy): add FunctionPod integration tests Co-Authored-By: Claude Opus 4.6 (1M context) --- .../test_packet_function_proxy.py | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/tests/test_core/packet_function/test_packet_function_proxy.py b/tests/test_core/packet_function/test_packet_function_proxy.py index 82c62235..105f6680 100644 --- a/tests/test_core/packet_function/test_packet_function_proxy.py +++ b/tests/test_core/packet_function/test_packet_function_proxy.py @@ -2,9 +2,11 @@ import pytest -from orcapod.core.datagrams.tag_packet import Packet +from orcapod.core.datagrams.tag_packet import Packet, Tag +from orcapod.core.function_pod import FunctionPod from orcapod.core.packet_function import PythonPacketFunction from orcapod.core.packet_function_proxy import PacketFunctionProxy +from orcapod.core.sources.dict_source import DictSource from orcapod.errors import PacketFunctionUnavailableError @@ -198,3 +200,38 @@ def double_age(name: str) -> int: ) with pytest.raises(ValueError, match="input_packet_schema"): proxy.bind(other_pf) + + +# ==================== Task 5: FunctionPod with proxy ==================== + + +class TestFunctionPodWithProxy: + """Tests for FunctionPod constructed with a PacketFunctionProxy.""" + + def test_function_pod_constructs_with_proxy(self): + """FunctionPod accepts a proxy and exposes it as packet_function.""" + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + pod = FunctionPod(packet_function=proxy) + assert pod.packet_function is proxy + + def test_function_pod_output_schema(self): + """FunctionPod with proxy correctly reports output schema.""" + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + pod = FunctionPod(packet_function=proxy) + source = DictSource( + data=[{"age": 10}, {"age": 20}, {"age": 30}], + ) + _tag_schema, packet_schema = pod.output_schema(source) + assert "doubled_age" in packet_schema + + def test_function_pod_process_packet_raises(self): + """FunctionPod with unbound proxy raises on process_packet.""" + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + pod = FunctionPod(packet_function=proxy) + tag = Tag({}) + packet = Packet({"age": 25}) + with pytest.raises(PacketFunctionUnavailableError): + pod.process_packet(tag, packet) From 4c9bb0ae351a305a484a0015d83e4437c43d976c Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 22:34:09 +0000 Subject: [PATCH 07/12] feat(serialization): add fallback_to_proxy for packet function resolution When fallback_to_proxy=True and normal resolution fails (ImportError, AttributeError, unknown type ID), return a PacketFunctionProxy instead of raising. Mirrors the existing pattern in resolve_source_from_config. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/pipeline/serialization.py | 34 ++++++- .../test_serialization_helpers.py | 97 +++++++++++++++++++ 2 files changed, 127 insertions(+), 4 deletions(-) diff --git a/src/orcapod/pipeline/serialization.py b/src/orcapod/pipeline/serialization.py index 99ea59f6..4db970b6 100644 --- a/src/orcapod/pipeline/serialization.py +++ b/src/orcapod/pipeline/serialization.py @@ -199,28 +199,54 @@ def resolve_operator_from_config(config: dict[str, Any]) -> Any: return cls.from_config(config) -def resolve_packet_function_from_config(config: dict[str, Any]) -> Any: +def resolve_packet_function_from_config( + config: dict[str, Any], + *, + fallback_to_proxy: bool = False, +) -> Any: """Reconstruct a packet function from a config dict. Args: config: Dict with at least a ``"packet_function_type_id"`` key matching a registered packet function type. + fallback_to_proxy: If ``True`` and reconstruction fails (unknown type, + ``ImportError``, ``AttributeError``, etc.), return a + ``PacketFunctionProxy`` instead of raising. Returns: - A new packet function instance constructed from the config. + A new packet function instance constructed from the config, or a + ``PacketFunctionProxy`` if reconstruction fails and + *fallback_to_proxy* is ``True``. Raises: - ValueError: If the type ID is missing or unknown. + ValueError: If the type ID is missing or unknown and + *fallback_to_proxy* is ``False``. """ _ensure_registries() type_id = config.get("packet_function_type_id") if type_id not in PACKET_FUNCTION_REGISTRY: + if fallback_to_proxy: + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + return PacketFunctionProxy.from_config(config) raise ValueError( f"Unknown packet function type: {type_id!r}. " f"Known types: {sorted(PACKET_FUNCTION_REGISTRY.keys())}" ) cls = PACKET_FUNCTION_REGISTRY[type_id] - return cls.from_config(config) + try: + return cls.from_config(config) + except Exception: + if fallback_to_proxy: + logger.warning( + "Could not reconstruct %s packet function; returning " + "PacketFunctionProxy.", + type_id, + ) + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + return PacketFunctionProxy.from_config(config) + raise def resolve_source_from_config( diff --git a/tests/test_pipeline/test_serialization_helpers.py b/tests/test_pipeline/test_serialization_helpers.py index 90d778cb..91eb7e11 100644 --- a/tests/test_pipeline/test_serialization_helpers.py +++ b/tests/test_pipeline/test_serialization_helpers.py @@ -294,3 +294,100 @@ def test_preserves_field_order(self): schema = {"z": int, "a": str, "m": float} result = self._round_trip(schema) assert list(result.keys()) == ["z", "a", "m"] + + +# --------------------------------------------------------------------------- +# PacketFunction proxy fallback +# --------------------------------------------------------------------------- + + +class TestResolvePacketFunctionFallbackToProxy: + """resolve_packet_function_from_config with fallback_to_proxy.""" + + def test_resolve_packet_function_fallback_to_proxy(self): + from orcapod.core.packet_function_proxy import PacketFunctionProxy + from orcapod.pipeline.serialization import resolve_packet_function_from_config + + config = { + "packet_function_type_id": "python.function.v0", + "config": { + "module_path": "nonexistent.module.that.does.not.exist", + "callable_name": "some_func", + "version": "v1.0", + "input_packet_schema": {"x": "int64"}, + "output_packet_schema": {"y": "float64"}, + "output_keys": ["y"], + }, + } + # Without fallback, should raise + with pytest.raises(Exception): + resolve_packet_function_from_config(config) + + # With fallback, should return proxy + result = resolve_packet_function_from_config(config, fallback_to_proxy=True) + assert isinstance(result, PacketFunctionProxy) + assert result.canonical_function_name == "some_func" + + def test_unknown_type_id_fallback(self): + from orcapod.core.packet_function_proxy import PacketFunctionProxy + from orcapod.pipeline.serialization import resolve_packet_function_from_config + + config = { + "packet_function_type_id": "unknown.type.v99", + "config": { + "callable_name": "mystery", + "version": "v1.0", + "input_packet_schema": {"a": "int64"}, + "output_packet_schema": {"b": "int64"}, + "output_keys": ["b"], + }, + } + with pytest.raises(ValueError, match="Unknown packet function type"): + resolve_packet_function_from_config(config) + + result = resolve_packet_function_from_config(config, fallback_to_proxy=True) + assert isinstance(result, PacketFunctionProxy) + assert result.canonical_function_name == "mystery" + + +# --------------------------------------------------------------------------- +# FunctionPod.from_config proxy fallback +# --------------------------------------------------------------------------- + + +class TestFunctionPodFromConfigFallbackToProxy: + """FunctionPod.from_config with fallback_to_proxy.""" + + def test_function_pod_from_config_fallback_to_proxy(self): + from orcapod.core.function_pod import FunctionPod + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + config = { + "uri": ["some_func", "hash123", "v1", "python.function.v0"], + "packet_function": { + "packet_function_type_id": "python.function.v0", + "config": { + "module_path": "nonexistent.module", + "callable_name": "some_func", + "version": "v1.0", + "input_packet_schema": {"x": "int64"}, + "output_packet_schema": {"y": "float64"}, + "output_keys": ["y"], + }, + }, + "node_config": None, + } + + with pytest.raises(Exception): + FunctionPod.from_config(config) + + pod = FunctionPod.from_config(config, fallback_to_proxy=True) + assert isinstance(pod.packet_function, PacketFunctionProxy) + assert pod.packet_function.canonical_function_name == "some_func" + # URI should be injected from parent config + assert pod.packet_function.uri == ( + "some_func", + "hash123", + "v1", + "python.function.v0", + ) From b187166ceba0e7fb5b01bd46f8975419db211461 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 22:34:14 +0000 Subject: [PATCH 08/12] feat(function_pod): add fallback_to_proxy to FunctionPod.from_config When fallback_to_proxy=True, passes through to resolve_packet_function_from_config. If a PacketFunctionProxy is returned, injects the URI from the parent config's "uri" field. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/function_pod.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/src/orcapod/core/function_pod.py b/src/orcapod/core/function_pod.py index 5a1db639..cf5f2f17 100644 --- a/src/orcapod/core/function_pod.py +++ b/src/orcapod/core/function_pod.py @@ -295,11 +295,20 @@ def to_config(self) -> dict[str, Any]: return config @classmethod - def from_config(cls, config: dict[str, Any]) -> "FunctionPod": + def from_config( + cls, + config: dict[str, Any], + *, + fallback_to_proxy: bool = False, + ) -> "FunctionPod": """Reconstruct a FunctionPod from a config dict. Args: config: A dict as produced by :meth:`to_config`. + fallback_to_proxy: If ``True`` and the packet function cannot be + resolved, use a ``PacketFunctionProxy`` instead of raising. + When a proxy is returned, the URI from the parent config's + ``"uri"`` field is injected into the proxy. Returns: A new ``FunctionPod`` instance. @@ -307,7 +316,20 @@ def from_config(cls, config: dict[str, Any]) -> "FunctionPod": from orcapod.pipeline.serialization import resolve_packet_function_from_config pf_config = config["packet_function"] - packet_function = resolve_packet_function_from_config(pf_config) + + if fallback_to_proxy: + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + packet_function = resolve_packet_function_from_config( + pf_config, fallback_to_proxy=True + ) + # If we got a proxy, inject the URI from the parent config + if isinstance(packet_function, PacketFunctionProxy): + uri_list = config.get("uri") + if uri_list is not None: + packet_function._stored_uri = tuple(uri_list) + else: + packet_function = resolve_packet_function_from_config(pf_config) node_config = None if config.get("node_config") is not None: From e662c817b0f00e72ce0c854238df6f087ad5d4b7 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 22:35:07 +0000 Subject: [PATCH 09/12] feat(pipeline): use PacketFunctionProxy in load, relax upstream usability - Relax upstream usability checks to accept both FULL and READ_ONLY status, enabling downstream nodes to be reconstructed when their upstream uses a proxy. - Rewrite _load_function_node to use fallback_to_proxy=True so that unavailable functions get a PacketFunctionProxy instead of falling back to fully read-only mode. Nodes with a proxy get READ_ONLY status and their cached function pod record path is restored from the descriptor. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/pipeline/graph.py | 72 +++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 21 deletions(-) diff --git a/src/orcapod/pipeline/graph.py b/src/orcapod/pipeline/graph.py index c5a9595e..611f6f83 100644 --- a/src/orcapod/pipeline/graph.py +++ b/src/orcapod/pipeline/graph.py @@ -722,7 +722,8 @@ def load(cls, path: str | Path, mode: str = "full") -> "Pipeline": upstream_usable = ( upstream_node is not None and hasattr(upstream_node, "load_status") - and upstream_node.load_status == LoadStatus.FULL + and upstream_node.load_status + in (LoadStatus.FULL, LoadStatus.READ_ONLY) ) # Build databases dict @@ -743,7 +744,9 @@ def load(cls, path: str | Path, mode: str = "full") -> "Pipeline": # Check if all upstreams are usable all_upstreams_usable = ( all( - hasattr(n, "load_status") and n.load_status == LoadStatus.FULL + hasattr(n, "load_status") + and n.load_status + in (LoadStatus.FULL, LoadStatus.READ_ONLY) for n in upstream_nodes ) if upstream_nodes @@ -857,40 +860,67 @@ def _load_function_node( ) -> FunctionNode: """Reconstruct a FunctionNode from a descriptor. + When the upstream is usable and mode is not ``"read_only"``, attempts + to reconstruct the function pod with ``fallback_to_proxy=True`` so + that a ``PacketFunctionProxy`` is used when the original function + cannot be imported. Nodes backed by a proxy get + ``LoadStatus.READ_ONLY`` — they can read cached results but cannot + compute new ones. + Args: descriptor: The serialized node descriptor. mode: Load mode. upstream_node: The reconstructed upstream node, or ``None``. - upstream_usable: Whether the upstream is in FULL mode. + upstream_usable: Whether the upstream is usable (FULL or + READ_ONLY). databases: Database role mapping. Returns: A ``FunctionNode`` instance. """ from orcapod.core.function_pod import FunctionPod + from orcapod.core.packet_function_proxy import PacketFunctionProxy + from orcapod.pipeline.serialization import LoadStatus + + if mode != "read_only" and upstream_usable: + try: + pod = FunctionPod.from_config( + descriptor["function_pod"], fallback_to_proxy=True + ) + node = FunctionNode.from_descriptor( + descriptor, + function_pod=pod, + input_stream=upstream_node, + databases=databases, + ) - if mode == "full": - if not upstream_usable: + if isinstance(pod.packet_function, PacketFunctionProxy): + node._load_status = LoadStatus.READ_ONLY + # Override the cached function pod's record path to match + # the stored path from the original pipeline run. + stored_result_path = tuple( + descriptor.get("result_record_path", ()) + ) + if stored_result_path and node._cached_function_pod is not None: + node._cached_function_pod._cache._record_path = ( + stored_result_path + ) + else: + node._load_status = LoadStatus.FULL + + return node + except Exception: logger.warning( - "Upstream for function node %r is not usable, " + "Failed to reconstruct function node %r, " "falling back to read-only.", descriptor.get("label"), ) - else: - try: - pod = FunctionPod.from_config(descriptor["function_pod"]) - return FunctionNode.from_descriptor( - descriptor, - function_pod=pod, - input_stream=upstream_node, - databases=databases, - ) - except Exception: - logger.warning( - "Failed to reconstruct function node %r, " - "falling back to read-only.", - descriptor.get("label"), - ) + elif mode != "read_only" and not upstream_usable: + logger.warning( + "Upstream for function node %r is not usable, " + "falling back to read-only.", + descriptor.get("label"), + ) return FunctionNode.from_descriptor( descriptor, From d5f3d7165004d1116b7fe69a1c5d87d2e31d18ea Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 22:39:58 +0000 Subject: [PATCH 10/12] test(pipeline): add integration tests for loading with unavailable function Add TestPipelineLoadWithUnavailableFunction class with 4 tests: - test_load_with_unavailable_function_has_read_only_status - test_load_with_unavailable_function_can_get_all_records - test_load_with_unavailable_function_iter_packets_yields_cached - test_downstream_operator_computes_from_cached These tests verify that when a pipeline is saved, the function module_path is corrupted (simulating an unavailable function), and the pipeline is reloaded, the function node gets a PacketFunctionProxy with READ_ONLY status and can still serve cached data. The downstream operator test confirms that a SelectPacketColumns operator can compute from the READ_ONLY function node's cached output. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/test_pipeline/test_serialization.py | 187 ++++++++++++++++++++++ 1 file changed, 187 insertions(+) diff --git a/tests/test_pipeline/test_serialization.py b/tests/test_pipeline/test_serialization.py index 86357911..2eb8bd31 100644 --- a/tests/test_pipeline/test_serialization.py +++ b/tests/test_pipeline/test_serialization.py @@ -1067,3 +1067,190 @@ def test_function_database_round_trip(self, tmp_path): assert loaded.function_database is not None assert type(loaded.function_database) is DeltaTableDatabase + + +# --------------------------------------------------------------------------- +# Pipeline load with unavailable function (Tasks 9-10) +# --------------------------------------------------------------------------- + + +def _double_age(age: int) -> tuple[int, int]: + """A simple function that doubles age and preserves original.""" + return age * 2, age + + +def _corrupt_function_module_path(save_path): + """Edit saved pipeline JSON to make function module unimportable.""" + with open(save_path) as f: + data = json.load(f) + for node in data["nodes"].values(): + if node.get("node_type") == "function": + pf_config = node["function_pod"]["packet_function"]["config"] + pf_config["module_path"] = "nonexistent.module.that.does.not.exist" + with open(save_path, "w") as f: + json.dump(data, f) + + +class TestPipelineLoadWithUnavailableFunction: + """Tests for loading a pipeline when the function cannot be imported.""" + + @staticmethod + def _write_csv(path, rows): + """Write a list of dicts as a CSV file.""" + fieldnames = list(rows[0].keys()) + with open(path, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + + def _build_and_save_pipeline(self, tmp_path): + """Build pipeline with CSV source and function pod, run it, save to JSON. + + Uses CSVSource (reconstructable) so the source survives load and the + function node enters the proxy path rather than the pure read-only path. + + Returns: + Tuple of (save_path, db_path). + """ + csv_path = str(tmp_path / "data.csv") + self._write_csv( + csv_path, + [{"name": "alice", "age": "30"}, {"name": "bob", "age": "25"}], + ) + + db_path = str(tmp_path / "db") + db = DeltaTableDatabase(base_path=db_path) + source = CSVSource( + file_path=csv_path, + tag_columns=["name"], + source_id="people", + ) + pf = PythonPacketFunction( + function=_double_age, + output_keys=["doubled_age", "original_age"], + function_name="_double_age", + ) + pod = FunctionPod(packet_function=pf) + pipeline = Pipeline(name="proxy_test", pipeline_database=db) + with pipeline: + pod.process(source, label="transform") + pipeline.run() + db.flush() + + save_path = str(tmp_path / "pipeline.json") + pipeline.save(save_path) + return save_path, db_path + + def _build_and_save_pipeline_with_operator(self, tmp_path): + """Build pipeline: CSV source -> function_pod -> SelectPacketColumns. + + Returns: + Tuple of (save_path, db_path). + """ + from orcapod.core.operators import SelectPacketColumns + + csv_path = str(tmp_path / "data.csv") + self._write_csv( + csv_path, + [{"name": "alice", "age": "30"}, {"name": "bob", "age": "25"}], + ) + + db_path = str(tmp_path / "db") + db = DeltaTableDatabase(base_path=db_path) + source = CSVSource( + file_path=csv_path, + tag_columns=["name"], + source_id="people", + ) + pf = PythonPacketFunction( + function=_double_age, + output_keys=["doubled_age", "original_age"], + function_name="_double_age", + ) + pod = FunctionPod(packet_function=pf) + select_op = SelectPacketColumns(columns=["doubled_age"]) + + pipeline = Pipeline(name="proxy_op_test", pipeline_database=db) + with pipeline: + fn_result = pod.process(source, label="transform") + select_op.process(fn_result, label="select_col") + pipeline.run() + db.flush() + + save_path = str(tmp_path / "pipeline.json") + pipeline.save(save_path) + return save_path, db_path + + # -- Task 9 tests -- + + def test_load_with_unavailable_function_has_read_only_status(self, tmp_path): + """Loading a pipeline with corrupted module_path yields READ_ONLY function node.""" + save_path, _ = self._build_and_save_pipeline(tmp_path) + _corrupt_function_module_path(save_path) + + loaded = Pipeline.load(save_path, mode="full") + + fn_nodes = [ + n + for n in loaded.compiled_nodes.values() + if n.node_type == "function" + ] + assert len(fn_nodes) == 1 + assert fn_nodes[0].load_status == LoadStatus.READ_ONLY + + def test_load_with_unavailable_function_can_get_all_records(self, tmp_path): + """A READ_ONLY function node with proxy can retrieve all cached records.""" + save_path, _ = self._build_and_save_pipeline(tmp_path) + _corrupt_function_module_path(save_path) + + loaded = Pipeline.load(save_path, mode="full") + fn_node = loaded.compiled_nodes["transform"] + + records = fn_node.get_all_records() + assert records is not None + assert records.num_rows == 2 + + def test_load_with_unavailable_function_iter_packets_yields_cached(self, tmp_path): + """A READ_ONLY function node with proxy yields cached packets via iter_packets.""" + save_path, _ = self._build_and_save_pipeline(tmp_path) + _corrupt_function_module_path(save_path) + + loaded = Pipeline.load(save_path, mode="full") + fn_node = loaded.compiled_nodes["transform"] + + packets = list(fn_node.iter_packets()) + assert len(packets) == 2 + + # Verify actual data values + doubled_ages = sorted(p.as_dict()["doubled_age"] for _, p in packets) + assert doubled_ages == [50, 60] + + original_ages = sorted(p.as_dict()["original_age"] for _, p in packets) + assert original_ages == [25, 30] + + # -- Task 10 test -- + + def test_downstream_operator_computes_from_cached(self, tmp_path): + """Operator downstream of READ_ONLY function node can compute from cached data.""" + save_path, _ = self._build_and_save_pipeline_with_operator(tmp_path) + _corrupt_function_module_path(save_path) + + loaded = Pipeline.load(save_path, mode="full") + + # Function node should be READ_ONLY (proxy) + fn_node = loaded.compiled_nodes["transform"] + assert fn_node.load_status == LoadStatus.READ_ONLY + + # Operator node should be FULL (it can reconstruct from cached upstream) + op_node = loaded.compiled_nodes["select_col"] + assert op_node.load_status == LoadStatus.FULL + + # Operator should produce correct results + table = op_node.as_table() + assert table.num_rows == 2 + assert "doubled_age" in table.column_names + # original_age should have been dropped by SelectPacketColumns + assert "original_age" not in table.column_names + + doubled_ages = sorted(table.column("doubled_age").to_pylist()) + assert doubled_ages == [50, 60] From bea5d7ed096f0a61e56e8fa92a150d772bb3d393 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 22:46:48 +0000 Subject: [PATCH 11/12] docs: add PR target branch and Linear status update policies MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Feature branch PRs always target dev; dev→main for versioning - Update Linear issue status to In Progress when starting work Co-Authored-By: Claude Opus 4.6 (1M context) --- .zed/rules | 6 +++++- CLAUDE.md | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/.zed/rules b/.zed/rules index dc83f5e7..ba2385a4 100644 --- a/.zed/rules +++ b/.zed/rules @@ -59,7 +59,8 @@ refactor: 1. Check for an existing issue — search Linear for a corresponding issue. 2. If none exists — ask the developer whether to create one. Do not proceed without either a linked issue or explicit approval to skip. -3. When a new issue is discovered during development (bug, design problem, deferred +3. When starting work on an issue — update its Linear status to "In Progress". +4. When a new issue is discovered during development (bug, design problem, deferred work), create a corresponding Linear issue using the template below. When creating Linear issues, always use this template for the description: @@ -96,6 +97,9 @@ Remove any optional sections that don't apply rather than leaving them empty. When working on a feature, create and checkout a git branch using the gitBranchName returned by the primary Linear issue (e.g. eywalker/plt-911-add-documentation-for-orcapod-python). +Feature branch PRs always target the "dev" branch. The dev → main PR is used +for versioning/releases only. + If a feature branch / PR corresponds to multiple Linear issues, list all of them in the PR description body so that Linear's GitHub integration auto-tracks the PR against each issue. Use the format "Fixes PLT-123" or "Closes PLT-123" (GitHub magic words) for issues diff --git a/CLAUDE.md b/CLAUDE.md index 722c6d2a..e71f6add 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -63,7 +63,8 @@ refactor: 1. **Check for an existing issue** — search Linear for a corresponding issue. 2. **If none exists** — ask the developer whether to create one. Do not proceed without either a linked issue or explicit approval to skip. -3. **When a new issue is discovered** during development (bug, design problem, deferred +3. **When starting work on an issue** — update its Linear status to **In Progress**. +4. **When a new issue is discovered** during development (bug, design problem, deferred work), create a corresponding Linear issue using the template below. When creating Linear issues, always use this template for the description: @@ -102,6 +103,9 @@ Remove any optional sections that don't apply rather than leaving them empty. When working on a feature, create and checkout a git branch using the `gitBranchName` returned by the primary Linear issue (e.g. `eywalker/plt-911-add-documentation-for-orcapod-python`). +**Feature branch PRs always target the `dev` branch.** The `dev` → `main` PR is used +for versioning/releases only. + If a feature branch / PR corresponds to multiple Linear issues, list all of them in the PR description body so that Linear's GitHub integration auto-tracks the PR against each issue. Use the format `Fixes PLT-123` or `Closes PLT-123` (GitHub magic words) for issues From 1b89161900c750677c54713eb76173ee572c4e54 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Sun, 15 Mar 2026 23:33:44 +0000 Subject: [PATCH 12/12] fix(review): address PR #87 review feedback (PLT-931) - Add uri to PythonPacketFunction.to_config() output - Refactor PacketFunctionProxy to read URI from config (remove uri param) - Move _BUILTIN_TYPE_MAP to pipeline/serialization.py - Coerce unrecognized type strings to object in _deserialize_schema_from_config - Simplify FunctionPod.from_config() (remove proxy-specific URI injection) - Remove record_path override from _load_function_node in graph.py - Fix result_path_prefix derivation in FunctionNode.from_descriptor - Narrow exception handling to ImportError/ModuleNotFoundError/AttributeError - Fix test specificity (pytest.raises with specific exception types) - Add schema round-trip consistency and hash preservation tests Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/function_pod.py | 19 +-- src/orcapod/core/nodes/function_node.py | 15 +- src/orcapod/core/packet_function.py | 1 + src/orcapod/core/packet_function_proxy.py | 148 +++++++++--------- src/orcapod/pipeline/graph.py | 10 +- src/orcapod/pipeline/serialization.py | 21 ++- .../test_packet_function_proxy.py | 1 - .../test_serialization_helpers.py | 55 ++++++- 8 files changed, 156 insertions(+), 114 deletions(-) diff --git a/src/orcapod/core/function_pod.py b/src/orcapod/core/function_pod.py index cf5f2f17..7e78cddd 100644 --- a/src/orcapod/core/function_pod.py +++ b/src/orcapod/core/function_pod.py @@ -307,8 +307,6 @@ def from_config( config: A dict as produced by :meth:`to_config`. fallback_to_proxy: If ``True`` and the packet function cannot be resolved, use a ``PacketFunctionProxy`` instead of raising. - When a proxy is returned, the URI from the parent config's - ``"uri"`` field is injected into the proxy. Returns: A new ``FunctionPod`` instance. @@ -316,20 +314,9 @@ def from_config( from orcapod.pipeline.serialization import resolve_packet_function_from_config pf_config = config["packet_function"] - - if fallback_to_proxy: - from orcapod.core.packet_function_proxy import PacketFunctionProxy - - packet_function = resolve_packet_function_from_config( - pf_config, fallback_to_proxy=True - ) - # If we got a proxy, inject the URI from the parent config - if isinstance(packet_function, PacketFunctionProxy): - uri_list = config.get("uri") - if uri_list is not None: - packet_function._stored_uri = tuple(uri_list) - else: - packet_function = resolve_packet_function_from_config(pf_config) + packet_function = resolve_packet_function_from_config( + pf_config, fallback_to_proxy=fallback_to_proxy + ) node_config = None if config.get("node_config") is not None: diff --git a/src/orcapod/core/nodes/function_node.py b/src/orcapod/core/nodes/function_node.py index ca3b198c..2bb8a911 100644 --- a/src/orcapod/core/nodes/function_node.py +++ b/src/orcapod/core/nodes/function_node.py @@ -224,23 +224,30 @@ def from_descriptor( pipeline_path = tuple(descriptor.get("pipeline_path", ())) # Derive pipeline_path_prefix by stripping the suffix that # __init__ appends (packet_function.uri + node hash element). - # We pass the full pipeline_path_prefix from the descriptor. # The descriptor stores the complete pipeline_path; we need # to reconstruct the prefix that was originally passed to # __init__. The suffix added is: pf.uri + (f"node:{hash}",). - # Instead of reverse-engineering, use the descriptor's path - # minus what __init__ will add. For full mode we let __init__ - # recompute pipeline_path from the prefix. pf_uri_len = len(function_pod.packet_function.uri) + 1 # +1 for node:hash prefix = ( pipeline_path[:-pf_uri_len] if len(pipeline_path) > pf_uri_len else () ) + # Derive result_path_prefix from the stored result_record_path + # by stripping the URI suffix that CachedFunctionPod appends. + stored_result_path = tuple( + descriptor.get("result_record_path", ()) + ) + uri_len = len(function_pod.packet_function.uri) + result_prefix: tuple[str, ...] | None = None + if stored_result_path and len(stored_result_path) > uri_len: + result_prefix = stored_result_path[:-uri_len] + node = cls( function_pod=function_pod, input_stream=input_stream, pipeline_database=pipeline_db, result_database=result_db, + result_path_prefix=result_prefix, pipeline_path_prefix=prefix, label=descriptor.get("label"), ) diff --git a/src/orcapod/core/packet_function.py b/src/orcapod/core/packet_function.py index bd26ea11..5ee9afdb 100644 --- a/src/orcapod/core/packet_function.py +++ b/src/orcapod/core/packet_function.py @@ -574,6 +574,7 @@ def to_config(self) -> dict[str, Any]: """ return { "packet_function_type_id": self.packet_function_type_id, + "uri": list(self.uri), "config": { "module_path": self._function.__module__, "callable_name": self._function_name, diff --git a/src/orcapod/core/packet_function_proxy.py b/src/orcapod/core/packet_function_proxy.py index 362bd841..615b94a9 100644 --- a/src/orcapod/core/packet_function_proxy.py +++ b/src/orcapod/core/packet_function_proxy.py @@ -10,61 +10,13 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import Any from orcapod.core.packet_function import PacketFunctionBase from orcapod.errors import PacketFunctionUnavailableError from orcapod.protocols.core_protocols import PacketFunctionProtocol from orcapod.types import ContentHash, Schema -if TYPE_CHECKING: - from orcapod.protocols.core_protocols import PacketProtocol - from orcapod.protocols.core_protocols.executor import ( - PacketFunctionExecutorProtocol, - ) - - -_BUILTIN_TYPE_MAP: dict[str, type] = { - "": int, - "": float, - "": str, - "": bool, - "": bytes, -} - - -def _deserialize_schema_from_config(schema_dict: dict[str, str]) -> Schema: - """Reconstruct a Schema from a to_config() schema dict. - - Handles both ``str(python_type)`` format (e.g. ``""``), - used by ``PythonPacketFunction.to_config()``, and Arrow type string - format (e.g. ``"int64"``), used by the serialization module. - - Args: - schema_dict: Dict mapping field names to type strings. - - Returns: - A Schema with Python types reconstructed from the strings. - """ - from orcapod.pipeline.serialization import deserialize_schema - - result: dict[str, Any] = {} - needs_arrow_fallback: list[str] = [] - - for name, type_str in schema_dict.items(): - if type_str in _BUILTIN_TYPE_MAP: - result[name] = _BUILTIN_TYPE_MAP[type_str] - else: - needs_arrow_fallback.append(name) - - if needs_arrow_fallback: - arrow_result = deserialize_schema( - {k: schema_dict[k] for k in needs_arrow_fallback} - ) - result.update(arrow_result) - - return Schema(result) - class PacketFunctionProxy(PacketFunctionBase): """Stand-in for an unavailable packet function. @@ -76,7 +28,6 @@ class PacketFunctionProxy(PacketFunctionBase): Args: config: Serialized packet function config dict (as produced by ``PythonPacketFunction.to_config()``). - uri: The original URI tuple from ``FunctionPod.to_config()["uri"]``. content_hash_str: Optional stored content hash string. pipeline_hash_str: Optional stored pipeline hash string. """ @@ -84,7 +35,6 @@ class PacketFunctionProxy(PacketFunctionBase): def __init__( self, config: dict[str, Any], - uri: tuple[str, ...], content_hash_str: str | None = None, pipeline_hash_str: str | None = None, ) -> None: @@ -94,12 +44,6 @@ def __init__( self._canonical_function_name = inner["callable_name"] # Eagerly deserialize schemas. - # to_config() stores types as str(python_type) e.g. "". - # We reconstruct proper Schema objects by going through the type - # converter (Python type -> Arrow -> Python) using the Arrow string - # representation. We also try deserialize_schema which handles - # Arrow-format strings. As a fallback we keep the raw config dicts - # for serialized-form comparison in bind(). self._raw_input_schema_dict = inner["input_packet_schema"] self._raw_output_schema_dict = inner["output_packet_schema"] self._input_packet_schema = _deserialize_schema_from_config( @@ -109,17 +53,32 @@ def __init__( self._raw_output_schema_dict ) + # Call super().__init__ so that major_version and + # output_packet_schema_hash are available for URI fallback. + version = inner["version"] + super().__init__(version=version) + + # URI: read from config if present, otherwise compute from metadata. + uri_list = config.get("uri") + if uri_list is not None: + self._stored_uri = tuple(uri_list) + else: + # Fallback: compute from available metadata + # (same structure as PacketFunctionBase.uri). + self._stored_uri = ( + self._canonical_function_name, + self.output_packet_schema_hash, + f"v{self.major_version}", + self._packet_function_type_id, + ) + # Stored identity hashes - self._stored_uri = uri self._stored_content_hash = content_hash_str self._stored_pipeline_hash = pipeline_hash_str # Late-binding slot self._bound_function: PacketFunctionProtocol | None = None - version = inner["version"] - super().__init__(version=version) - # ==================== Identity properties ==================== @property @@ -170,25 +129,25 @@ def _raise_unavailable(self) -> None: f"Use bind() to attach a real function, or access cached results only." ) - def call(self, packet: PacketProtocol) -> PacketProtocol | None: + def call(self, packet: "PacketProtocol") -> "PacketProtocol | None": """Process a single packet; delegates to bound function or raises.""" if self._bound_function is not None: return self._bound_function.call(packet) self._raise_unavailable() - async def async_call(self, packet: PacketProtocol) -> PacketProtocol | None: + async def async_call(self, packet: "PacketProtocol") -> "PacketProtocol | None": """Async counterpart of ``call``.""" if self._bound_function is not None: return await self._bound_function.async_call(packet) self._raise_unavailable() - def direct_call(self, packet: PacketProtocol) -> PacketProtocol | None: + def direct_call(self, packet: "PacketProtocol") -> "PacketProtocol | None": """Direct execution; delegates to bound function or raises.""" if self._bound_function is not None: return self._bound_function.direct_call(packet) self._raise_unavailable() - async def direct_async_call(self, packet: PacketProtocol) -> PacketProtocol | None: + async def direct_async_call(self, packet: "PacketProtocol") -> "PacketProtocol | None": """Async direct execution; delegates to bound function or raises.""" if self._bound_function is not None: return await self._bound_function.direct_async_call(packet) @@ -211,14 +170,14 @@ def get_execution_data(self) -> dict[str, Any]: # ==================== Executor ==================== @property - def executor(self) -> PacketFunctionExecutorProtocol | None: + def executor(self) -> "PacketFunctionExecutorProtocol | None": """Return executor from bound function, or None.""" if self._bound_function is not None: return self._bound_function.executor return None @executor.setter - def executor(self, executor: PacketFunctionExecutorProtocol | None) -> None: + def executor(self, executor: "PacketFunctionExecutorProtocol | None") -> None: """Set executor on bound function; no-op when unbound.""" if self._bound_function is not None: self._bound_function.executor = executor @@ -233,8 +192,8 @@ def to_config(self) -> dict[str, Any]: def from_config(cls, config: dict[str, Any]) -> PacketFunctionProxy: """Construct a proxy from a serialized config dict. - Builds a placeholder URI from the config fields (callable_name, - version, packet_function_type_id with an empty schema hash). + The URI is read from the config's ``"uri"`` field if present; + otherwise a fallback is computed from config metadata. Args: config: A dict as produced by ``PythonPacketFunction.to_config()``. @@ -242,13 +201,7 @@ def from_config(cls, config: dict[str, Any]) -> PacketFunctionProxy: Returns: A new ``PacketFunctionProxy`` instance. """ - inner = config.get("config", config) - name = inner["callable_name"] - version = inner.get("version", "v0.0") - type_id = config.get("packet_function_type_id", "unknown") - # Build placeholder URI: (name, empty_schema_hash, version, type_id) - uri = (name, "", version, type_id) - return cls(config=config, uri=uri) + return cls(config=config) # ==================== Bind / unbind ==================== @@ -338,3 +291,46 @@ def bind(self, packet_function: PacketFunctionProtocol) -> None: def unbind(self) -> None: """Detach the bound function, reverting to proxy mode.""" self._bound_function = None + + +def _deserialize_schema_from_config(schema_dict: dict[str, str]) -> Schema: + """Reconstruct a Schema from a to_config() schema dict. + + Handles both ``str(python_type)`` format (e.g. ``""``), + used by ``PythonPacketFunction.to_config()``, and Arrow type string + format (e.g. ``"int64"``), used by the serialization module. + + Unrecognized type strings are coerced to ``object``. + + Args: + schema_dict: Dict mapping field names to type strings. + + Returns: + A Schema with Python types reconstructed from the strings. + """ + from orcapod.pipeline.serialization import ( + _BUILTIN_TYPE_MAP, + deserialize_schema, + ) + + result: dict[str, Any] = {} + needs_arrow_fallback: list[str] = [] + + for name, type_str in schema_dict.items(): + if type_str in _BUILTIN_TYPE_MAP: + result[name] = _BUILTIN_TYPE_MAP[type_str] + else: + needs_arrow_fallback.append(name) + + if needs_arrow_fallback: + arrow_result = deserialize_schema( + {k: schema_dict[k] for k in needs_arrow_fallback} + ) + for k, v in arrow_result.items(): + # If deserialize_schema fell back to a raw string, coerce to object + if isinstance(v, str): + result[k] = object + else: + result[k] = v + + return Schema(result) diff --git a/src/orcapod/pipeline/graph.py b/src/orcapod/pipeline/graph.py index 611f6f83..755240e2 100644 --- a/src/orcapod/pipeline/graph.py +++ b/src/orcapod/pipeline/graph.py @@ -894,17 +894,9 @@ def _load_function_node( databases=databases, ) + # Determine load status: proxies can only read cached data. if isinstance(pod.packet_function, PacketFunctionProxy): node._load_status = LoadStatus.READ_ONLY - # Override the cached function pod's record path to match - # the stored path from the original pipeline run. - stored_result_path = tuple( - descriptor.get("result_record_path", ()) - ) - if stored_result_path and node._cached_function_pod is not None: - node._cached_function_pod._cache._record_path = ( - stored_result_path - ) else: node._load_status = LoadStatus.FULL diff --git a/src/orcapod/pipeline/serialization.py b/src/orcapod/pipeline/serialization.py index 4db970b6..066cc8ea 100644 --- a/src/orcapod/pipeline/serialization.py +++ b/src/orcapod/pipeline/serialization.py @@ -236,12 +236,12 @@ def resolve_packet_function_from_config( cls = PACKET_FUNCTION_REGISTRY[type_id] try: return cls.from_config(config) - except Exception: + except (ImportError, ModuleNotFoundError, AttributeError) as exc: if fallback_to_proxy: logger.warning( - "Could not reconstruct %s packet function; returning " - "PacketFunctionProxy.", - type_id, + "Could not reconstruct packet function from config (%s); " + "returning PacketFunctionProxy.", + exc, ) from orcapod.core.packet_function_proxy import PacketFunctionProxy @@ -623,3 +623,16 @@ def _build_arrow_primitive_types() -> dict[str, Any]: _ARROW_PRIMITIVE_TYPES: dict[str, Any] = _build_arrow_primitive_types() + + +# --------------------------------------------------------------------------- +# Builtin Python type map for schema deserialization from to_config() format +# --------------------------------------------------------------------------- + +_BUILTIN_TYPE_MAP: dict[str, type] = { + "": int, + "": float, + "": str, + "": bool, + "": bytes, +} diff --git a/tests/test_core/packet_function/test_packet_function_proxy.py b/tests/test_core/packet_function/test_packet_function_proxy.py index 105f6680..c2699328 100644 --- a/tests/test_core/packet_function/test_packet_function_proxy.py +++ b/tests/test_core/packet_function/test_packet_function_proxy.py @@ -41,7 +41,6 @@ def _make_proxy_from_function(pf: PythonPacketFunction) -> PacketFunctionProxy: config = pf.to_config() return PacketFunctionProxy( config=config, - uri=tuple(pf.uri), content_hash_str=pf.content_hash().to_string(), pipeline_hash_str=pf.pipeline_hash().to_string(), ) diff --git a/tests/test_pipeline/test_serialization_helpers.py b/tests/test_pipeline/test_serialization_helpers.py index 91eb7e11..7feee9cb 100644 --- a/tests/test_pipeline/test_serialization_helpers.py +++ b/tests/test_pipeline/test_serialization_helpers.py @@ -319,8 +319,8 @@ def test_resolve_packet_function_fallback_to_proxy(self): "output_keys": ["y"], }, } - # Without fallback, should raise - with pytest.raises(Exception): + # Without fallback, should raise ImportError/ModuleNotFoundError + with pytest.raises((ImportError, ModuleNotFoundError)): resolve_packet_function_from_config(config) # With fallback, should return proxy @@ -366,6 +366,7 @@ def test_function_pod_from_config_fallback_to_proxy(self): "uri": ["some_func", "hash123", "v1", "python.function.v0"], "packet_function": { "packet_function_type_id": "python.function.v0", + "uri": ["some_func", "hash123", "v1", "python.function.v0"], "config": { "module_path": "nonexistent.module", "callable_name": "some_func", @@ -378,16 +379,62 @@ def test_function_pod_from_config_fallback_to_proxy(self): "node_config": None, } - with pytest.raises(Exception): + with pytest.raises((ImportError, ModuleNotFoundError)): FunctionPod.from_config(config) pod = FunctionPod.from_config(config, fallback_to_proxy=True) assert isinstance(pod.packet_function, PacketFunctionProxy) assert pod.packet_function.canonical_function_name == "some_func" - # URI should be injected from parent config + # URI comes from the packet function config assert pod.packet_function.uri == ( "some_func", "hash123", "v1", "python.function.v0", ) + + +# --------------------------------------------------------------------------- +# Schema round-trip consistency +# --------------------------------------------------------------------------- + + +class TestSchemaRoundTripConsistency: + """Verify serialize/deserialize round-trips preserve schema structure and hashes.""" + + def test_schema_round_trip_consistency(self): + """Verify deserialize_schema(serialize_schema(schema)) produces equivalent schemas.""" + from orcapod.contexts import resolve_context + + tc = resolve_context(None).type_converter + + schemas = [ + Schema({"x": int, "y": float, "name": str}), + Schema({"flag": bool, "data": bytes}), + Schema({"items": list[int], "mapping": dict[str, float]}), + ] + for schema in schemas: + serialized = serialize_schema(schema, type_converter=tc) + deserialized = Schema(deserialize_schema(serialized, type_converter=tc)) + assert set(deserialized.keys()) == set(schema.keys()), ( + f"Keys mismatch: {set(schema.keys())} vs {set(deserialized.keys())}" + ) + + def test_schema_round_trip_hash_consistency(self): + """Verify that schema hash is preserved through serialize/deserialize.""" + from orcapod.contexts import resolve_context + + ctx = resolve_context(None) + hasher = ctx.semantic_hasher + tc = ctx.type_converter + + schema = Schema({"age": int, "name": str, "score": float}) + original_hash = hasher.hash_object(schema).to_string() + + serialized = serialize_schema(schema, type_converter=tc) + deserialized = Schema(deserialize_schema(serialized, type_converter=tc)) + round_trip_hash = hasher.hash_object(deserialized).to_string() + + assert original_hash == round_trip_hash, ( + f"Hash diverged: {original_hash} vs {round_trip_hash}" + )