Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/orcapod/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
FunctionPod,
function_pod,
)
from .core.sources.arrow_table_source import ArrowTableSource
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've adjusted the imports in the latest dev branch so that the entire sources subpackage (i.e., orcapod.core.sources) is available in the root namespace of the package. Let's discuss which specific Python objects other than Pipeline, FunctionPod, and function_pod, if any, should be made available in the package's root scope.

from .pipeline import Pipeline

# Subpackage re-exports for clean public API
Expand All @@ -13,6 +14,7 @@
from . import types # noqa: F401

__all__ = [
"ArrowTableSource",
"FunctionPod",
"function_pod",
"Pipeline",
Expand Down
55 changes: 55 additions & 0 deletions src/orcapod/core/cached_function_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
if TYPE_CHECKING:
import pyarrow as pa

from orcapod.pipeline.logging_capture import CapturedLogs

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -125,6 +127,59 @@ async def async_process_packet(
output = output.with_meta_columns(**{self.RESULT_COMPUTED_FLAG: True})
return tag, output

def process_packet_with_capture(
    self, tag: TagProtocol, packet: PacketProtocol
) -> "tuple[TagProtocol, PacketProtocol | None, CapturedLogs]":
    """Process *packet* with pod-level caching, also returning captured logs.

    A cache hit yields an empty, successful ``CapturedLogs`` because no
    user function was executed for it.
    """
    from orcapod.pipeline.logging_capture import CapturedLogs

    hit = self._cache.lookup(packet)
    if hit is not None:
        logger.info("Pod-level cache hit")
        return tag, hit, CapturedLogs(success=True)

    tag, result, captured = self._function_pod.process_packet_with_capture(
        tag, packet
    )
    if result is None or not captured.success:
        # Nothing cacheable: either no output was produced or the run failed.
        return tag, result, captured

    packet_fn = self._function_pod.packet_function
    self._cache.store(
        packet,
        result,
        variation_data=packet_fn.get_function_variation_data(),
        execution_data=packet_fn.get_execution_data(),
    )
    result = result.with_meta_columns(**{self.RESULT_COMPUTED_FLAG: True})
    return tag, result, captured

async def async_process_packet_with_capture(
    self, tag: TagProtocol, packet: PacketProtocol
) -> "tuple[TagProtocol, PacketProtocol | None, CapturedLogs]":
    """Async counterpart of ``process_packet_with_capture``.

    A cache hit yields an empty, successful ``CapturedLogs`` because no
    user function was executed for it.
    """
    from orcapod.pipeline.logging_capture import CapturedLogs

    hit = self._cache.lookup(packet)
    if hit is not None:
        logger.info("Pod-level cache hit")
        return tag, hit, CapturedLogs(success=True)

    tag, result, captured = (
        await self._function_pod.async_process_packet_with_capture(tag, packet)
    )
    if result is None or not captured.success:
        # Nothing cacheable: either no output was produced or the run failed.
        return tag, result, captured

    packet_fn = self._function_pod.packet_function
    self._cache.store(
        packet,
        result,
        variation_data=packet_fn.get_function_variation_data(),
        execution_data=packet_fn.get_execution_data(),
    )
    result = result.with_meta_columns(**{self.RESULT_COMPUTED_FLAG: True})
    return tag, result, captured

def get_all_cached_outputs(
self, include_system_columns: bool = False
) -> "pa.Table | None":
Expand Down
31 changes: 18 additions & 13 deletions src/orcapod/core/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from orcapod.pipeline.logging_capture import CapturedLogs
from orcapod.protocols.core_protocols import PacketFunctionProtocol, PacketProtocol


Expand Down Expand Up @@ -48,20 +49,20 @@ def execute(
self,
packet_function: PacketFunctionProtocol,
packet: PacketProtocol,
) -> PacketProtocol | None:
) -> "tuple[PacketProtocol | None, CapturedLogs]":
"""Synchronously execute *packet_function* on *packet*.

Implementations should call ``packet_function.direct_call(packet)``
to invoke the function's native computation, bypassing executor
routing.
routing, and pass through the ``(result, CapturedLogs)`` tuple.
"""
...

async def async_execute(
self,
packet_function: PacketFunctionProtocol,
packet: PacketProtocol,
) -> PacketProtocol | None:
) -> "tuple[PacketProtocol | None, CapturedLogs]":
"""Asynchronous counterpart of ``execute``.

The default implementation delegates to ``execute`` synchronously.
Expand Down Expand Up @@ -96,33 +97,37 @@ def execute_callable(
fn: Callable[..., Any],
kwargs: dict[str, Any],
executor_options: dict[str, Any] | None = None,
) -> Any:
"""Synchronously execute *fn* with *kwargs*.
) -> "tuple[Any, CapturedLogs]":
"""Synchronously execute *fn* with *kwargs*, returning captured I/O.

Default implementation calls ``fn(**kwargs)`` in-process.
Subclasses should override for remote/distributed execution.
Default implementation calls ``fn(**kwargs)`` with no capture and
returns empty :class:`~orcapod.pipeline.logging_capture.CapturedLogs`.
Exceptions propagate to the caller. Subclasses (e.g.
``LocalExecutor``, ``RayExecutor``) override to add I/O capture and
exception swallowing.

Args:
fn: The Python callable to execute.
kwargs: Keyword arguments to pass to *fn*.
executor_options: Optional per-call options.

Returns:
The raw return value of *fn*.
``(raw_result, CapturedLogs)``
"""
return fn(**kwargs)
from orcapod.pipeline.logging_capture import CapturedLogs

