diff --git a/.zed/rules b/.zed/rules index dc83f5e7..ba2385a4 100644 --- a/.zed/rules +++ b/.zed/rules @@ -59,7 +59,8 @@ refactor: 1. Check for an existing issue — search Linear for a corresponding issue. 2. If none exists — ask the developer whether to create one. Do not proceed without either a linked issue or explicit approval to skip. -3. When a new issue is discovered during development (bug, design problem, deferred +3. When starting work on an issue — update its Linear status to "In Progress". +4. When a new issue is discovered during development (bug, design problem, deferred work), create a corresponding Linear issue using the template below. When creating Linear issues, always use this template for the description: @@ -96,6 +97,9 @@ Remove any optional sections that don't apply rather than leaving them empty. When working on a feature, create and checkout a git branch using the gitBranchName returned by the primary Linear issue (e.g. eywalker/plt-911-add-documentation-for-orcapod-python). +Feature branch PRs always target the "dev" branch. The dev → main PR is used +for versioning/releases only. + If a feature branch / PR corresponds to multiple Linear issues, list all of them in the PR description body so that Linear's GitHub integration auto-tracks the PR against each issue. Use the format "Fixes PLT-123" or "Closes PLT-123" (GitHub magic words) for issues diff --git a/CLAUDE.md b/CLAUDE.md index 722c6d2a..e71f6add 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -63,7 +63,8 @@ refactor: 1. **Check for an existing issue** — search Linear for a corresponding issue. 2. **If none exists** — ask the developer whether to create one. Do not proceed without either a linked issue or explicit approval to skip. -3. **When a new issue is discovered** during development (bug, design problem, deferred +3. **When starting work on an issue** — update its Linear status to **In Progress**. +4. 
**When a new issue is discovered** during development (bug, design problem, deferred work), create a corresponding Linear issue using the template below. When creating Linear issues, always use this template for the description: @@ -102,6 +103,9 @@ Remove any optional sections that don't apply rather than leaving them empty. When working on a feature, create and checkout a git branch using the `gitBranchName` returned by the primary Linear issue (e.g. `eywalker/plt-911-add-documentation-for-orcapod-python`). +**Feature branch PRs always target the `dev` branch.** The `dev` → `main` PR is used +for versioning/releases only. + If a feature branch / PR corresponds to multiple Linear issues, list all of them in the PR description body so that Linear's GitHub integration auto-tracks the PR against each issue. Use the format `Fixes PLT-123` or `Closes PLT-123` (GitHub magic words) for issues diff --git a/DESIGN_ISSUES.md b/DESIGN_ISSUES.md index 9dcab19b..f429ff46 100644 --- a/DESIGN_ISSUES.md +++ b/DESIGN_ISSUES.md @@ -892,6 +892,26 @@ For derived sources (e.g., `DerivedSource`), the stream may not have a meaningfu --- +## `src/orcapod/core/cached_function_pod.py` / `src/orcapod/core/result_cache.py` + +### RC1 — `ResultCache.record_path` not overridable via public API +**Status:** open +**Severity:** medium + +When loading a pipeline with a `PacketFunctionProxy` (PLT-931), the +`CachedFunctionPod`'s `record_path` may differ from the original because the +pod-level URI is recomputed from a deserialized schema (whose semantic hash +may diverge from the original due to the `Python type → Arrow string → Python type` +round-trip). The workaround reaches into private fields: +`node._cached_function_pod._cache._record_path = stored_path`. + +**Fix:** Add a public `override_record_path(path)` method to `CachedFunctionPod` +(or `ResultCache`) that validates and sets the record path. 
Alternatively, accept +an optional `record_path_override` in `CachedFunctionPod.__init__` that takes +precedence over the computed path when provided. + +--- + ## `src/orcapod/core/nodes/` — Config and context delegation chain ### T2 — `orcapod_config` not on any protocol; delegation chain needs review diff --git a/src/orcapod/core/function_pod.py b/src/orcapod/core/function_pod.py index 5a1db639..7e78cddd 100644 --- a/src/orcapod/core/function_pod.py +++ b/src/orcapod/core/function_pod.py @@ -295,11 +295,18 @@ def to_config(self) -> dict[str, Any]: return config @classmethod - def from_config(cls, config: dict[str, Any]) -> "FunctionPod": + def from_config( + cls, + config: dict[str, Any], + *, + fallback_to_proxy: bool = False, + ) -> "FunctionPod": """Reconstruct a FunctionPod from a config dict. Args: config: A dict as produced by :meth:`to_config`. + fallback_to_proxy: If ``True`` and the packet function cannot be + resolved, use a ``PacketFunctionProxy`` instead of raising. Returns: A new ``FunctionPod`` instance. @@ -307,7 +314,9 @@ def from_config(cls, config: dict[str, Any]) -> "FunctionPod": from orcapod.pipeline.serialization import resolve_packet_function_from_config pf_config = config["packet_function"] - packet_function = resolve_packet_function_from_config(pf_config) + packet_function = resolve_packet_function_from_config( + pf_config, fallback_to_proxy=fallback_to_proxy + ) node_config = None if config.get("node_config") is not None: diff --git a/src/orcapod/core/nodes/function_node.py b/src/orcapod/core/nodes/function_node.py index ca3b198c..2bb8a911 100644 --- a/src/orcapod/core/nodes/function_node.py +++ b/src/orcapod/core/nodes/function_node.py @@ -224,23 +224,30 @@ def from_descriptor( pipeline_path = tuple(descriptor.get("pipeline_path", ())) # Derive pipeline_path_prefix by stripping the suffix that # __init__ appends (packet_function.uri + node hash element). - # We pass the full pipeline_path_prefix from the descriptor. 
# The descriptor stores the complete pipeline_path; we need # to reconstruct the prefix that was originally passed to # __init__. The suffix added is: pf.uri + (f"node:{hash}",). - # Instead of reverse-engineering, use the descriptor's path - # minus what __init__ will add. For full mode we let __init__ - # recompute pipeline_path from the prefix. pf_uri_len = len(function_pod.packet_function.uri) + 1 # +1 for node:hash prefix = ( pipeline_path[:-pf_uri_len] if len(pipeline_path) > pf_uri_len else () ) + # Derive result_path_prefix from the stored result_record_path + # by stripping the URI suffix that CachedFunctionPod appends. + stored_result_path = tuple( + descriptor.get("result_record_path", ()) + ) + uri_len = len(function_pod.packet_function.uri) + result_prefix: tuple[str, ...] | None = None + if stored_result_path and len(stored_result_path) > uri_len: + result_prefix = stored_result_path[:-uri_len] + node = cls( function_pod=function_pod, input_stream=input_stream, pipeline_database=pipeline_db, result_database=result_db, + result_path_prefix=result_prefix, pipeline_path_prefix=prefix, label=descriptor.get("label"), ) diff --git a/src/orcapod/core/packet_function.py b/src/orcapod/core/packet_function.py index bd26ea11..5ee9afdb 100644 --- a/src/orcapod/core/packet_function.py +++ b/src/orcapod/core/packet_function.py @@ -574,6 +574,7 @@ def to_config(self) -> dict[str, Any]: """ return { "packet_function_type_id": self.packet_function_type_id, + "uri": list(self.uri), "config": { "module_path": self._function.__module__, "callable_name": self._function_name, diff --git a/src/orcapod/core/packet_function_proxy.py b/src/orcapod/core/packet_function_proxy.py new file mode 100644 index 00000000..615b94a9 --- /dev/null +++ b/src/orcapod/core/packet_function_proxy.py @@ -0,0 +1,336 @@ +"""Proxy for unavailable packet functions in deserialized pipelines. 
+ +When a pipeline is loaded in an environment where the original packet +function cannot be imported, ``PacketFunctionProxy`` stands in so that +``FunctionPod``, ``FunctionNode``, and ``CachedFunctionPod`` can still be +constructed and cached data can be accessed. Invoking the proxy raises +``PacketFunctionUnavailableError`` unless a real function has been bound +via :meth:`bind`. +""" + +from __future__ import annotations + +from typing import Any + +from orcapod.core.packet_function import PacketFunctionBase +from orcapod.errors import PacketFunctionUnavailableError +from orcapod.protocols.core_protocols import PacketFunctionProtocol +from orcapod.types import ContentHash, Schema + + +class PacketFunctionProxy(PacketFunctionBase): + """Stand-in for an unavailable packet function. + + Satisfies ``PacketFunctionProtocol`` so pipeline construction succeeds. + All execution methods raise ``PacketFunctionUnavailableError`` until a + real function is attached via :meth:`bind`. + + Args: + config: Serialized packet function config dict (as produced by + ``PythonPacketFunction.to_config()``). + content_hash_str: Optional stored content hash string. + pipeline_hash_str: Optional stored pipeline hash string. + """ + + def __init__( + self, + config: dict[str, Any], + content_hash_str: str | None = None, + pipeline_hash_str: str | None = None, + ) -> None: + inner = config["config"] + self._original_config = config + self._packet_function_type_id = config["packet_function_type_id"] + self._canonical_function_name = inner["callable_name"] + + # Eagerly deserialize schemas. 
+ self._raw_input_schema_dict = inner["input_packet_schema"] + self._raw_output_schema_dict = inner["output_packet_schema"] + self._input_packet_schema = _deserialize_schema_from_config( + self._raw_input_schema_dict + ) + self._output_packet_schema = _deserialize_schema_from_config( + self._raw_output_schema_dict + ) + + # Call super().__init__ so that major_version and + # output_packet_schema_hash are available for URI fallback. + version = inner["version"] + super().__init__(version=version) + + # URI: read from config if present, otherwise compute from metadata. + uri_list = config.get("uri") + if uri_list is not None: + self._stored_uri = tuple(uri_list) + else: + # Fallback: compute from available metadata + # (same structure as PacketFunctionBase.uri). + self._stored_uri = ( + self._canonical_function_name, + self.output_packet_schema_hash, + f"v{self.major_version}", + self._packet_function_type_id, + ) + + # Stored identity hashes + self._stored_content_hash = content_hash_str + self._stored_pipeline_hash = pipeline_hash_str + + # Late-binding slot + self._bound_function: PacketFunctionProtocol | None = None + + # ==================== Identity properties ==================== + + @property + def packet_function_type_id(self) -> str: + """Unique function type identifier.""" + return self._packet_function_type_id + + @property + def canonical_function_name(self) -> str: + """Human-readable function identifier.""" + return self._canonical_function_name + + @property + def input_packet_schema(self) -> Schema: + """Schema describing the input packets this function accepts.""" + return self._input_packet_schema + + @property + def output_packet_schema(self) -> Schema: + """Schema describing the output packets this function produces.""" + return self._output_packet_schema + + @property + def uri(self) -> tuple[str, ...]: + """Return the stored URI, avoiding recomputation via semantic_hasher.""" + return self._stored_uri + + # ==================== Hash overrides 
==================== + + def content_hash(self, hasher=None) -> ContentHash: + """Return the stored content hash.""" + if self._stored_content_hash is not None: + return ContentHash.from_string(self._stored_content_hash) + return super().content_hash(hasher) + + def pipeline_hash(self, hasher=None) -> ContentHash: + """Return the stored pipeline hash.""" + if self._stored_pipeline_hash is not None: + return ContentHash.from_string(self._stored_pipeline_hash) + return super().pipeline_hash(hasher) + + # ==================== Execution ==================== + + def _raise_unavailable(self) -> None: + """Raise ``PacketFunctionUnavailableError`` with context.""" + raise PacketFunctionUnavailableError( + f"Packet function '{self._canonical_function_name}' is not available. " + f"Use bind() to attach a real function, or access cached results only." + ) + + def call(self, packet: "PacketProtocol") -> "PacketProtocol | None": + """Process a single packet; delegates to bound function or raises.""" + if self._bound_function is not None: + return self._bound_function.call(packet) + self._raise_unavailable() + + async def async_call(self, packet: "PacketProtocol") -> "PacketProtocol | None": + """Async counterpart of ``call``.""" + if self._bound_function is not None: + return await self._bound_function.async_call(packet) + self._raise_unavailable() + + def direct_call(self, packet: "PacketProtocol") -> "PacketProtocol | None": + """Direct execution; delegates to bound function or raises.""" + if self._bound_function is not None: + return self._bound_function.direct_call(packet) + self._raise_unavailable() + + async def direct_async_call(self, packet: "PacketProtocol") -> "PacketProtocol | None": + """Async direct execution; delegates to bound function or raises.""" + if self._bound_function is not None: + return await self._bound_function.direct_async_call(packet) + self._raise_unavailable() + + # ==================== Variation / execution data ==================== + + def 
get_function_variation_data(self) -> dict[str, Any]: + """Return function variation data, or empty dict when unbound.""" + if self._bound_function is not None: + return self._bound_function.get_function_variation_data() + return {} + + def get_execution_data(self) -> dict[str, Any]: + """Return execution data, or empty dict when unbound.""" + if self._bound_function is not None: + return self._bound_function.get_execution_data() + return {} + + # ==================== Executor ==================== + + @property + def executor(self) -> "PacketFunctionExecutorProtocol | None": + """Return executor from bound function, or None.""" + if self._bound_function is not None: + return self._bound_function.executor + return None + + @executor.setter + def executor(self, executor: "PacketFunctionExecutorProtocol | None") -> None: + """Set executor on bound function; no-op when unbound.""" + if self._bound_function is not None: + self._bound_function.executor = executor + + # ==================== Serialization ==================== + + def to_config(self) -> dict[str, Any]: + """Return the original config dict.""" + return self._original_config + + @classmethod + def from_config(cls, config: dict[str, Any]) -> PacketFunctionProxy: + """Construct a proxy from a serialized config dict. + + The URI is read from the config's ``"uri"`` field if present; + otherwise a fallback is computed from config metadata. + + Args: + config: A dict as produced by ``PythonPacketFunction.to_config()``. + + Returns: + A new ``PacketFunctionProxy`` instance. + """ + return cls(config=config) + + # ==================== Bind / unbind ==================== + + @property + def is_bound(self) -> bool: + """Return whether a real function is currently bound.""" + return self._bound_function is not None + + def bind(self, packet_function: PacketFunctionProtocol) -> None: + """Attach a real packet function, validating compatibility. 
+ + Checks that the bound function matches this proxy on: + ``canonical_function_name``, ``major_version``, + ``packet_function_type_id``, ``input_packet_schema``, + ``output_packet_schema``, ``uri``, and ``content_hash()`` + (if a stored hash exists). + + Args: + packet_function: The real function to bind. + + Raises: + ValueError: If any compatibility check fails, listing all + mismatches. + """ + mismatches: list[str] = [] + + if packet_function.canonical_function_name != self._canonical_function_name: + mismatches.append( + f"canonical_function_name: expected {self._canonical_function_name!r}, " + f"got {packet_function.canonical_function_name!r}" + ) + + if packet_function.major_version != self.major_version: + mismatches.append( + f"major_version: expected {self.major_version!r}, " + f"got {packet_function.major_version!r}" + ) + + if packet_function.packet_function_type_id != self._packet_function_type_id: + mismatches.append( + f"packet_function_type_id: expected {self._packet_function_type_id!r}, " + f"got {packet_function.packet_function_type_id!r}" + ) + + if packet_function.input_packet_schema != self._input_packet_schema: + # Also compare via serialized form in case type repr differs + bound_input = { + k: str(v) for k, v in packet_function.input_packet_schema.items() + } + if bound_input != self._raw_input_schema_dict: + mismatches.append( + f"input_packet_schema: expected {self._input_packet_schema!r}, " + f"got {packet_function.input_packet_schema!r}" + ) + + if packet_function.output_packet_schema != self._output_packet_schema: + bound_output = { + k: str(v) for k, v in packet_function.output_packet_schema.items() + } + if bound_output != self._raw_output_schema_dict: + mismatches.append( + f"output_packet_schema: expected {self._output_packet_schema!r}, " + f"got {packet_function.output_packet_schema!r}" + ) + + if tuple(packet_function.uri) != tuple(self._stored_uri): + mismatches.append( + f"uri: expected {self._stored_uri!r}, " + f"got 
{tuple(packet_function.uri)!r}" ) + + if self._stored_content_hash is not None: + bound_hash = packet_function.content_hash().to_string() + if bound_hash != self._stored_content_hash: + mismatches.append( + f"content_hash: expected {self._stored_content_hash!r}, " + f"got {bound_hash!r}" + ) + + if mismatches: + raise ValueError( + f"Cannot bind packet function: {', '.join(mismatches)}" + ) + + self._bound_function = packet_function + + def unbind(self) -> None: + """Detach the bound function, reverting to proxy mode.""" + self._bound_function = None + + +def _deserialize_schema_from_config(schema_dict: dict[str, str]) -> Schema: + """Reconstruct a Schema from a to_config() schema dict. + + Handles both ``str(python_type)`` format (e.g. ``"<class 'int'>"``), + used by ``PythonPacketFunction.to_config()``, and Arrow type string + format (e.g. ``"int64"``), used by the serialization module. + + Unrecognized type strings are coerced to ``object``. + + Args: + schema_dict: Dict mapping field names to type strings. + + Returns: + A Schema with Python types reconstructed from the strings. 
+ """ + from orcapod.pipeline.serialization import ( + _BUILTIN_TYPE_MAP, + deserialize_schema, + ) + + result: dict[str, Any] = {} + needs_arrow_fallback: list[str] = [] + + for name, type_str in schema_dict.items(): + if type_str in _BUILTIN_TYPE_MAP: + result[name] = _BUILTIN_TYPE_MAP[type_str] + else: + needs_arrow_fallback.append(name) + + if needs_arrow_fallback: + arrow_result = deserialize_schema( + {k: schema_dict[k] for k in needs_arrow_fallback} + ) + for k, v in arrow_result.items(): + # If deserialize_schema fell back to a raw string, coerce to object + if isinstance(v, str): + result[k] = object + else: + result[k] = v + + return Schema(result) diff --git a/src/orcapod/errors.py b/src/orcapod/errors.py index 9d1c05cf..c7a0e3d3 100644 --- a/src/orcapod/errors.py +++ b/src/orcapod/errors.py @@ -11,6 +11,15 @@ class DuplicateTagError(ValueError): pass +class PacketFunctionUnavailableError(RuntimeError): + """Raised when a packet function proxy is invoked without a bound function. + + This occurs when a pipeline is loaded in an environment where the + original packet function is not available. Only cached results can + be accessed. + """ + + class FieldNotResolvableError(LookupError): """ Raised when a source cannot resolve a field value for a given record ID. 
diff --git a/src/orcapod/pipeline/graph.py b/src/orcapod/pipeline/graph.py index c5a9595e..755240e2 100644 --- a/src/orcapod/pipeline/graph.py +++ b/src/orcapod/pipeline/graph.py @@ -722,7 +722,8 @@ def load(cls, path: str | Path, mode: str = "full") -> "Pipeline": upstream_usable = ( upstream_node is not None and hasattr(upstream_node, "load_status") - and upstream_node.load_status == LoadStatus.FULL + and upstream_node.load_status + in (LoadStatus.FULL, LoadStatus.READ_ONLY) ) # Build databases dict @@ -743,7 +744,9 @@ def load(cls, path: str | Path, mode: str = "full") -> "Pipeline": # Check if all upstreams are usable all_upstreams_usable = ( all( - hasattr(n, "load_status") and n.load_status == LoadStatus.FULL + hasattr(n, "load_status") + and n.load_status + in (LoadStatus.FULL, LoadStatus.READ_ONLY) for n in upstream_nodes ) if upstream_nodes @@ -857,40 +860,59 @@ def _load_function_node( ) -> FunctionNode: """Reconstruct a FunctionNode from a descriptor. + When the upstream is usable and mode is not ``"read_only"``, attempts + to reconstruct the function pod with ``fallback_to_proxy=True`` so + that a ``PacketFunctionProxy`` is used when the original function + cannot be imported. Nodes backed by a proxy get + ``LoadStatus.READ_ONLY`` — they can read cached results but cannot + compute new ones. + Args: descriptor: The serialized node descriptor. mode: Load mode. upstream_node: The reconstructed upstream node, or ``None``. - upstream_usable: Whether the upstream is in FULL mode. + upstream_usable: Whether the upstream is usable (FULL or + READ_ONLY). databases: Database role mapping. Returns: A ``FunctionNode`` instance. 
""" from orcapod.core.function_pod import FunctionPod + from orcapod.core.packet_function_proxy import PacketFunctionProxy + from orcapod.pipeline.serialization import LoadStatus + + if mode != "read_only" and upstream_usable: + try: + pod = FunctionPod.from_config( + descriptor["function_pod"], fallback_to_proxy=True + ) + node = FunctionNode.from_descriptor( + descriptor, + function_pod=pod, + input_stream=upstream_node, + databases=databases, + ) + + # Determine load status: proxies can only read cached data. + if isinstance(pod.packet_function, PacketFunctionProxy): + node._load_status = LoadStatus.READ_ONLY + else: + node._load_status = LoadStatus.FULL - if mode == "full": - if not upstream_usable: + return node + except Exception: logger.warning( - "Upstream for function node %r is not usable, " + "Failed to reconstruct function node %r, " "falling back to read-only.", descriptor.get("label"), ) - else: - try: - pod = FunctionPod.from_config(descriptor["function_pod"]) - return FunctionNode.from_descriptor( - descriptor, - function_pod=pod, - input_stream=upstream_node, - databases=databases, - ) - except Exception: - logger.warning( - "Failed to reconstruct function node %r, " - "falling back to read-only.", - descriptor.get("label"), - ) + elif mode != "read_only" and not upstream_usable: + logger.warning( + "Upstream for function node %r is not usable, " + "falling back to read-only.", + descriptor.get("label"), + ) return FunctionNode.from_descriptor( descriptor, diff --git a/src/orcapod/pipeline/serialization.py b/src/orcapod/pipeline/serialization.py index 99ea59f6..066cc8ea 100644 --- a/src/orcapod/pipeline/serialization.py +++ b/src/orcapod/pipeline/serialization.py @@ -199,28 +199,54 @@ def resolve_operator_from_config(config: dict[str, Any]) -> Any: return cls.from_config(config) -def resolve_packet_function_from_config(config: dict[str, Any]) -> Any: +def resolve_packet_function_from_config( + config: dict[str, Any], + *, + fallback_to_proxy: bool 
= False, +) -> Any: """Reconstruct a packet function from a config dict. Args: config: Dict with at least a ``"packet_function_type_id"`` key matching a registered packet function type. + fallback_to_proxy: If ``True`` and reconstruction fails (unknown type, + ``ImportError``, ``AttributeError``, etc.), return a + ``PacketFunctionProxy`` instead of raising. Returns: - A new packet function instance constructed from the config. + A new packet function instance constructed from the config, or a + ``PacketFunctionProxy`` if reconstruction fails and + *fallback_to_proxy* is ``True``. Raises: - ValueError: If the type ID is missing or unknown. + ValueError: If the type ID is missing or unknown and + *fallback_to_proxy* is ``False``. """ _ensure_registries() type_id = config.get("packet_function_type_id") if type_id not in PACKET_FUNCTION_REGISTRY: + if fallback_to_proxy: + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + return PacketFunctionProxy.from_config(config) raise ValueError( f"Unknown packet function type: {type_id!r}. 
f"Known types: {sorted(PACKET_FUNCTION_REGISTRY.keys())}" ) cls = PACKET_FUNCTION_REGISTRY[type_id] - return cls.from_config(config) + try: + return cls.from_config(config) + except (ImportError, ModuleNotFoundError, AttributeError) as exc: + if fallback_to_proxy: + logger.warning( + "Could not reconstruct packet function from config (%s); " + "returning PacketFunctionProxy.", + exc, + ) + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + return PacketFunctionProxy.from_config(config) + raise def resolve_source_from_config( @@ -597,3 +623,16 @@ def _build_arrow_primitive_types() -> dict[str, Any]: _ARROW_PRIMITIVE_TYPES: dict[str, Any] = _build_arrow_primitive_types() + + +# --------------------------------------------------------------------------- +# Builtin Python type map for schema deserialization from to_config() format +# --------------------------------------------------------------------------- + +_BUILTIN_TYPE_MAP: dict[str, type] = { + "<class 'int'>": int, + "<class 'float'>": float, + "<class 'str'>": str, + "<class 'bool'>": bool, + "<class 'bytes'>": bytes, +} diff --git a/superpowers/plans/2026-03-15-packet-function-proxy-plan.md b/superpowers/plans/2026-03-15-packet-function-proxy-plan.md new file mode 100644 index 00000000..967d3c01 --- /dev/null +++ b/superpowers/plans/2026-03-15-packet-function-proxy-plan.md @@ -0,0 +1,1507 @@ +# PacketFunctionProxy Implementation Plan + +> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Enable cached data access from loaded pipelines when the original packet function is unavailable, by introducing `PacketFunctionProxy`. + +**Architecture:** `PacketFunctionProxy` implements `PacketFunctionProtocol` using stored metadata from the serialized config. It plugs into the existing `FunctionPod` → `FunctionNode` → `CachedFunctionPod` chain unchanged, so cached DB reads work. 
Invocation raises `PacketFunctionUnavailableError`. Late binding via `bind()` allows attaching a real function with identity validation. + +**Tech Stack:** Python, PyArrow, orcapod internals (PacketFunctionBase, FunctionPod, FunctionNode, CachedFunctionPod, Pipeline serialization) + +**Spec:** `superpowers/specs/2026-03-15-stub-packet-function-design.md` + +--- + +## File Structure + +| File | Action | Responsibility | +|------|--------|----------------| +| `src/orcapod/errors.py` | Modify | Add `PacketFunctionUnavailableError` | +| `src/orcapod/core/packet_function_proxy.py` | Create | `PacketFunctionProxy` class | +| `src/orcapod/pipeline/serialization.py` | Modify | `fallback_to_proxy` in `resolve_packet_function_from_config` | +| `src/orcapod/core/function_pod.py` | Modify | `fallback_to_proxy` in `FunctionPod.from_config` | +| `src/orcapod/pipeline/graph.py` | Modify | `_load_function_node` proxy support; upstream usability relaxation | +| `tests/test_core/packet_function/test_packet_function_proxy.py` | Create | Proxy unit tests | +| `tests/test_pipeline/test_serialization.py` | Modify | Pipeline load integration tests | + +--- + +## Chunk 1: PacketFunctionProxy Core + +### Task 1: Add `PacketFunctionUnavailableError` + +**Files:** +- Modify: `src/orcapod/errors.py` + +- [ ] **Step 1: Add the error class** + +Add at the end of `src/orcapod/errors.py`: + +```python +class PacketFunctionUnavailableError(RuntimeError): + """Raised when a packet function proxy is invoked without a bound function. + + This occurs when a pipeline is loaded in an environment where the + original packet function is not available. Only cached results can + be accessed. 
+ """ +``` + +- [ ] **Step 2: Verify import works** + +Run: `uv run python -c "from orcapod.errors import PacketFunctionUnavailableError; print('OK')"` +Expected: `OK` + +- [ ] **Step 3: Commit** + +```bash +git add src/orcapod/errors.py +git commit -m "feat(errors): add PacketFunctionUnavailableError" +``` + +--- + +### Task 2: Create `PacketFunctionProxy` — construction and metadata + +**Files:** +- Create: `src/orcapod/core/packet_function_proxy.py` +- Create: `tests/test_core/packet_function/test_packet_function_proxy.py` + +**Context:** `PacketFunctionProxy` subclasses `PacketFunctionBase` (from `src/orcapod/core/packet_function.py`). It needs to: +- Accept a config dict (same format as `PythonPacketFunction.to_config()` output) +- Extract version from `config["config"]["version"]` and pass to `super().__init__(version=...)` +- Eagerly deserialize schemas via `deserialize_schema()` from `pipeline.serialization` +- Store the original config, pre-computed hashes, and a `_uri` override +- Override `uri` property to return stored value (avoiding semantic hasher recomputation) +- Override `content_hash()` and `pipeline_hash()` to return stored values + +The config format from `PythonPacketFunction.to_config()` is: +```python +{ + "packet_function_type_id": "python.function.v0", + "config": { + "module_path": "some.module", + "callable_name": "my_func", + "version": "v0.1", + "input_packet_schema": {"age": "int64", "name": "large_string"}, + "output_packet_schema": {"result": "float64"}, + "output_keys": ["result"], + }, +} +``` + +The `uri` comes from the parent `FunctionPod.to_config()` which wraps the above: +```python +{ + "uri": ["my_func", "", "v0", "python.function.v0"], + "packet_function": { ... the above ... }, + "node_config": None, +} +``` + +So the proxy receives the `packet_function` sub-dict as `config`, and receives +`uri` separately (or from the parent `function_pod` config in `_load_function_node`). 
+ +- [ ] **Step 1: Write failing tests for construction and metadata** + +Create `tests/test_core/packet_function/test_packet_function_proxy.py`: + +```python +"""Tests for PacketFunctionProxy.""" + +from __future__ import annotations + +import pytest + +from orcapod.core.packet_function import PythonPacketFunction +from orcapod.core.packet_function_proxy import PacketFunctionProxy +from orcapod.errors import PacketFunctionUnavailableError +from orcapod.types import ContentHash, Schema + + +def _make_sample_function(): + """Create a simple packet function for testing.""" + + def double_age(age: int) -> int: + return age * 2 + + return PythonPacketFunction( + double_age, + output_keys="doubled_age", + version="v1.0", + ) + + +def _make_proxy_from_function(pf: PythonPacketFunction) -> PacketFunctionProxy: + """Create a proxy from a live packet function's config.""" + config = pf.to_config() + return PacketFunctionProxy( + config=config, + uri=tuple(pf.uri), + content_hash_str=pf.content_hash().to_string(), + pipeline_hash_str=pf.pipeline_hash().to_string(), + ) + + +class TestPacketFunctionProxyConstruction: + def test_construction_from_config(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy is not None + + def test_canonical_function_name(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.canonical_function_name == pf.canonical_function_name + + def test_major_version(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.major_version == pf.major_version + + def test_minor_version_string(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.minor_version_string == pf.minor_version_string + + def test_packet_function_type_id(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.packet_function_type_id == pf.packet_function_type_id + + def 
test_input_packet_schema(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert dict(proxy.input_packet_schema) == dict(pf.input_packet_schema) + + def test_output_packet_schema(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert dict(proxy.output_packet_schema) == dict(pf.output_packet_schema) + + def test_uri_returns_stored_value(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.uri == pf.uri + + def test_content_hash_returns_stored_value(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.content_hash().to_string() == pf.content_hash().to_string() + + def test_pipeline_hash_returns_stored_value(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.pipeline_hash().to_string() == pf.pipeline_hash().to_string() + + def test_to_config_returns_original(self): + pf = _make_sample_function() + config = pf.to_config() + proxy = PacketFunctionProxy( + config=config, + uri=tuple(pf.uri), + content_hash_str=pf.content_hash().to_string(), + pipeline_hash_str=pf.pipeline_hash().to_string(), + ) + assert proxy.to_config() == config + + def test_executor_returns_none(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.executor is None + + def test_executor_setter_is_noop(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.executor = None # should not raise + assert proxy.executor is None + + def test_is_bound_initially_false(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.is_bound is False +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py -v` +Expected: All FAIL (ModuleNotFoundError for `packet_function_proxy`) + +- [ ] **Step 3: Implement `PacketFunctionProxy`** + +Create 
`src/orcapod/core/packet_function_proxy.py`: + +```python +"""PacketFunctionProxy — stands in for an unavailable packet function.""" + +from __future__ import annotations + +from typing import Any + +from orcapod.core.packet_function import PacketFunctionBase +from orcapod.errors import PacketFunctionUnavailableError +from orcapod.protocols.core_protocols import ( + PacketFunctionExecutorProtocol, + PacketFunctionProtocol, + PacketProtocol, +) +from orcapod.types import ContentHash, Schema + + +class PacketFunctionProxy(PacketFunctionBase): + """Proxy for a packet function that is not available in this environment. + + Carries all identifying metadata (schemas, URI, hashes) from the + serialized config so that ``FunctionPod``, ``FunctionNode``, and + ``CachedFunctionPod`` can be fully constructed. Invocation methods + raise ``PacketFunctionUnavailableError`` unless a real function has + been bound via ``bind()``. + + Args: + config: The packet function config dict as produced by + ``PythonPacketFunction.to_config()``. + uri: The original URI tuple from the ``FunctionPod.to_config()`` + output. Stored directly and returned from the ``uri`` + property to avoid semantic hasher recomputation. + content_hash_str: Pre-computed content hash string from the + pipeline descriptor. + pipeline_hash_str: Pre-computed pipeline hash string from the + pipeline descriptor. 
+ """ + + def __init__( + self, + config: dict[str, Any], + uri: tuple[str, ...], + content_hash_str: str | None = None, + pipeline_hash_str: str | None = None, + ) -> None: + self._original_config = config + inner = config.get("config", config) + + # Extract version for PacketFunctionBase.__init__ + version = inner.get("version", "v0.0") + + super().__init__(version=version) + + # Store identity metadata + self._proxy_type_id = config.get( + "packet_function_type_id", "python.function.v0" + ) + self._proxy_function_name = inner.get("callable_name", "unknown") + self._stored_uri = uri + self._stored_content_hash = content_hash_str + self._stored_pipeline_hash = pipeline_hash_str + + # Eagerly deserialize schemas + from orcapod.pipeline.serialization import deserialize_schema + + raw_input = inner.get("input_packet_schema", {}) + raw_output = inner.get("output_packet_schema", {}) + self._input_schema = Schema(deserialize_schema(raw_input)) + self._output_schema = Schema(deserialize_schema(raw_output)) + + # Late binding state + self._bound_function: PacketFunctionProtocol | None = None + + # ==================== Identity & Metadata ==================== + + @property + def packet_function_type_id(self) -> str: + return self._proxy_type_id + + @property + def canonical_function_name(self) -> str: + return self._proxy_function_name + + @property + def input_packet_schema(self) -> Schema: + return self._input_schema + + @property + def output_packet_schema(self) -> Schema: + return self._output_schema + + @property + def uri(self) -> tuple[str, ...]: + """Return the stored original URI, avoiding recomputation.""" + return self._stored_uri + + # ==================== Hashing ==================== + + def content_hash(self, hasher=None) -> ContentHash: + """Return the stored content hash.""" + if self._stored_content_hash is not None: + return ContentHash.from_string(self._stored_content_hash) + return super().content_hash(hasher) + + def pipeline_hash(self, hasher=None) -> 
ContentHash: + """Return the stored pipeline hash.""" + if self._stored_pipeline_hash is not None: + return ContentHash.from_string(self._stored_pipeline_hash) + return super().pipeline_hash(hasher) + + # ==================== Variation / Execution Data ==================== + + def get_function_variation_data(self) -> dict[str, Any]: + if self._bound_function is not None: + return self._bound_function.get_function_variation_data() + return {} + + def get_execution_data(self) -> dict[str, Any]: + if self._bound_function is not None: + return self._bound_function.get_execution_data() + return {} + + # ==================== Executor ==================== + + @property + def executor(self) -> PacketFunctionExecutorProtocol | None: + if self._bound_function is not None: + return self._bound_function.executor + return None + + @executor.setter + def executor(self, executor: PacketFunctionExecutorProtocol | None) -> None: + if self._bound_function is not None: + self._bound_function.executor = executor + # No-op when unbound — avoids breaking _apply_execution_engine + + # ==================== Invocation ==================== + + def _raise_unavailable(self) -> None: + raise PacketFunctionUnavailableError( + f"Packet function '{self._proxy_function_name}' is not available " + f"in this environment. Only cached results can be accessed." 
+ ) + + def call(self, packet: PacketProtocol) -> PacketProtocol | None: + if self._bound_function is not None: + return self._bound_function.call(packet) + self._raise_unavailable() + + async def async_call(self, packet: PacketProtocol) -> PacketProtocol | None: + if self._bound_function is not None: + return await self._bound_function.async_call(packet) + self._raise_unavailable() + + def direct_call(self, packet: PacketProtocol) -> PacketProtocol | None: + if self._bound_function is not None: + return self._bound_function.direct_call(packet) + self._raise_unavailable() + + async def direct_async_call( + self, packet: PacketProtocol + ) -> PacketProtocol | None: + if self._bound_function is not None: + return await self._bound_function.direct_async_call(packet) + self._raise_unavailable() + + # ==================== Serialization ==================== + + def to_config(self) -> dict[str, Any]: + """Return the original config dict (round-trip preservation).""" + return self._original_config + + @classmethod + def from_config(cls, config: dict[str, Any]) -> "PacketFunctionProxy": + """Construct a proxy from a packet function config dict. + + Args: + config: A dict as produced by ``PythonPacketFunction.to_config()``. + + Returns: + A new ``PacketFunctionProxy`` instance. + """ + # URI not available from packet_function config alone — caller + # must provide it separately via the constructor. from_config + # builds a proxy without URI/hash overrides. 
+ inner = config.get("config", config) + # Build a placeholder URI from available metadata + type_id = config.get("packet_function_type_id", "python.function.v0") + name = inner.get("callable_name", "unknown") + version = inner.get("version", "v0.0") + import re + + match = re.match(r"\D*(\d+)\.(.*)", version) + major = int(match.group(1)) if match else 0 + uri = (name, "", f"v{major}", type_id) + + return cls(config=config, uri=uri) + + # ==================== Binding ==================== + + @property + def is_bound(self) -> bool: + """Return whether a real packet function has been bound.""" + return self._bound_function is not None + + def bind(self, packet_function: PacketFunctionProtocol) -> None: + """Bind a real packet function to this proxy. + + Validates that the provided function matches the proxy's stored + identity before accepting it. + + Args: + packet_function: The real packet function to bind. + + Raises: + ValueError: If the provided function's identifying information + does not match the proxy's stored identity. 
+ """ + mismatches: list[str] = [] + + if packet_function.canonical_function_name != self.canonical_function_name: + mismatches.append( + f"canonical_function_name: expected {self.canonical_function_name!r}, " + f"got {packet_function.canonical_function_name!r}" + ) + if packet_function.major_version != self.major_version: + mismatches.append( + f"major_version: expected {self.major_version}, " + f"got {packet_function.major_version}" + ) + if packet_function.packet_function_type_id != self.packet_function_type_id: + mismatches.append( + f"packet_function_type_id: expected {self.packet_function_type_id!r}, " + f"got {packet_function.packet_function_type_id!r}" + ) + if dict(packet_function.input_packet_schema) != dict(self.input_packet_schema): + mismatches.append( + f"input_packet_schema: expected {dict(self.input_packet_schema)}, " + f"got {dict(packet_function.input_packet_schema)}" + ) + if dict(packet_function.output_packet_schema) != dict( + self.output_packet_schema + ): + mismatches.append( + f"output_packet_schema: expected {dict(self.output_packet_schema)}, " + f"got {dict(packet_function.output_packet_schema)}" + ) + if packet_function.uri != self.uri: + mismatches.append( + f"uri: expected {self.uri}, got {packet_function.uri}" + ) + if ( + self._stored_content_hash is not None + and packet_function.content_hash().to_string() + != self._stored_content_hash + ): + mismatches.append( + f"content_hash: expected {self._stored_content_hash}, " + f"got {packet_function.content_hash().to_string()}" + ) + + if mismatches: + raise ValueError( + f"Cannot bind packet function: identity mismatch.\n" + + "\n".join(f" - {m}" for m in mismatches) + ) + + self._bound_function = packet_function + + def unbind(self) -> None: + """Remove the bound packet function, reverting to proxy-only mode.""" + self._bound_function = None +``` + +**Important note about `call()` and `async_call()`:** These are inherited from +`PacketFunctionBase` which routes through `direct_call()` / 
`direct_async_call()` +(via the executor if set, or directly). Since the proxy overrides `direct_call()` +and `direct_async_call()` to raise `PacketFunctionUnavailableError` (when unbound) +or delegate (when bound), the inherited routing would already do the right thing. +In addition, the Task 2 implementation above overrides `call()` and `async_call()` +explicitly, so the proxy does not depend on how `PacketFunctionBase.call()` routes +internally — no further override is needed. + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py -v` +Expected: All PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/core/packet_function_proxy.py tests/test_core/packet_function/test_packet_function_proxy.py +git commit -m "feat(core): add PacketFunctionProxy with metadata and identity" +``` + +--- + +### Task 3: Test invocation behavior (unbound raises, bound delegates) + +**Files:** +- Modify: `tests/test_core/packet_function/test_packet_function_proxy.py` + +- [ ] **Step 1: Write failing tests for invocation** + +Append to the test file: + +```python +class TestPacketFunctionProxyInvocation: + def test_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + # Create a minimal packet to pass + from orcapod.core.datagrams.tag_packet import Packet + + packet = Packet({"age": 25}) + with pytest.raises(PacketFunctionUnavailableError, match="double_age"): + proxy.call(packet) + + @pytest.mark.asyncio + async def test_async_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + from orcapod.core.datagrams.tag_packet import Packet + + packet = Packet({"age": 25}) + with pytest.raises(PacketFunctionUnavailableError, match="double_age"): + await proxy.async_call(packet) + + def test_direct_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + from 
orcapod.core.datagrams.tag_packet import Packet + + packet = Packet({"age": 25}) + with pytest.raises(PacketFunctionUnavailableError): + proxy.direct_call(packet) + + def test_variation_data_empty_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.get_function_variation_data() == {} + + def test_execution_data_empty_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.get_execution_data() == {} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py::TestPacketFunctionProxyInvocation -v` +Expected: FAIL (depending on whether `call()` routes through `direct_call()`) + +- [ ] **Step 3: Fix any test issues** + +The `call()`, `async_call()`, `direct_call()`, and `direct_async_call()` +overrides are already in the Task 2 implementation. They raise +`PacketFunctionUnavailableError` when unbound and delegate when bound. +If tests fail, check that the error message includes the function name. 
+ +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py -v` +Expected: All PASS + +- [ ] **Step 5: Commit** + +```bash +git add tests/test_core/packet_function/test_packet_function_proxy.py src/orcapod/core/packet_function_proxy.py +git commit -m "test(proxy): add invocation behavior tests for PacketFunctionProxy" +``` + +--- + +### Task 4: Test `bind()`, `unbind()`, and identity mismatch rejection + +**Files:** +- Modify: `tests/test_core/packet_function/test_packet_function_proxy.py` + +- [ ] **Step 1: Write failing tests for bind/unbind** + +Append to the test file: + +```python +class TestPacketFunctionProxyBinding: + def test_bind_succeeds_with_matching_function(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + assert proxy.is_bound is True + + def test_call_delegates_after_bind(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + from orcapod.core.datagrams.tag_packet import Packet + + packet = Packet({"age": 25}) + result = proxy.call(packet) + assert result is not None + assert result.as_dict()["doubled_age"] == 50 + + def test_variation_data_delegates_after_bind(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + assert proxy.get_function_variation_data() == pf.get_function_variation_data() + + def test_execution_data_delegates_after_bind(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + assert proxy.get_execution_data() == pf.get_execution_data() + + def test_unbind_reverts_to_proxy_mode(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + assert proxy.is_bound is True + proxy.unbind() + assert proxy.is_bound is False + from orcapod.core.datagrams.tag_packet import Packet + + packet = Packet({"age": 25}) + with 
pytest.raises(PacketFunctionUnavailableError): + proxy.call(packet) + + def test_bind_rejects_mismatched_function_name(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def triple_age(age: int) -> int: + return age * 3 + + other_pf = PythonPacketFunction( + triple_age, output_keys="tripled_age", version="v1.0" + ) + with pytest.raises(ValueError, match="canonical_function_name"): + proxy.bind(other_pf) + + def test_bind_rejects_mismatched_version(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def double_age(age: int) -> int: + return age * 2 + + other_pf = PythonPacketFunction( + double_age, output_keys="doubled_age", version="v2.0" + ) + with pytest.raises(ValueError, match="major_version"): + proxy.bind(other_pf) + + def test_bind_rejects_mismatched_output_schema(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def double_age(age: int) -> str: + return str(age * 2) + + other_pf = PythonPacketFunction( + double_age, output_keys="doubled_age", version="v1.0" + ) + with pytest.raises(ValueError, match="output_packet_schema"): + proxy.bind(other_pf) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py::TestPacketFunctionProxyBinding -v` +Expected: FAIL + +- [ ] **Step 3: Fix any issues in implementation** + +The `bind()` and `unbind()` implementations are already in the Task 2 code. +If `call()` delegation doesn't work, ensure the bound function's `call()` is +used (not `direct_call()`). 
+ +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py -v` +Expected: All PASS + +- [ ] **Step 5: Commit** + +```bash +git add tests/test_core/packet_function/test_packet_function_proxy.py src/orcapod/core/packet_function_proxy.py +git commit -m "test(proxy): add bind/unbind and identity validation tests" +``` + +--- + +### Task 5: Test `FunctionPod` with proxy + +**Files:** +- Modify: `tests/test_core/packet_function/test_packet_function_proxy.py` + +- [ ] **Step 1: Write failing tests for FunctionPod integration** + +Append to the test file: + +```python +class TestFunctionPodWithProxy: + def test_function_pod_constructs_with_proxy(self): + from orcapod.core.function_pod import FunctionPod + + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + pod = FunctionPod(packet_function=proxy) + assert pod is not None + assert pod.packet_function is proxy + + def test_function_pod_output_schema(self): + from orcapod.core.function_pod import FunctionPod + from orcapod.core.sources.dict_source import DictSource + + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + pod = FunctionPod(packet_function=proxy) + source = DictSource({"age": [10, 20, 30]}) + tag_schema, packet_schema = pod.output_schema(source) + assert "doubled_age" in packet_schema + + def test_function_pod_process_packet_raises(self): + from orcapod.core.datagrams.tag_packet import Packet, Tag + from orcapod.core.function_pod import FunctionPod + + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + pod = FunctionPod(packet_function=proxy) + tag = Tag({}) + packet = Packet({"age": 25}) + with pytest.raises(PacketFunctionUnavailableError): + pod.process_packet(tag, packet) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py::TestFunctionPodWithProxy -v` +Expected: FAIL + +- [ ] **Step 3: 
Fix any issues** + +The `FunctionPod` constructor calls `_FunctionPodBase.uri` which accesses +`self.packet_function.output_packet_schema`. Since the proxy's `uri` is +overridden, we need to check if `_FunctionPodBase.uri` is even called during +construction. If `_output_schema_hash` is set lazily (only when `uri` is +accessed), construction should work. If `CachedFunctionPod` init triggers +`uri` access, we need to ensure the proxy's override takes precedence. + +**Key insight:** `_FunctionPodBase.uri` is on the *pod*, not the packet +function. The pod's `uri` calls `self.packet_function.canonical_function_name`, +`self.data_context.semantic_hasher.hash_object(self.packet_function.output_packet_schema)`, +etc. But the *proxy's* `uri` property (on `PacketFunctionBase`) is a different +property. The pod accesses `self.packet_function.uri` only in +`identity_structure()` and `pipeline_identity_structure()`, which return +`self.packet_function.identity_structure()` → `self.uri` on the proxy. So the +proxy's `uri` override IS used for identity/hashing, which is correct. + +The pod's own `uri` (on `_FunctionPodBase`) is used for `CachedFunctionPod` +record path. This pod-level `uri` computes `semantic_hasher.hash_object( +self.packet_function.output_packet_schema)`. Since the proxy's +`output_packet_schema` is eagerly deserialized, this should work — but the +resulting hash may differ from the original. This is why the spec says to store +the URI and use it. However, the pod-level `uri` is separate from the proxy's +`uri`. + +If the pod-level `uri` hash differs, `CachedFunctionPod` will look in the wrong +DB path. **Solution**: In `_load_function_node`, after constructing the +`FunctionNode`, override `_cached_function_pod._cache._record_path` (a private +field — see design issue RC1) with the +stored `result_record_path` from the descriptor. OR store the +`_output_schema_hash` on the FunctionPod from the stored URI. + +This is a subtlety the implementation will need to handle. 
The test will reveal +if it's a problem. + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function_proxy.py -v` +Expected: All PASS + +- [ ] **Step 5: Commit** + +```bash +git add tests/test_core/packet_function/test_packet_function_proxy.py src/orcapod/core/packet_function_proxy.py +git commit -m "test(proxy): add FunctionPod integration tests" +``` + +--- + +## Chunk 2: Serialization and Pipeline Loading + +### Task 6: Add `fallback_to_proxy` to serialization resolver + +**Files:** +- Modify: `src/orcapod/pipeline/serialization.py` +- Modify: `tests/test_pipeline/test_serialization_helpers.py` + +- [ ] **Step 1: Write failing test for fallback** + +Add to `tests/test_pipeline/test_serialization_helpers.py`: + +```python +def test_resolve_packet_function_fallback_to_proxy(): + """When the module can't be imported, fallback_to_proxy returns a proxy.""" + from orcapod.core.packet_function_proxy import PacketFunctionProxy + from orcapod.pipeline.serialization import resolve_packet_function_from_config + + config = { + "packet_function_type_id": "python.function.v0", + "config": { + "module_path": "nonexistent.module.that.does.not.exist", + "callable_name": "some_func", + "version": "v1.0", + "input_packet_schema": {"x": "int64"}, + "output_packet_schema": {"y": "float64"}, + "output_keys": ["y"], + }, + } + + # Without fallback, should raise + with pytest.raises(Exception): + resolve_packet_function_from_config(config) + + # With fallback, should return proxy + result = resolve_packet_function_from_config(config, fallback_to_proxy=True) + assert isinstance(result, PacketFunctionProxy) + assert result.canonical_function_name == "some_func" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `uv run pytest tests/test_pipeline/test_serialization_helpers.py::test_resolve_packet_function_fallback_to_proxy -v` +Expected: FAIL (TypeError — `fallback_to_proxy` not recognized) + +- [ ] **Step 3: 
Implement fallback in `resolve_packet_function_from_config`** + +In `src/orcapod/pipeline/serialization.py`, modify `resolve_packet_function_from_config`: + +```python +def resolve_packet_function_from_config( + config: dict[str, Any], + *, + fallback_to_proxy: bool = False, +) -> Any: + """Reconstruct a packet function from a config dict. + + Args: + config: Dict with at least a ``"packet_function_type_id"`` key matching + a registered packet function type. + fallback_to_proxy: If ``True`` and reconstruction fails, return a + ``PacketFunctionProxy`` preserving identity from the config. + + Returns: + A new packet function instance constructed from the config. + + Raises: + ValueError: If the type ID is missing or unknown (and fallback is off). + """ + _ensure_registries() + type_id = config.get("packet_function_type_id") + if type_id not in PACKET_FUNCTION_REGISTRY: + if fallback_to_proxy: + return _packet_function_proxy_from_config(config) + raise ValueError( + f"Unknown packet function type: {type_id!r}. " + f"Known types: {sorted(PACKET_FUNCTION_REGISTRY.keys())}" + ) + cls = PACKET_FUNCTION_REGISTRY[type_id] + try: + return cls.from_config(config) + except Exception: + if fallback_to_proxy: + logger.warning( + "Could not reconstruct packet function from config; " + "returning PacketFunctionProxy." + ) + return _packet_function_proxy_from_config(config) + raise + + +def _packet_function_proxy_from_config(config: dict[str, Any]) -> Any: + """Create a ``PacketFunctionProxy`` from a packet function config. + + Args: + config: Packet function config dict. + + Returns: + A ``PacketFunctionProxy`` preserving the original identity. 
+ """ + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + return PacketFunctionProxy.from_config(config) +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `uv run pytest tests/test_pipeline/test_serialization_helpers.py::test_resolve_packet_function_fallback_to_proxy -v` +Expected: PASS + +- [ ] **Step 5: Run full serialization helper tests** + +Run: `uv run pytest tests/test_pipeline/test_serialization_helpers.py -v` +Expected: All PASS (no regressions) + +- [ ] **Step 6: Commit** + +```bash +git add src/orcapod/pipeline/serialization.py tests/test_pipeline/test_serialization_helpers.py +git commit -m "feat(serialization): add fallback_to_proxy for packet function resolution" +``` + +--- + +### Task 7: Add `fallback_to_proxy` to `FunctionPod.from_config` + +**Files:** +- Modify: `src/orcapod/core/function_pod.py` + +- [ ] **Step 1: Write failing test** + +Add to `tests/test_pipeline/test_serialization_helpers.py`: + +```python +def test_function_pod_from_config_fallback_to_proxy(): + """FunctionPod.from_config with fallback_to_proxy returns pod with proxy.""" + from orcapod.core.function_pod import FunctionPod + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + config = { + "uri": ["some_func", "hash123", "v1", "python.function.v0"], + "packet_function": { + "packet_function_type_id": "python.function.v0", + "config": { + "module_path": "nonexistent.module", + "callable_name": "some_func", + "version": "v1.0", + "input_packet_schema": {"x": "int64"}, + "output_packet_schema": {"y": "float64"}, + "output_keys": ["y"], + }, + }, + "node_config": None, + } + + # Without fallback, should raise + with pytest.raises(Exception): + FunctionPod.from_config(config) + + # With fallback, should return FunctionPod with proxy + pod = FunctionPod.from_config(config, fallback_to_proxy=True) + assert isinstance(pod.packet_function, PacketFunctionProxy) + assert pod.packet_function.canonical_function_name == "some_func" +``` + 
+- [ ] **Step 2: Run test to verify it fails** + +Run: `uv run pytest tests/test_pipeline/test_serialization_helpers.py::test_function_pod_from_config_fallback_to_proxy -v` +Expected: FAIL + +- [ ] **Step 3: Modify `FunctionPod.from_config`** + +In `src/orcapod/core/function_pod.py`, change `FunctionPod.from_config` (around line 297): + +```python +@classmethod +def from_config( + cls, + config: dict[str, Any], + *, + fallback_to_proxy: bool = False, +) -> "FunctionPod": + """Reconstruct a FunctionPod from a config dict. + + Args: + config: A dict as produced by :meth:`to_config`. + fallback_to_proxy: If ``True`` and the packet function cannot be + reconstructed, use a ``PacketFunctionProxy`` instead. + + Returns: + A new ``FunctionPod`` instance. + """ + from orcapod.pipeline.serialization import resolve_packet_function_from_config + + pf_config = config["packet_function"] + + if fallback_to_proxy: + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + packet_function = resolve_packet_function_from_config( + pf_config, fallback_to_proxy=True + ) + # If we got a proxy, inject the URI from the parent config + if isinstance(packet_function, PacketFunctionProxy): + uri_list = config.get("uri") + if uri_list is not None: + packet_function._stored_uri = tuple(uri_list) + else: + packet_function = resolve_packet_function_from_config(pf_config) + + node_config = None + if config.get("node_config") is not None: + node_config = NodeConfig(**config["node_config"]) + + return cls(packet_function=packet_function, node_config=node_config) +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `uv run pytest tests/test_pipeline/test_serialization_helpers.py::test_function_pod_from_config_fallback_to_proxy -v` +Expected: PASS + +- [ ] **Step 5: Run full test suite for regressions** + +Run: `uv run pytest tests/test_pipeline/test_serialization_helpers.py -v` +Expected: All PASS + +- [ ] **Step 6: Commit** + +```bash +git add 
src/orcapod/core/function_pod.py tests/test_pipeline/test_serialization_helpers.py +git commit -m "feat(function_pod): add fallback_to_proxy to FunctionPod.from_config" +``` + +--- + +### Task 8: Update `Pipeline.load` — upstream usability and function node loading + +**Files:** +- Modify: `src/orcapod/pipeline/graph.py` + +This is the integration point. Two changes: + +1. Relax upstream usability check to accept `READ_ONLY` +2. Modify `_load_function_node` to use `fallback_to_proxy=True` and set `load_status` + +- [ ] **Step 1: Modify upstream usability check in `Pipeline.load`** + +In `src/orcapod/pipeline/graph.py`, find the upstream usability check for function +nodes (around line 722-726): + +Change: +```python +upstream_usable = ( + upstream_node is not None + and hasattr(upstream_node, "load_status") + and upstream_node.load_status == LoadStatus.FULL +) +``` + +To: +```python +upstream_usable = ( + upstream_node is not None + and hasattr(upstream_node, "load_status") + and upstream_node.load_status + in (LoadStatus.FULL, LoadStatus.READ_ONLY) +) +``` + +Do the same for the operator node `all_upstreams_usable` check (around line 744-751): + +Change: +```python +all_upstreams_usable = ( + all( + hasattr(n, "load_status") and n.load_status == LoadStatus.FULL + for n in upstream_nodes + ) + if upstream_nodes + else False +) +``` + +To: +```python +all_upstreams_usable = ( + all( + hasattr(n, "load_status") + and n.load_status in (LoadStatus.FULL, LoadStatus.READ_ONLY) + for n in upstream_nodes + ) + if upstream_nodes + else False +) +``` + +- [ ] **Step 2: Modify `_load_function_node` to use proxy fallback** + +In `src/orcapod/pipeline/graph.py`, replace `_load_function_node` (around line 851-900): + +```python +@staticmethod +def _load_function_node( + descriptor: dict[str, Any], + mode: str, + upstream_node: Any | None, + upstream_usable: bool, + databases: dict[str, Any], +) -> FunctionNode: + """Reconstruct a FunctionNode from a descriptor. 
+ + Args: + descriptor: The serialized node descriptor. + mode: Load mode. + upstream_node: The reconstructed upstream node, or ``None``. + upstream_usable: Whether the upstream can provide data. + databases: Database role mapping. + + Returns: + A ``FunctionNode`` instance. + """ + from orcapod.core.function_pod import FunctionPod + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + if mode != "read_only" and upstream_usable: + try: + pod = FunctionPod.from_config( + descriptor["function_pod"], fallback_to_proxy=True + ) + node = FunctionNode.from_descriptor( + descriptor, + function_pod=pod, + input_stream=upstream_node, + databases=databases, + ) + # Set load_status based on whether packet function is a proxy + if isinstance(pod.packet_function, PacketFunctionProxy): + node._load_status = LoadStatus.READ_ONLY + # Override CachedFunctionPod's record_path with the stored + # value from the descriptor. The pod-level URI computation + # (which hashes output_packet_schema via semantic_hasher) + # may produce a different hash after schema round-trip, + # causing DB lookups to miss. The descriptor stores the + # original record_path that was used when data was written. + stored_result_path = tuple( + descriptor.get("result_record_path", ()) + ) + if stored_result_path and node._cached_function_pod is not None: + # NOTE: This reaches into private fields. Logged as a + # design issue — CachedFunctionPod / ResultCache should + # expose a public API for record_path override. 
+ node._cached_function_pod._cache._record_path = ( + stored_result_path + ) + else: + node._load_status = LoadStatus.FULL + return node + except Exception: + logger.warning( + "Failed to reconstruct function node %r, " + "falling back to read-only.", + descriptor.get("label"), + ) + + # Fall through: upstream not usable, read_only mode, or reconstruction failed + if not upstream_usable and mode != "read_only": + logger.warning( + "Upstream for function node %r is not usable, " + "falling back to read-only.", + descriptor.get("label"), + ) + + return FunctionNode.from_descriptor( + descriptor, + function_pod=None, + input_stream=None, + databases=databases, + ) +``` + +- [ ] **Step 3: Run existing serialization tests to check for regressions** + +Run: `uv run pytest tests/test_pipeline/test_serialization.py -v` +Expected: All PASS (existing behavior preserved) + +- [ ] **Step 4: Commit** + +```bash +git add src/orcapod/pipeline/graph.py +git commit -m "feat(pipeline): use PacketFunctionProxy in load, relax upstream usability" +``` + +--- + +### Task 9: Integration test — pipeline load with unavailable function + +**Files:** +- Modify: `tests/test_pipeline/test_serialization.py` + +- [ ] **Step 1: Write integration test** + +Add to `tests/test_pipeline/test_serialization.py`: + +```python +class TestPipelineLoadWithUnavailableFunction: + """Tests for loading a pipeline when the packet function is not importable.""" + + def _build_and_save_pipeline(self, tmp_path): + """Build a pipeline with a function pod, run it, and save to JSON.""" + from orcapod.core.function_pod import FunctionPod, function_pod + from orcapod.core.sources.dict_source import DictSource + from orcapod.databases.delta_lake_databases import DeltaTableDatabase + from orcapod.pipeline import Pipeline + + db_path = str(tmp_path / "pipeline_db") + db = DeltaTableDatabase(db_path) + + source = DictSource( + {"name": ["alice", "bob"], "age": [30, 25]}, + tag_columns=["name"], + ) + + 
@function_pod(output_keys="doubled_age", version="v1.0") + def double_age(age: int) -> int: + return age * 2 + + with Pipeline("test_pipeline", pipeline_database=db) as p: + stream = source + stream = double_age.pod(stream) + + p.run() + + save_path = str(tmp_path / "pipeline.json") + p.save(save_path) + return save_path, db_path + + def test_load_with_unavailable_function_has_read_only_status(self, tmp_path): + """Function node should be READ_ONLY when function can't be loaded.""" + import json + + from orcapod.pipeline import LoadStatus, Pipeline + + save_path, _ = self._build_and_save_pipeline(tmp_path) + + # Corrupt the module path to simulate unavailable function + with open(save_path) as f: + data = json.load(f) + for node in data["nodes"].values(): + if node.get("node_type") == "function": + pf_config = node["function_pod"]["packet_function"]["config"] + pf_config["module_path"] = "nonexistent.module.path" + with open(save_path, "w") as f: + json.dump(data, f) + + loaded = Pipeline.load(save_path) + + # Find the function node + fn_node = None + for node in loaded.compiled_nodes.values(): + if node.node_type == "function": + fn_node = node + break + + assert fn_node is not None + assert fn_node.load_status == LoadStatus.READ_ONLY + + def test_load_with_unavailable_function_can_get_all_records(self, tmp_path): + """get_all_records() should return cached data.""" + import json + + from orcapod.pipeline import Pipeline + + save_path, _ = self._build_and_save_pipeline(tmp_path) + + with open(save_path) as f: + data = json.load(f) + for node in data["nodes"].values(): + if node.get("node_type") == "function": + pf_config = node["function_pod"]["packet_function"]["config"] + pf_config["module_path"] = "nonexistent.module.path" + with open(save_path, "w") as f: + json.dump(data, f) + + loaded = Pipeline.load(save_path) + fn_node = None + for node in loaded.compiled_nodes.values(): + if node.node_type == "function": + fn_node = node + break + + records = 
fn_node.get_all_records() + assert records is not None + assert records.num_rows == 2 + + def test_load_with_unavailable_function_iter_packets_yields_cached( + self, tmp_path + ): + """iter_packets() should yield cached results from Phase 1.""" + import json + + from orcapod.pipeline import Pipeline + + save_path, _ = self._build_and_save_pipeline(tmp_path) + + with open(save_path) as f: + data = json.load(f) + for node in data["nodes"].values(): + if node.get("node_type") == "function": + pf_config = node["function_pod"]["packet_function"]["config"] + pf_config["module_path"] = "nonexistent.module.path" + with open(save_path, "w") as f: + json.dump(data, f) + + loaded = Pipeline.load(save_path) + fn_node = None + for node in loaded.compiled_nodes.values(): + if node.node_type == "function": + fn_node = node + break + + packets = list(fn_node.iter_packets()) + assert len(packets) == 2 + # Verify data content + values = sorted(p.as_dict()["doubled_age"] for _, p in packets) + assert values == [50, 60] +``` + +- [ ] **Step 2: Run integration tests** + +Run: `uv run pytest tests/test_pipeline/test_serialization.py::TestPipelineLoadWithUnavailableFunction -v` +Expected: Tests may FAIL if there are issues with the pod-level `uri` hash +divergence (see Task 5 notes). Debug and fix as needed. + +- [ ] **Step 3: Fix any issues discovered** + +The most likely issue: `CachedFunctionPod.record_path` computed from +`_FunctionPodBase.uri` (which hashes `output_packet_schema`) may differ from +the original `result_record_path` stored in the descriptor. 
If so, after +constructing the `FunctionNode` in `_load_function_node`, override the record +path: + +```python +# In _load_function_node, after constructing the node with proxy: +if isinstance(pod.packet_function, PacketFunctionProxy): + stored_result_path = tuple(descriptor.get("result_record_path", ())) + if stored_result_path and node._cached_function_pod is not None: + node._cached_function_pod._cache._record_path = stored_result_path +``` + +Check if `ResultCache` has `_record_path` as a settable attribute or if +`record_path` is a property. Adjust accordingly. + +- [ ] **Step 4: Run integration tests again** + +Run: `uv run pytest tests/test_pipeline/test_serialization.py::TestPipelineLoadWithUnavailableFunction -v` +Expected: All PASS + +- [ ] **Step 5: Run full serialization test suite** + +Run: `uv run pytest tests/test_pipeline/test_serialization.py -v` +Expected: All PASS (no regressions) + +- [ ] **Step 6: Commit** + +```bash +git add tests/test_pipeline/test_serialization.py src/orcapod/pipeline/graph.py +git commit -m "test(pipeline): add integration tests for loading with unavailable function" +``` + +--- + +### Task 10: Integration test — downstream computation from cached data + +**Files:** +- Modify: `tests/test_pipeline/test_serialization.py` + +- [ ] **Step 1: Write downstream operator test** + +Add to `TestPipelineLoadWithUnavailableFunction`: + +```python + def test_downstream_operator_computes_from_cached(self, tmp_path): + """An operator downstream of a proxy-backed node should work.""" + import json + + from orcapod.core.operators import SelectPacketColumns + from orcapod.core.function_pod import function_pod + from orcapod.core.sources.dict_source import DictSource + from orcapod.databases.delta_lake_databases import DeltaTableDatabase + from orcapod.pipeline import LoadStatus, Pipeline + + db_path = str(tmp_path / "pipeline_db") + db = DeltaTableDatabase(db_path) + + source = DictSource( + {"name": ["alice", "bob"], "age": [30, 25]}, + 
tag_columns=["name"], + ) + + @function_pod(output_keys=("doubled_age", "original_age"), version="v1.0") + def transform(age: int) -> tuple[int, int]: + return age * 2, age + + select_op = SelectPacketColumns(columns=["doubled_age"]) + + with Pipeline("test_downstream", pipeline_database=db) as p: + stream = source + stream = transform.pod(stream) + stream = select_op(stream) + + p.run() + + save_path = str(tmp_path / "pipeline.json") + p.save(save_path) + + # Corrupt function module path + with open(save_path) as f: + data = json.load(f) + for node in data["nodes"].values(): + if node.get("node_type") == "function": + pf_config = node["function_pod"]["packet_function"]["config"] + pf_config["module_path"] = "nonexistent.module.path" + with open(save_path, "w") as f: + json.dump(data, f) + + loaded = Pipeline.load(save_path) + + # Find the operator node + op_node = None + fn_node = None + for node in loaded.compiled_nodes.values(): + if node.node_type == "operator": + op_node = node + elif node.node_type == "function": + fn_node = node + + assert fn_node is not None + assert fn_node.load_status == LoadStatus.READ_ONLY + + assert op_node is not None + assert op_node.load_status == LoadStatus.FULL + + # The operator should be able to compute from cached data + table = op_node.as_table() + assert table is not None + assert "doubled_age" in table.column_names + assert table.num_rows == 2 +``` + +- [ ] **Step 2: Run test** + +Run: `uv run pytest tests/test_pipeline/test_serialization.py::TestPipelineLoadWithUnavailableFunction::test_downstream_operator_computes_from_cached -v` +Expected: PASS + +- [ ] **Step 3: Commit** + +```bash +git add tests/test_pipeline/test_serialization.py +git commit -m "test(pipeline): add downstream operator test for cached data flow" +``` + +--- + +### Task 11: Run full test suite and final cleanup + +**Files:** +- All modified files + +- [ ] **Step 1: Run the full test suite** + +Run: `uv run pytest tests/ -v --timeout=120` +Expected: All 
PASS + +- [ ] **Step 2: Fix any failures** + +Address any test failures discovered. + +- [ ] **Step 3: Final commit if any fixes were needed** + +```bash +git add -u +git commit -m "fix: address test failures from PacketFunctionProxy integration" +``` diff --git a/superpowers/specs/2026-03-15-stub-packet-function-design.md b/superpowers/specs/2026-03-15-stub-packet-function-design.md new file mode 100644 index 00000000..9992d463 --- /dev/null +++ b/superpowers/specs/2026-03-15-stub-packet-function-design.md @@ -0,0 +1,458 @@ +# PLT-931: PacketFunctionProxy for Read-Only Pipeline Loading + +## Problem + +When a pipeline is saved to JSON and loaded in an environment where the original +packet function (Python callable) is unavailable, `FunctionPod.from_config()` fails +and the loading code falls back to constructing a `FunctionNode` in read-only mode +with `function_pod=None`. This causes: + +1. **`iter_packets()`** crashes — calls `_input_stream.iter_packets()` on `None` +2. **`get_all_records()`** returns `None` — short-circuits because + `_cached_function_pod is None`, even though the DB and record paths are known +3. **`as_table()`** crashes — delegates to `iter_packets()` +4. **Downstream nodes** are marked read-only because upstream `load_status != FULL`, + blocking all downstream computation even for operators and function pods whose + functions *are* available + +The root cause: `FunctionNode.from_descriptor()` bypasses `__init__` entirely when +`function_pod=None`, leaving all internal state as `None`. Every code path that +touches the function pod, packet function, input stream, or cached function pod +fails. + +## Solution: PacketFunctionProxy + +Introduce a `PacketFunctionProxy` (consistent with the existing `SourceProxy` +naming pattern) that carries all metadata of the original packet function +(schemas, identity, URI) but raises a clear error when invoked. 
This allows the +full `FunctionNode.__init__` → `attach_databases()` → `CachedFunctionPod` path +to work, so cached data access flows through existing code paths unchanged. + +The proxy also supports **late binding**: a real packet function can be bound +to the proxy after loading via `bind()`. The proxy wraps (not replaces) the +bound function, preserving the fact that the function was not loaded originally. + +### Design principles + +- **No special-casing in FunctionNode**: The proxy satisfies + `PacketFunctionProtocol`, so `FunctionNode.__init__`, `CachedFunctionPod`, + `output_schema()`, `keys()`, `pipeline_path`, and `get_all_records()` all + work without modification. +- **Clear failure at point of invocation**: `call()` / `async_call()` / + `direct_call()` / `direct_async_call()` raise `PacketFunctionUnavailableError` + with a message naming the function and explaining only cached results are + available — unless a real function has been bound via `bind()`. +- **Correct identity**: The proxy reproduces the same `content_hash()` and + `pipeline_hash()` as the original packet function, so DB path scoping and + record lookups work correctly. +- **Downstream nodes can compute**: Because the `FunctionNode` with a proxy is + fully constructed (with DB-backed `CachedFunctionPod`), it can serve cached + data via `iter_packets()` Phase 1. Downstream operators and function pods + (whose functions are available) can run in full mode on that data. +- **Late binding preserves provenance**: `bind()` wraps the bound function + inside the proxy rather than replacing the proxy, so the proxy remains in the + call chain. This preserves the information that the function was not loaded + from the serialized config — useful for provenance tracking and debugging. +- **Binding validates identity**: `bind()` verifies that the provided packet + function matches the proxy's stored identity (URI, schemas, content hash) + before accepting it. 
If any identifying information mismatches, `bind()` + raises `ValueError` with a clear message describing which fields differ. + This prevents silently substituting a different function for the original. + +## Components + +### 1. `PacketFunctionUnavailableError` (in `errors.py`) + +```python +class PacketFunctionUnavailableError(RuntimeError): + """Raised when a packet function proxy is invoked without a bound function. + + This occurs when a pipeline is loaded in an environment where the + original packet function is not available. Only cached results can + be accessed. + """ +``` + +Subclasses `RuntimeError` so existing `except RuntimeError` handlers still catch +it, while allowing callers to catch this specific condition. + +### 2. `PacketFunctionProxy` (new file: `src/orcapod/core/packet_function_proxy.py`) + +A concrete implementation of `PacketFunctionProtocol` that stands in for an +unavailable packet function, preserving all identifying metadata and supporting +optional late binding of a real function. + +#### Metadata and identity + +- **Stores metadata from the serialized config**: `packet_function_type_id`, + `canonical_function_name`, `major_version`, `minor_version_string`, + `input_packet_schema`, `output_packet_schema` +- **Stores the original `uri` tuple**: Extracted from + `config["uri"]` (the `FunctionPod.to_config()` output includes it). + The `uri` property is overridden to return this stored value directly, + avoiding any recomputation via `semantic_hasher.hash_object()`. This is + critical because the schema round-trip (`Python type → Arrow string → + Python type`) may not produce identical hash inputs, which would cause + `CachedFunctionPod` to look in the wrong DB path. 
+- **Stores pre-computed hashes**: `content_hash` and `pipeline_hash` strings
+  from the pipeline descriptor; `content_hash()` and `pipeline_hash()` return
+  them wrapped in the originals' hash type (callers invoke `.to_string()`)
+- **`to_config()`** returns the original config dict (round-trip preservation),
+  regardless of whether a function has been bound
+- **`from_config()`** class method constructs from the same config dict that
+  `PythonPacketFunction.to_config()` produces, extracting schemas and metadata
+  without importing the callable
+
+#### Invocation behavior
+
+When **unbound** (no real function has been bound via `bind()`):
+- `call()`, `async_call()`, `direct_call()`, `direct_async_call()` all raise
+  `PacketFunctionUnavailableError` with a message like:
+  *"Packet function 'my_func' is not available in this environment.
+  Only cached results can be accessed."*
+- `get_function_variation_data()` and `get_execution_data()` return empty dicts.
+  Note: these are only called by `CachedFunctionPod.process_packet()` when
+  *storing* a new result. Since the proxy raises
+  `PacketFunctionUnavailableError` before the store call is reached, the empty
+  dicts are never actually persisted.
+- `executor` property returns `None`; setter is a **no-op** (silently ignored).
+  This avoids conflicts with code paths like `Pipeline._apply_execution_engine`
+  that iterate all function nodes and set executors — raising here would break
+  pipeline loading unnecessarily.
+ +When **bound** (a real function has been attached via `bind()`): +- `call()`, `async_call()`, `direct_call()`, `direct_async_call()` delegate + to the bound function +- `get_function_variation_data()` and `get_execution_data()` delegate to the + bound function +- `executor` property and setter delegate to the bound function + +#### Constructor signature + +```python +class PacketFunctionProxy(PacketFunctionBase): + def __init__( + self, + config: dict[str, Any], + content_hash_str: str | None = None, + pipeline_hash_str: str | None = None, + ) -> None: +``` + +The `config` parameter is the `packet_function` config dict from the serialized +pipeline (same format as `PythonPacketFunction.to_config()` output). The hash +strings come from the node descriptor. + +**Version string handling**: `PacketFunctionBase.__init__` requires a `version` +string matching the regex `\D*(\d+)\.(.*)`. The proxy extracts this from +`config["config"]["version"]` (the same field `PythonPacketFunction.to_config()` +writes) and passes it to `super().__init__(version=...)`. + +#### Schema reconstruction + +The config contains `input_packet_schema` and `output_packet_schema` as +`dict[str, str]` (Arrow type strings). The proxy uses +`deserialize_schema()` from `pipeline.serialization` to reconstruct them +as `Schema` objects with Python types. **This must happen eagerly during +`__init__`** (not lazily), because `CachedFunctionPod` accesses +`output_packet_schema` immediately during its initialization chain via +`WrappedFunctionPod.uri → _FunctionPodBase.uri → semantic_hasher.hash_object( +self.packet_function.output_packet_schema)`. + +However, since the proxy **overrides the `uri` property** to return the stored +original URI, the `semantic_hasher.hash_object()` call on the schema never +actually happens for the proxy. The schemas are still reconstructed eagerly +because `FunctionNode.__init__` calls `schema_utils.check_schema_compatibility()` +on input schema validation. 
+ +#### `bind()` — late binding of a real packet function + +```python +def bind(self, packet_function: PacketFunctionProtocol) -> None: + """Bind a real packet function to this proxy. + + Validates that the provided function matches the proxy's stored + identity before accepting it. After binding, invocation methods + delegate to the bound function. + + Args: + packet_function: The real packet function to bind. + + Raises: + ValueError: If the provided function's identifying information + does not match the proxy's stored identity. + """ +``` + +**Validation checks** (all must pass): +- `canonical_function_name` matches +- `major_version` matches +- `packet_function_type_id` matches +- `input_packet_schema` matches +- `output_packet_schema` matches +- `uri` matches +- `content_hash()` matches (if the proxy has a stored content hash) + +If any check fails, `bind()` raises `ValueError` with a message listing +the mismatched fields and their expected vs actual values. + +The bound function is stored as `self._bound_function` (initially `None`). +The `is_bound` property exposes whether a function has been bound. + +#### `unbind()` — reverting to proxy-only mode + +```python +def unbind(self) -> None: + """Remove the bound packet function, reverting to proxy-only mode.""" +``` + +Sets `self._bound_function = None`. After unbinding, invocation methods +raise `PacketFunctionUnavailableError` again. Included for API consistency +with `SourceProxy`. + +**Why wrap, not replace**: The proxy remains in the call chain even after +binding. This preserves the fact that the function was not loaded from the +serialized config — useful for provenance tracking, debugging, and +distinguishing "loaded from config" vs "bound after load" in downstream +logic. + +### 3. 
Changes to `resolve_packet_function_from_config` (in `pipeline/serialization.py`) + +Add a `fallback_to_proxy` parameter (default `False`): + +```python +def resolve_packet_function_from_config( + config: dict[str, Any], + *, + fallback_to_proxy: bool = False, +) -> PacketFunctionProtocol: +``` + +When `fallback_to_proxy=True` and the normal resolution fails (ImportError, +AttributeError, etc.), return a `PacketFunctionProxy` constructed from the config +instead of re-raising. + +### 4. Changes to `FunctionPod.from_config` (in `function_pod.py`) + +Add a `fallback_to_proxy` parameter that passes through to +`resolve_packet_function_from_config`: + +```python +@classmethod +def from_config( + cls, + config: dict[str, Any], + *, + fallback_to_proxy: bool = False, +) -> "FunctionPod": +``` + +When the packet function resolves to a proxy, `FunctionPod` is constructed +normally — the proxy satisfies `PacketFunctionProtocol`. + +### 5. Changes to `Pipeline._load_function_node` (in `pipeline/graph.py`) + +The current logic: + +```python +if mode == "full": + if not upstream_usable: + # fall back to read-only + else: + try: + pod = FunctionPod.from_config(descriptor["function_pod"]) + return FunctionNode.from_descriptor( + descriptor, function_pod=pod, input_stream=upstream_node, + databases=databases, + ) + except Exception: + # fall back to read-only +``` + +Changes: +1. Call `FunctionPod.from_config(..., fallback_to_proxy=True)` so it never + raises — it returns a FunctionPod with either a live or proxy packet function. +2. Determine `load_status` based on whether the packet function is a proxy. +3. Always construct `FunctionNode` via the normal `from_descriptor` full-mode + path (with `function_pod` and `input_stream`), so `__init__` runs and + `CachedFunctionPod` is created. + +### 6. 
Changes to upstream usability check (in `Pipeline.load`) + +Currently in `Pipeline.load()`, the upstream usability check requires +`LoadStatus.FULL`: + +```python +upstream_usable = ( + upstream_node is not None + and hasattr(upstream_node, "load_status") + and upstream_node.load_status == LoadStatus.FULL +) +``` + +Change to also accept `READ_ONLY`: + +```python +upstream_usable = ( + upstream_node is not None + and hasattr(upstream_node, "load_status") + and upstream_node.load_status in (LoadStatus.FULL, LoadStatus.READ_ONLY) +) +``` + +This allows downstream operators and function pods to compute on cached data +from upstream proxy-backed nodes. + +The same change applies to the `all_upstreams_usable` check for operator nodes. + +### 7. `FunctionNode.from_descriptor` and `load_status` + +With the proxy approach, `from_descriptor` receives a real `function_pod` +(containing a proxy) and a real `input_stream` (upstream node). It takes +the normal full-mode `__init__` path. + +After construction, `load_status` is set based on whether the packet function +is a proxy: + +```python +from orcapod.core.packet_function_proxy import PacketFunctionProxy + +if isinstance(pod.packet_function, PacketFunctionProxy): + node._load_status = LoadStatus.READ_ONLY +else: + node._load_status = LoadStatus.FULL +``` + +This is set in `_load_function_node` after the node is constructed. + +### 8. `FunctionNode.iter_packets()` behavior with proxy + +With a proxy-backed node: + +- **Phase 1** (cached results from DB): Works unchanged — `CachedFunctionPod` + reads from the result DB, pipeline DB join produces (tag, packet) pairs. +- **Phase 2** (compute missing packets): If any input packets are not in the + cache, `_process_packet_internal` calls `CachedFunctionPod.process_packet()`, + which calls `self._function_pod.process_packet(tag, packet)`, which calls + `proxy.call(packet)` → raises `PacketFunctionUnavailableError`. 
+ +This is the correct behavior: cached data flows, new computation fails clearly. + +### 9. Simplification of `FunctionNode.from_descriptor` read-only path + +The current read-only path (lines 251–307) with `cls.__new__` and manual +attribute assignment becomes dead code for pipeline loading, since all loaded +function nodes now go through the normal `__init__` path (with either a live or +proxy packet function). + +However, we should keep the read-only path for the case where the DB is also +unavailable (e.g., in-memory DB from original run). In that case, `load_status` +remains `UNAVAILABLE` and the node truly cannot serve any data. + +The read-only `__new__` path is only used when `pipeline_db` is `None` or when +the DB type is in-memory (which warns and provides no cached data). + +## Data flow summary + +``` +Pipeline.load() + │ + ├─ Source node: reconstructed or proxy (unchanged) + │ + ├─ Function node (function available): + │ └─ FunctionPod(PythonPacketFunction) → FunctionNode (FULL) + │ + ├─ Function node (function unavailable, DB available): + │ └─ FunctionPod(PacketFunctionProxy) → FunctionNode (READ_ONLY) + │ ├─ iter_packets() Phase 1: serves cached data ✓ + │ ├─ iter_packets() Phase 2: raises PacketFunctionUnavailableError ✗ + │ ├─ get_all_records(): works ✓ + │ └─ as_table(): works (from Phase 1 cache) ✓ + │ + ├─ Function node (function unavailable, DB unavailable): + │ └─ from_descriptor read-only path → FunctionNode (UNAVAILABLE) + │ + └─ Downstream operator/function pod: + ├─ upstream FULL or READ_ONLY → construct in FULL mode + └─ upstream UNAVAILABLE → construct in READ_ONLY/UNAVAILABLE mode +``` + +## Files changed + +| File | Change | +|------|--------| +| `src/orcapod/errors.py` | Add `PacketFunctionUnavailableError` | +| `src/orcapod/core/packet_function_proxy.py` | New: `PacketFunctionProxy` class | +| `src/orcapod/pipeline/serialization.py` | `resolve_packet_function_from_config`: add `fallback_to_proxy` param | +| 
`src/orcapod/core/function_pod.py` | `FunctionPod.from_config`: add `fallback_to_proxy` param |
+| `src/orcapod/pipeline/graph.py` | `_load_function_node`: use proxy fallback; `load()`: relax upstream usability check |
+| `src/orcapod/core/nodes/function_node.py` | No change to `from_descriptor` — `load_status` is set in `_load_function_node` (§7); read-only path retained (§9) |
+
+## Test plan
+
+1. **PacketFunctionProxy unit tests**
+   - Construction from a `PythonPacketFunction.to_config()` output
+   - Correct `input_packet_schema`, `output_packet_schema`
+   - `uri` returns stored original value (not recomputed)
+   - `content_hash()` and `pipeline_hash()` return stored values
+   - `call()` raises `PacketFunctionUnavailableError`
+   - `async_call()` raises `PacketFunctionUnavailableError`
+   - `to_config()` round-trips (returns original config)
+   - `executor` returns `None`; setter is no-op (no error)
+
+2. **FunctionPod with proxy**
+   - `FunctionPod(PacketFunctionProxy(...))` constructs without error
+   - `output_schema()` returns correct schemas
+   - `process_packet()` raises `PacketFunctionUnavailableError`
+
+3. **Pipeline load with unavailable function**
+   - Save a pipeline with a function pod, load in environment where function
+     is not importable
+   - Function node has `load_status == READ_ONLY`
+   - `get_all_records()` returns cached data
+   - `iter_packets()` yields cached results (Phase 1)
+   - `as_table()` returns cached data as table
+   - Attempting to process a new (uncached) packet raises
+     `PacketFunctionUnavailableError`
+
+4. **Downstream computation from cached data**
+   - Save a pipeline: source → function_pod → operator
+   - Load with function unavailable but operator available
+   - Function node is `READ_ONLY`, operator node is `FULL`
+   - Operator can process cached data from function node
+   - End-to-end: `operator_node.as_table()` returns correct results
+
+5. 
**Downstream function pod with available function** + - Save a pipeline: source → function_pod_A → function_pod_B + - Load with function_A unavailable, function_B available + - function_pod_A node is `READ_ONLY`, function_pod_B node is `FULL` + - function_pod_B can compute on cached output from function_pod_A + +6. **`bind()` — successful late binding** + - Create a `PacketFunctionProxy` from a config, then `bind()` the + matching `PythonPacketFunction` + - `is_bound` returns `True` + - `call()` delegates to the bound function (no error) + - `executor` getter/setter delegate to the bound function + +7. **`bind()` — identity mismatch rejection** + - Create a `PacketFunctionProxy`, attempt `bind()` with a function + that has a different `canonical_function_name` → `ValueError` + - Same for mismatched `major_version`, `input_packet_schema`, + `output_packet_schema`, `uri`, `content_hash` + - Error message lists which fields differ + +8. **`unbind()` — reverting to proxy-only mode** + - Bind a function, then `unbind()` + - `is_bound` returns `False` + - `call()` raises `PacketFunctionUnavailableError` again + +9. **Schema round-trip fidelity** + - Verify that `deserialize_schema(serialize_schema(schema))` produces + schemas that are functionally equivalent for all supported types + - This guards against type-converter round-trip divergence + +10. 
**Fully unavailable (no DB)** + - Load pipeline with in-memory DB → function node is `UNAVAILABLE` + - `iter_packets()` / `as_table()` raise `RuntimeError` + - Downstream nodes are also `UNAVAILABLE` or `READ_ONLY` diff --git a/tests/test_core/packet_function/test_packet_function_proxy.py b/tests/test_core/packet_function/test_packet_function_proxy.py new file mode 100644 index 00000000..c2699328 --- /dev/null +++ b/tests/test_core/packet_function/test_packet_function_proxy.py @@ -0,0 +1,236 @@ +"""Tests for PacketFunctionProxy invocation, bind, and unbind behavior.""" + +import pytest + +from orcapod.core.datagrams.tag_packet import Packet, Tag +from orcapod.core.function_pod import FunctionPod +from orcapod.core.packet_function import PythonPacketFunction +from orcapod.core.packet_function_proxy import PacketFunctionProxy +from orcapod.core.sources.dict_source import DictSource +from orcapod.errors import PacketFunctionUnavailableError + + +# ==================== Task 2: Construction tests ==================== + + +class TestPacketFunctionProxyConstruction: + """Tests for proxy construction and executor property.""" + + def test_executor_returns_none(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.executor is None + + def test_executor_setter_is_noop(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.executor = None # should not raise + + +# ==================== Helpers ==================== + + +def _make_sample_function() -> PythonPacketFunction: + def double_age(age: int) -> int: + return age * 2 + + return PythonPacketFunction(double_age, output_keys="doubled_age", version="v1.0") + + +def _make_proxy_from_function(pf: PythonPacketFunction) -> PacketFunctionProxy: + config = pf.to_config() + return PacketFunctionProxy( + config=config, + content_hash_str=pf.content_hash().to_string(), + pipeline_hash_str=pf.pipeline_hash().to_string(), + ) + + +# ==================== Task 3: 
Invocation tests ==================== + + +class TestPacketFunctionProxyInvocation: + """Tests for proxy behavior when no function is bound.""" + + def test_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + packet = Packet({"age": 25}) + with pytest.raises( + PacketFunctionUnavailableError, match="double_age" + ): + proxy.call(packet) + + @pytest.mark.asyncio + async def test_async_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + packet = Packet({"age": 25}) + with pytest.raises( + PacketFunctionUnavailableError, match="double_age" + ): + await proxy.async_call(packet) + + def test_direct_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + packet = Packet({"age": 25}) + with pytest.raises( + PacketFunctionUnavailableError, match="double_age" + ): + proxy.direct_call(packet) + + @pytest.mark.asyncio + async def test_direct_async_call_raises_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + packet = Packet({"age": 25}) + with pytest.raises( + PacketFunctionUnavailableError, match="double_age" + ): + await proxy.direct_async_call(packet) + + def test_variation_data_empty_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.get_function_variation_data() == {} + + def test_execution_data_empty_when_unbound(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + assert proxy.get_execution_data() == {} + + +# ==================== Task 4: Bind/unbind tests ==================== + + +class TestPacketFunctionProxyBinding: + """Tests for bind/unbind and identity mismatch detection.""" + + def test_bind_succeeds_with_matching_function(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + assert proxy.is_bound + + def test_call_delegates_after_bind(self): + 
pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + packet = Packet({"age": 25}) + result = proxy.call(packet) + assert result is not None + assert result.as_dict()["doubled_age"] == 50 + + def test_variation_data_delegates_after_bind(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + data = proxy.get_function_variation_data() + assert "function_name" in data + assert data["function_name"] == "double_age" + + def test_execution_data_delegates_after_bind(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + data = proxy.get_execution_data() + assert "python_version" in data + + def test_unbind_reverts_to_proxy_mode(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + proxy.bind(pf) + assert proxy.is_bound + proxy.unbind() + assert not proxy.is_bound + packet = Packet({"age": 25}) + with pytest.raises(PacketFunctionUnavailableError): + proxy.call(packet) + + def test_bind_rejects_mismatched_function_name(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def other_func(age: int) -> int: + return age + 1 + + other_pf = PythonPacketFunction( + other_func, output_keys="doubled_age", version="v1.0" + ) + with pytest.raises(ValueError, match="canonical_function_name"): + proxy.bind(other_pf) + + def test_bind_rejects_mismatched_version(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def double_age(age: int) -> int: + return age * 2 + + other_pf = PythonPacketFunction( + double_age, output_keys="doubled_age", version="v2.0" + ) + with pytest.raises(ValueError, match="major_version"): + proxy.bind(other_pf) + + def test_bind_rejects_mismatched_output_schema(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def double_age(age: int) -> str: + return str(age * 2) + + other_pf = PythonPacketFunction( + double_age, 
output_keys="doubled_age", version="v1.0" + ) + with pytest.raises(ValueError, match="output_packet_schema"): + proxy.bind(other_pf) + + def test_bind_rejects_mismatched_input_schema(self): + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + + def double_age(name: str) -> int: + return len(name) * 2 + + other_pf = PythonPacketFunction( + double_age, output_keys="doubled_age", version="v1.0" + ) + with pytest.raises(ValueError, match="input_packet_schema"): + proxy.bind(other_pf) + + +# ==================== Task 5: FunctionPod with proxy ==================== + + +class TestFunctionPodWithProxy: + """Tests for FunctionPod constructed with a PacketFunctionProxy.""" + + def test_function_pod_constructs_with_proxy(self): + """FunctionPod accepts a proxy and exposes it as packet_function.""" + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + pod = FunctionPod(packet_function=proxy) + assert pod.packet_function is proxy + + def test_function_pod_output_schema(self): + """FunctionPod with proxy correctly reports output schema.""" + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + pod = FunctionPod(packet_function=proxy) + source = DictSource( + data=[{"age": 10}, {"age": 20}, {"age": 30}], + ) + _tag_schema, packet_schema = pod.output_schema(source) + assert "doubled_age" in packet_schema + + def test_function_pod_process_packet_raises(self): + """FunctionPod with unbound proxy raises on process_packet.""" + pf = _make_sample_function() + proxy = _make_proxy_from_function(pf) + pod = FunctionPod(packet_function=proxy) + tag = Tag({}) + packet = Packet({"age": 25}) + with pytest.raises(PacketFunctionUnavailableError): + pod.process_packet(tag, packet) diff --git a/tests/test_pipeline/test_serialization.py b/tests/test_pipeline/test_serialization.py index 86357911..2eb8bd31 100644 --- a/tests/test_pipeline/test_serialization.py +++ b/tests/test_pipeline/test_serialization.py @@ -1067,3 +1067,190 @@ def 
test_function_database_round_trip(self, tmp_path): assert loaded.function_database is not None assert type(loaded.function_database) is DeltaTableDatabase + + +# --------------------------------------------------------------------------- +# Pipeline load with unavailable function (Tasks 9-10) +# --------------------------------------------------------------------------- + + +def _double_age(age: int) -> tuple[int, int]: + """A simple function that doubles age and preserves original.""" + return age * 2, age + + +def _corrupt_function_module_path(save_path): + """Edit saved pipeline JSON to make function module unimportable.""" + with open(save_path) as f: + data = json.load(f) + for node in data["nodes"].values(): + if node.get("node_type") == "function": + pf_config = node["function_pod"]["packet_function"]["config"] + pf_config["module_path"] = "nonexistent.module.that.does.not.exist" + with open(save_path, "w") as f: + json.dump(data, f) + + +class TestPipelineLoadWithUnavailableFunction: + """Tests for loading a pipeline when the function cannot be imported.""" + + @staticmethod + def _write_csv(path, rows): + """Write a list of dicts as a CSV file.""" + fieldnames = list(rows[0].keys()) + with open(path, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + + def _build_and_save_pipeline(self, tmp_path): + """Build pipeline with CSV source and function pod, run it, save to JSON. + + Uses CSVSource (reconstructable) so the source survives load and the + function node enters the proxy path rather than the pure read-only path. + + Returns: + Tuple of (save_path, db_path). 
+ """ + csv_path = str(tmp_path / "data.csv") + self._write_csv( + csv_path, + [{"name": "alice", "age": "30"}, {"name": "bob", "age": "25"}], + ) + + db_path = str(tmp_path / "db") + db = DeltaTableDatabase(base_path=db_path) + source = CSVSource( + file_path=csv_path, + tag_columns=["name"], + source_id="people", + ) + pf = PythonPacketFunction( + function=_double_age, + output_keys=["doubled_age", "original_age"], + function_name="_double_age", + ) + pod = FunctionPod(packet_function=pf) + pipeline = Pipeline(name="proxy_test", pipeline_database=db) + with pipeline: + pod.process(source, label="transform") + pipeline.run() + db.flush() + + save_path = str(tmp_path / "pipeline.json") + pipeline.save(save_path) + return save_path, db_path + + def _build_and_save_pipeline_with_operator(self, tmp_path): + """Build pipeline: CSV source -> function_pod -> SelectPacketColumns. + + Returns: + Tuple of (save_path, db_path). + """ + from orcapod.core.operators import SelectPacketColumns + + csv_path = str(tmp_path / "data.csv") + self._write_csv( + csv_path, + [{"name": "alice", "age": "30"}, {"name": "bob", "age": "25"}], + ) + + db_path = str(tmp_path / "db") + db = DeltaTableDatabase(base_path=db_path) + source = CSVSource( + file_path=csv_path, + tag_columns=["name"], + source_id="people", + ) + pf = PythonPacketFunction( + function=_double_age, + output_keys=["doubled_age", "original_age"], + function_name="_double_age", + ) + pod = FunctionPod(packet_function=pf) + select_op = SelectPacketColumns(columns=["doubled_age"]) + + pipeline = Pipeline(name="proxy_op_test", pipeline_database=db) + with pipeline: + fn_result = pod.process(source, label="transform") + select_op.process(fn_result, label="select_col") + pipeline.run() + db.flush() + + save_path = str(tmp_path / "pipeline.json") + pipeline.save(save_path) + return save_path, db_path + + # -- Task 9 tests -- + + def test_load_with_unavailable_function_has_read_only_status(self, tmp_path): + """Loading a pipeline 
with corrupted module_path yields READ_ONLY function node.""" + save_path, _ = self._build_and_save_pipeline(tmp_path) + _corrupt_function_module_path(save_path) + + loaded = Pipeline.load(save_path, mode="full") + + fn_nodes = [ + n + for n in loaded.compiled_nodes.values() + if n.node_type == "function" + ] + assert len(fn_nodes) == 1 + assert fn_nodes[0].load_status == LoadStatus.READ_ONLY + + def test_load_with_unavailable_function_can_get_all_records(self, tmp_path): + """A READ_ONLY function node with proxy can retrieve all cached records.""" + save_path, _ = self._build_and_save_pipeline(tmp_path) + _corrupt_function_module_path(save_path) + + loaded = Pipeline.load(save_path, mode="full") + fn_node = loaded.compiled_nodes["transform"] + + records = fn_node.get_all_records() + assert records is not None + assert records.num_rows == 2 + + def test_load_with_unavailable_function_iter_packets_yields_cached(self, tmp_path): + """A READ_ONLY function node with proxy yields cached packets via iter_packets.""" + save_path, _ = self._build_and_save_pipeline(tmp_path) + _corrupt_function_module_path(save_path) + + loaded = Pipeline.load(save_path, mode="full") + fn_node = loaded.compiled_nodes["transform"] + + packets = list(fn_node.iter_packets()) + assert len(packets) == 2 + + # Verify actual data values + doubled_ages = sorted(p.as_dict()["doubled_age"] for _, p in packets) + assert doubled_ages == [50, 60] + + original_ages = sorted(p.as_dict()["original_age"] for _, p in packets) + assert original_ages == [25, 30] + + # -- Task 10 test -- + + def test_downstream_operator_computes_from_cached(self, tmp_path): + """Operator downstream of READ_ONLY function node can compute from cached data.""" + save_path, _ = self._build_and_save_pipeline_with_operator(tmp_path) + _corrupt_function_module_path(save_path) + + loaded = Pipeline.load(save_path, mode="full") + + # Function node should be READ_ONLY (proxy) + fn_node = loaded.compiled_nodes["transform"] + assert 
fn_node.load_status == LoadStatus.READ_ONLY + + # Operator node should be FULL (it can reconstruct from cached upstream) + op_node = loaded.compiled_nodes["select_col"] + assert op_node.load_status == LoadStatus.FULL + + # Operator should produce correct results + table = op_node.as_table() + assert table.num_rows == 2 + assert "doubled_age" in table.column_names + # original_age should have been dropped by SelectPacketColumns + assert "original_age" not in table.column_names + + doubled_ages = sorted(table.column("doubled_age").to_pylist()) + assert doubled_ages == [50, 60] diff --git a/tests/test_pipeline/test_serialization_helpers.py b/tests/test_pipeline/test_serialization_helpers.py index 90d778cb..7feee9cb 100644 --- a/tests/test_pipeline/test_serialization_helpers.py +++ b/tests/test_pipeline/test_serialization_helpers.py @@ -294,3 +294,147 @@ def test_preserves_field_order(self): schema = {"z": int, "a": str, "m": float} result = self._round_trip(schema) assert list(result.keys()) == ["z", "a", "m"] + + +# --------------------------------------------------------------------------- +# PacketFunction proxy fallback +# --------------------------------------------------------------------------- + + +class TestResolvePacketFunctionFallbackToProxy: + """resolve_packet_function_from_config with fallback_to_proxy.""" + + def test_resolve_packet_function_fallback_to_proxy(self): + from orcapod.core.packet_function_proxy import PacketFunctionProxy + from orcapod.pipeline.serialization import resolve_packet_function_from_config + + config = { + "packet_function_type_id": "python.function.v0", + "config": { + "module_path": "nonexistent.module.that.does.not.exist", + "callable_name": "some_func", + "version": "v1.0", + "input_packet_schema": {"x": "int64"}, + "output_packet_schema": {"y": "float64"}, + "output_keys": ["y"], + }, + } + # Without fallback, should raise ImportError/ModuleNotFoundError + with pytest.raises((ImportError, ModuleNotFoundError)): + 
resolve_packet_function_from_config(config) + + # With fallback, should return proxy + result = resolve_packet_function_from_config(config, fallback_to_proxy=True) + assert isinstance(result, PacketFunctionProxy) + assert result.canonical_function_name == "some_func" + + def test_unknown_type_id_fallback(self): + from orcapod.core.packet_function_proxy import PacketFunctionProxy + from orcapod.pipeline.serialization import resolve_packet_function_from_config + + config = { + "packet_function_type_id": "unknown.type.v99", + "config": { + "callable_name": "mystery", + "version": "v1.0", + "input_packet_schema": {"a": "int64"}, + "output_packet_schema": {"b": "int64"}, + "output_keys": ["b"], + }, + } + with pytest.raises(ValueError, match="Unknown packet function type"): + resolve_packet_function_from_config(config) + + result = resolve_packet_function_from_config(config, fallback_to_proxy=True) + assert isinstance(result, PacketFunctionProxy) + assert result.canonical_function_name == "mystery" + + +# --------------------------------------------------------------------------- +# FunctionPod.from_config proxy fallback +# --------------------------------------------------------------------------- + + +class TestFunctionPodFromConfigFallbackToProxy: + """FunctionPod.from_config with fallback_to_proxy.""" + + def test_function_pod_from_config_fallback_to_proxy(self): + from orcapod.core.function_pod import FunctionPod + from orcapod.core.packet_function_proxy import PacketFunctionProxy + + config = { + "uri": ["some_func", "hash123", "v1", "python.function.v0"], + "packet_function": { + "packet_function_type_id": "python.function.v0", + "uri": ["some_func", "hash123", "v1", "python.function.v0"], + "config": { + "module_path": "nonexistent.module", + "callable_name": "some_func", + "version": "v1.0", + "input_packet_schema": {"x": "int64"}, + "output_packet_schema": {"y": "float64"}, + "output_keys": ["y"], + }, + }, + "node_config": None, + } + + with 
pytest.raises((ImportError, ModuleNotFoundError)): + FunctionPod.from_config(config) + + pod = FunctionPod.from_config(config, fallback_to_proxy=True) + assert isinstance(pod.packet_function, PacketFunctionProxy) + assert pod.packet_function.canonical_function_name == "some_func" + # URI comes from the packet function config + assert pod.packet_function.uri == ( + "some_func", + "hash123", + "v1", + "python.function.v0", + ) + + +# --------------------------------------------------------------------------- +# Schema round-trip consistency +# --------------------------------------------------------------------------- + + +class TestSchemaRoundTripConsistency: + """Verify serialize/deserialize round-trips preserve schema structure and hashes.""" + + def test_schema_round_trip_consistency(self): + """Verify deserialize_schema(serialize_schema(schema)) produces equivalent schemas.""" + from orcapod.contexts import resolve_context + + tc = resolve_context(None).type_converter + + schemas = [ + Schema({"x": int, "y": float, "name": str}), + Schema({"flag": bool, "data": bytes}), + Schema({"items": list[int], "mapping": dict[str, float]}), + ] + for schema in schemas: + serialized = serialize_schema(schema, type_converter=tc) + deserialized = Schema(deserialize_schema(serialized, type_converter=tc)) + assert set(deserialized.keys()) == set(schema.keys()), ( + f"Keys mismatch: {set(schema.keys())} vs {set(deserialized.keys())}" + ) + + def test_schema_round_trip_hash_consistency(self): + """Verify that schema hash is preserved through serialize/deserialize.""" + from orcapod.contexts import resolve_context + + ctx = resolve_context(None) + hasher = ctx.semantic_hasher + tc = ctx.type_converter + + schema = Schema({"age": int, "name": str, "score": float}) + original_hash = hasher.hash_object(schema).to_string() + + serialized = serialize_schema(schema, type_converter=tc) + deserialized = Schema(deserialize_schema(serialized, type_converter=tc)) + round_trip_hash = 
hasher.hash_object(deserialized).to_string() + + assert original_hash == round_trip_hash, ( + f"Hash diverged: {original_hash} vs {round_trip_hash}" + )