feat: Python UDFs: per-session inlining toggle and strict refusal setting#1546
Conversation
…fusal
Adds a per-session toggle that turns inline Python UDF encoding on or
off, plus the supporting plumbing to make it usable through
pickle.dumps.
Codec layer:
* PythonLogicalCodec / PythonPhysicalCodec gain a python_udf_inlining
bool (default true) and a with_python_udf_inlining(enabled) builder.
Each try_encode_udf{,af,wf} short-circuits to inner when the toggle
is off; each try_decode_udf{,af,wf} that recognizes a DFPY* magic
on a strict codec returns a clean Execution error instead of
invoking cloudpickle.loads. The refusal message names the UDF and
the wire family so an operator can see at a glance whether to
re-encode the bytes or register the UDF on the receiver.
Session layer:
* PySessionContext::with_python_udf_inlining(enabled) returns a new
session whose stacked logical + physical codecs both carry the
toggle. The Arc<SessionState> is cloned (cheap), only the codec
pair is rebuilt, so registrations and config stay attached.
* SessionContext.with_python_udf_inlining(*, enabled) is the Python
wrapper. enabled is keyword-only because positional booleans at
the call site read as opaque.
Sender-side context:
* datafusion.ipc gains set_sender_ctx / get_sender_ctx /
clear_sender_ctx thread-locals. Expr.__reduce__ now consults
get_sender_ctx() to pick the codec for outbound pickles, which is
the only path through which a strict session affects pickle.dumps
(the protocol calls __reduce__ with no arguments). Without a
sender context the default codec is used.
Tests:
* test_pickle_expr.py picks up TestPythonUdfInliningToggle (covers
both directions of the toggle plus the explicit-ctx fast path),
TestWorkerCtxLifecycle (set/clear/threading), and
TestSenderCtxLifecycle.
* New test_pickle_multiprocessing.py + helpers exercise the full
driver -> worker round-trip on a multiprocessing.Pool with set_*_ctx
installed in the worker initializer.
* CI workflow gets a 30-minute timeout-minutes backstop so a hung
pickle worker can't block the matrix indefinitely.
User-guide docs and the runnable examples land in PR4 of this series.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
d680e12 to
14178db
Compare
Rewrite with_python_udf_inlining docstring for readability and remove references to /user-guide/io/distributing_work, which does not exist yet. Keep security warning inline as a .. warning:: Security block, matching the existing pattern in Expr.to_bytes / from_bytes / __reduce__. The central doc will land in a follow-on PR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per CLAUDE.md, every Python function needs a docstring example. Adds examples to with_python_udf_inlining, set_sender_ctx, clear_sender_ctx, and get_sender_ctx. Also clarifies that with_python_udf_inlining returns a new SessionContext and leaves the original unchanged, matching the with_logical_extension_codec pattern. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* codec: strict refusal routes through `read_framed_payload` so malformed inline bytes surface their own diagnostic; the "inlining is disabled" message now fires only when the payload would have decoded. * codec: add summary line above `PythonPhysicalCodec::with_python_udf_inlining` cross-link for rustdoc rendering. * expr: hoist `get_sender_ctx` import to module top; note that `__reduce__` also drives `copy.copy` / `copy.deepcopy`. * context: accept `with_python_udf_inlining` positionally or as kwarg (drop `*,`). * tests: replace size-ratio heuristic with semantic check for the `DFPYUDF` family prefix; switch single-batch closure test to `pool.apply`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- `SessionContext.with_python_udf_inlining` now keyword-only (`*, enabled`) to match the documented call style and the existing doctests/tests. - `refuse_if_inline` and the three `try_decode_python_*` decoders short- circuit on a `starts_with(family)` check before `Python::attach`, so plans whose UDFs are not Python-defined no longer pay a GIL acquisition per decode call. Semantics preserved: `strip_wire_header` already returns `Ok(None)` when the prefix does not match. - `datafusion.ipc` module docstring wraps the `set_sender_ctx` example in `try`/`finally` and notes that the thread-local holds a strong reference to the installed `SessionContext` until cleared. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Multiprocessing forkserver/spawn hang was diagnosed and fixed: workers could not import `tests._pickle_multiprocessing_helpers` because `pytest --import-mode=importlib` does not add the test parent dir to `sys.path`. The fix (appending the parent dir to `sys.path` so it is inherited by mp workers without shadowing the installed `datafusion` wheel) is retained. This commit drops the diagnostic scaffolding that was added to identify the hang point: - `_diag` + per-import / per-task log writes to /tmp - `snapshot_processes` and the `threading.Timer` that captured worker state mid-hang - `diag_init` Pool initializer - "Dump multiprocessing diagnostic log" CI step Pre-existing infrastructure is kept: per-test `@pytest.mark.timeout(120)` (backed by `pytest-timeout` dev dep) and the job-level `timeout-minutes: 30` backstop on the test matrix. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds a per-session switch to control whether Python UDF definitions are inlined into serialized expressions, enabling “strict” decode behavior that refuses inline (cloudpickled) payloads, and introduces a driver-side sender context so pickle.dumps(Expr) can honor that session configuration.
Changes:
- Introduce
SessionContext.with_python_udf_inlining(enabled=...)and plumb the toggle through the Rust logical/physical codecs to gate inline encode/decode (and refuse inline payloads when disabled). - Add
datafusion.ipcsender-context APIs (set_sender_ctx/get_sender_ctx/clear_sender_ctx) and wireExpr.__reduce__to use the sender context for pickling. - Add unit tests (including multiprocessing coverage) plus CI/test harness guardrails (
pytest-timeout, workflow timeout).
Reviewed changes
Copilot reviewed 10 out of 11 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
crates/core/src/codec.rs |
Adds the inlining toggle to Python codecs and strict refusal logic for inline UDF payloads. |
crates/core/src/context.rs |
Exposes a Rust PySessionContext.with_python_udf_inlining constructor-style method. |
python/datafusion/context.py |
Adds public Python API SessionContext.with_python_udf_inlining. |
python/datafusion/expr.py |
Makes pickling honor a driver-side sender context via get_sender_ctx() and updates serialization docs. |
python/datafusion/ipc.py |
Adds sender-context thread-local APIs and expands driver/worker distribution docs. |
python/tests/test_pickle_expr.py |
Adds coverage for strict inlining behavior and sender/worker context lifecycle semantics. |
python/tests/test_pickle_multiprocessing.py |
Adds cross-process pickle tests across multiprocessing start methods with timeouts. |
python/tests/_pickle_multiprocessing_helpers.py |
Provides importable worker helpers for multiprocessing tests (not pytest-collected). |
.github/workflows/test.yml |
Adds a job-level timeout to prevent hung multiprocessing runs from stalling CI. |
pyproject.toml |
Adds pytest-timeout to dev dependencies. |
uv.lock |
Locks pytest-timeout and updates the resolved dev dependency set. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Address PR review feedback: - codec.rs: rewrite strict-refusal error to present the two real remediations (sender re-encode by-name + receiver register; or receiver enables inlining, accepting cloudpickle risk) instead of bundling registration with both-side inlining. - expr.py: qualify to_bytes docstring so Python UDF self-contained behavior is conditional on with_python_udf_inlining being enabled. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| // codec time, not a planner-stage failure. Downstream error | ||
| // classification keys off the variant — surfacing this as a planner | ||
| // error would mis-route it into "fix your SQL" buckets. | ||
| datafusion::error::DataFusionError::Execution(format!( |
There was a problem hiding this comment.
It would be nice if there was a page on this so we could include a url with even more context. I think your descriptions in the previous two prs are good but I suspect someone will stumble upon this with very little context as a general user and thinking about how to mitigate that. Can be wrapped in next PR or even a follow on for further nits/clarifiaction.
Reword docstring to drop misleading "(the default)" claim. The `enabled` parameter is keyword-only and required — there is no argument default. Note instead that fresh sessions inline UDFs until the toggle overrides them (a session-level default, not an argument default). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…string Replace placeholder isinstance check with a doctest that registers a Python UDF, encodes an expression on the default session, then shows the strict session refusing to decode the inline payload. Exercises the actual behavior the toggle controls. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the code-block in the ipc module docstring that demonstrated set_sender_ctx with a doctest that actually runs. Worker-init example remains a code-block since it documents a Pool-initializer pattern that does not fit naturally into a doctest. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Bare 'thread-local' as a noun reads ambiguously next to the _local.ctx attribute name. Hyphenate as adjective with explicit 'sender context' noun so the referent is unambiguous. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The trailing cleanup call was test hygiene, not API teaching, and risked implying callers must always pair set with clear. Adjacent clear_sender_ctx and get_sender_ctx doctests are self-contained (they explicitly set or clear before asserting), so removing the cleanup line does not affect doctest outcomes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Which issue does this PR close?
Addresses part of #1517
This is PR 3 of 4. The subsequent PRs target this branch's tip until it merges.
Follow up PR:
Rationale for this change
PRs 1 and 2 ship Python UDFs inline through the codec. There is a follow-on need: Untrusted-input decoding. A receiver that may read
Expr.from_bytesinput from an untrusted source must refuse to invokecloudpickle.loadson the inline payload. (pickle.loadson untrusted input is still unsafe regardless of this toggle — see the security note in the docstrings.)We resolve this by an on/off switch at the codec level. The codec already sits on every session, so the toggle is naturally per-session.
What changes are included in this PR?
SessionContextlevel to enable/disable Python inlining of UDFs, which gets passed through to the codec layer.Are there any user-facing changes?
Yes, but it's pure addition:
SessionContext.with_python_udf_inliningis a new method.datafusion.ipc.set_sender_ctx/get_sender_ctx/clear_sender_ctxare new functions for propagating a configured session throughpickle.dumps.The user-guide page documenting the full pattern (and the multiprocessing / Ray runnable examples) lands in PR 4 of this series.