return fn(**kwargs), CapturedLogs()

async def async_execute_callable(
self,
fn: Callable[..., Any],
kwargs: dict[str, Any],
executor_options: dict[str, Any] | None = None,
) -> Any:
"""Asynchronously execute *fn* with *kwargs*.
) -> "tuple[Any, CapturedLogs]":
"""Asynchronously execute *fn* with *kwargs*, returning captured I/O.

Default implementation delegates to ``execute_callable``
synchronously. Subclasses should override for truly async
execution.
synchronously. Subclasses should override for truly async execution.
"""
return self.execute_callable(fn, kwargs, executor_options)

Expand Down
50 changes: 39 additions & 11 deletions src/orcapod/core/executors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import asyncio
import inspect
import traceback as _traceback_module
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

from orcapod.core.executors.base import PacketFunctionExecutorBase

if TYPE_CHECKING:
from orcapod.pipeline.logging_capture import CapturedLogs
from orcapod.protocols.core_protocols import PacketFunctionProtocol, PacketProtocol


Expand All @@ -29,14 +31,14 @@ def execute(
self,
packet_function: PacketFunctionProtocol,
packet: PacketProtocol,
) -> PacketProtocol | None:
) -> "tuple[PacketProtocol | None, CapturedLogs]":
return packet_function.direct_call(packet)

async def async_execute(
    self,
    packet_function: PacketFunctionProtocol,
    packet: PacketProtocol,
) -> "tuple[PacketProtocol | None, CapturedLogs]":
    """Await the packet function's native async computation in-process.

    Passes through the ``(result, CapturedLogs)`` pair produced by
    ``direct_async_call``.
    """
    outcome = await packet_function.direct_async_call(packet)
    return outcome

# -- PythonFunctionExecutorProtocol --
Expand All @@ -46,10 +48,23 @@ def execute_callable(
fn: Callable[..., Any],
kwargs: dict[str, Any],
executor_options: dict[str, Any] | None = None,
) -> Any:
if inspect.iscoroutinefunction(fn):
return self._run_async_sync(fn, kwargs)
return fn(**kwargs)
) -> "tuple[Any, CapturedLogs]":
from orcapod.pipeline.logging_capture import CapturedLogs, LocalCaptureContext

ctx = LocalCaptureContext()
raw_result = None
success = True
tb: str | None = None
with ctx:
try:
if inspect.iscoroutinefunction(fn):
raw_result = self._run_async_sync(fn, kwargs)
else:
raw_result = fn(**kwargs)
except Exception:
success = False
tb = _traceback_module.format_exc()
return raw_result, ctx.get_captured(success=success, tb=tb)

@staticmethod
def _run_async_sync(fn: Callable[..., Any], kwargs: dict[str, Any]) -> Any:
Expand All @@ -69,11 +84,24 @@ async def async_execute_callable(
fn: Callable[..., Any],
kwargs: dict[str, Any],
executor_options: dict[str, Any] | None = None,
) -> Any:
if inspect.iscoroutinefunction(fn):
return await fn(**kwargs)
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, lambda: fn(**kwargs))
) -> "tuple[Any, CapturedLogs]":
from orcapod.pipeline.logging_capture import CapturedLogs, LocalCaptureContext

ctx = LocalCaptureContext()
raw_result = None
success = True
tb: str | None = None
with ctx:
try:
if inspect.iscoroutinefunction(fn):
raw_result = await fn(**kwargs)
else:
loop = asyncio.get_running_loop()
raw_result = await loop.run_in_executor(None, lambda: fn(**kwargs))
except Exception:
success = False
tb = _traceback_module.format_exc()
return raw_result, ctx.get_captured(success=success, tb=tb)

def with_options(self, **opts: Any) -> "LocalExecutor":
"""Return a new ``LocalExecutor``.
Expand Down
Loading
Loading