Commit 081325a

timsaucer and claude authored
feat: Python UDFs: per-session inlining toggle and strict refusal setting (#1546)
* feat: per-session Python UDF inlining toggle + sender ctx + strict refusal

  Adds a per-session toggle that turns inline Python UDF encoding on or off, plus the supporting plumbing to make it usable through pickle.dumps.

  Codec layer:
  * PythonLogicalCodec / PythonPhysicalCodec gain a python_udf_inlining bool (default true) and a with_python_udf_inlining(enabled) builder. Each try_encode_ud{f,af,wf} short-circuits to inner when the toggle is off; each try_decode_ud{f,af,wf} that recognizes a DFPY* magic on a strict codec returns a clean Execution error instead of invoking cloudpickle.loads. The refusal message names the UDF and the wire family so an operator can see at a glance whether to re-encode the bytes or register the UDF on the receiver.

  Session layer:
  * PySessionContext::with_python_udf_inlining(enabled) returns a new session whose stacked logical + physical codecs both carry the toggle. The Arc<SessionState> is cloned (cheap); only the codec pair is rebuilt, so registrations and config stay attached.
  * SessionContext.with_python_udf_inlining(*, enabled) is the Python wrapper. enabled is keyword-only because positional booleans at the call site read as opaque.

  Sender-side context:
  * datafusion.ipc gains set_sender_ctx / get_sender_ctx / clear_sender_ctx thread-locals. Expr.__reduce__ now consults get_sender_ctx() to pick the codec for outbound pickles, which is the only path through which a strict session affects pickle.dumps (the protocol calls __reduce__ with no arguments). Without a sender context the default codec is used.

  Tests:
  * test_pickle_expr.py picks up TestPythonUdfInliningToggle (covers both directions of the toggle plus the explicit-ctx fast path), TestWorkerCtxLifecycle (set/clear/threading), and TestSenderCtxLifecycle.
  * New test_pickle_multiprocessing.py + helpers exercise the full driver -> worker round-trip on a multiprocessing.Pool with set_*_ctx installed in the worker initializer.
  * CI workflow gets a 30-minute timeout-minutes backstop so a hung pickle worker can't block the matrix indefinitely.

  User-guide docs and the runnable examples land in PR4 of this series.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* update uv lock

* docs: clarify Python UDF inlining docstring; drop unresolved :doc: refs

  Rewrite with_python_udf_inlining docstring for readability and remove references to /user-guide/io/distributing_work, which does not exist yet. Keep security warning inline as a .. warning:: Security block, matching the existing pattern in Expr.to_bytes / from_bytes / __reduce__. The central doc will land in a follow-on PR.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: add doctest examples for sender ctx + UDF inlining toggle

  Per CLAUDE.md, every Python function needs a docstring example. Adds examples to with_python_udf_inlining, set_sender_ctx, clear_sender_ctx, and get_sender_ctx. Also clarifies that with_python_udf_inlining returns a new SessionContext and leaves the original unchanged, matching the with_logical_extension_codec pattern.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor: address review nits for UDF inlining toggle + sender ctx

  * codec: strict refusal routes through `read_framed_payload` so malformed inline bytes surface their own diagnostic; the "inlining is disabled" message now fires only when the payload would have decoded.
  * codec: add summary line above `PythonPhysicalCodec::with_python_udf_inlining` cross-link for rustdoc rendering.
  * expr: hoist `get_sender_ctx` import to module top; note that `__reduce__` also drives `copy.copy` / `copy.deepcopy`.
  * context: accept `with_python_udf_inlining` positionally or as kwarg (drop `*,`).
  * tests: replace size-ratio heuristic with semantic check for the `DFPYUDF` family prefix; switch single-batch closure test to `pool.apply`.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor: keyword-only inlining flag, skip GIL on prefix mismatch

  - `SessionContext.with_python_udf_inlining` now keyword-only (`*, enabled`) to match the documented call style and the existing doctests/tests.
  - `refuse_if_inline` and the three `try_decode_python_*` decoders short-circuit on a `starts_with(family)` check before `Python::attach`, so plans whose UDFs are not Python-defined no longer pay a GIL acquisition per decode call. Semantics preserved: `strip_wire_header` already returns `Ok(None)` when the prefix does not match.
  - `datafusion.ipc` module docstring wraps the `set_sender_ctx` example in `try`/`finally` and notes that the thread-local holds a strong reference to the installed `SessionContext` until cleared.

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* Add dev dependency

* Add testing for CI failure

* Additional debugging for mp tests in CI

* Set path for workers

* more path updates for unit tests

* test(pickle): remove multiprocessing CI debug instrumentation

  Multiprocessing forkserver/spawn hang was diagnosed and fixed: workers could not import `tests._pickle_multiprocessing_helpers` because `pytest --import-mode=importlib` does not add the test parent dir to `sys.path`. The fix (appending the parent dir to `sys.path` so it is inherited by mp workers without shadowing the installed `datafusion` wheel) is retained.

  This commit drops the diagnostic scaffolding that was added to identify the hang point:
  - `_diag` + per-import / per-task log writes to /tmp
  - `snapshot_processes` and the `threading.Timer` that captured worker state mid-hang
  - `diag_init` Pool initializer
  - "Dump multiprocessing diagnostic log" CI step

  Pre-existing infrastructure is kept: per-test `@pytest.mark.timeout(120)` (backed by `pytest-timeout` dev dep) and the job-level `timeout-minutes: 30` backstop on the test matrix.

  Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* Shorten Rust-side docstring since it duplicates the exposed Python docstring

* docs: clarify strict-mode refusal message and to_bytes inlining docs

  Address PR review feedback:
  - codec.rs: rewrite strict-refusal error to present the two real remediations (sender re-encode by-name + receiver register; or receiver enables inlining, accepting cloudpickle risk) instead of bundling registration with both-side inlining.
  - expr.py: qualify to_bytes docstring so Python UDF self-contained behavior is conditional on with_python_udf_inlining being enabled.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: clarify with_python_udf_inlining enabled arg is required

  Reword docstring to drop misleading "(the default)" claim. The `enabled` parameter is keyword-only and required; there is no argument default. Note instead that fresh sessions inline UDFs until the toggle overrides them (a session-level default, not an argument default).

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: demonstrate strict-mode refusal in with_python_udf_inlining docstring

  Replace placeholder isinstance check with a doctest that registers a Python UDF, encodes an expression on the default session, then shows the strict session refusing to decode the inline payload. Exercises the actual behavior the toggle controls.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: convert sender-ctx example to executable doctest

  Replace the code-block in the ipc module docstring that demonstrated set_sender_ctx with a doctest that actually runs. Worker-init example remains a code-block since it documents a Pool-initializer pattern that does not fit naturally into a doctest.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: use 'thread-local sender context' as adjectival phrase

  Bare 'thread-local' as a noun reads ambiguously next to the _local.ctx attribute name. Hyphenate as adjective with explicit 'sender context' noun so the referent is unambiguous.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: drop trailing clear_sender_ctx from set_sender_ctx example

  The trailing cleanup call was test hygiene, not API teaching, and risked implying callers must always pair set with clear. Adjacent clear_sender_ctx and get_sender_ctx doctests are self-contained (they explicitly set or clear before asserting), so removing the cleanup line does not affect doctest outcomes.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
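The sender-side context described in the commit message can be pictured with a small standard-library model. This is only a sketch of the mechanism (a thread-local slot that `__reduce__` consults because the pickle protocol passes it no arguments); `ToySession`, `ToyExpr`, `DEFAULT_SESSION`, and `_rebuild` are hypothetical stand-ins, and the three module-level functions merely reuse the names from the commit message without reproducing datafusion's implementation.

```python
import threading

# Toy model only: none of this is datafusion's real code.
_local = threading.local()


def set_sender_ctx(ctx):
    """Install ctx for the current thread (held by strong reference until cleared)."""
    _local.ctx = ctx


def get_sender_ctx():
    """Return the current thread's sender context, or None if none is installed."""
    return getattr(_local, "ctx", None)


def clear_sender_ctx():
    """Drop the current thread's sender context, if any."""
    if hasattr(_local, "ctx"):
        del _local.ctx


class ToySession:
    """Hypothetical session that carries only the inlining flag."""

    def __init__(self, inlining=True):
        self.inlining = inlining


# Assumed fallback used when no sender context is installed.
DEFAULT_SESSION = ToySession(inlining=True)


def _rebuild(udf_name, wire_mode):
    # Module-level so the pickle protocol can reference it by name.
    return ToyExpr(udf_name, wire_mode)


class ToyExpr:
    """Hypothetical expression that records how it was encoded."""

    def __init__(self, udf_name, wire_mode=None):
        self.udf_name = udf_name
        self.wire_mode = wire_mode

    def __reduce__(self):
        # pickle and copy call this with no arguments, so the session has to
        # come from ambient state rather than from a parameter.
        ctx = get_sender_ctx() or DEFAULT_SESSION
        mode = "inline" if ctx.inlining else "by-name"
        return (_rebuild, (self.udf_name, mode))
```

Because the slot is per thread, a context installed on one thread is invisible to others, which is why the commit message pairs the driver-side setter with a separate setter installed in each worker's initializer.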
1 parent fa021fe commit 081325a

11 files changed

Lines changed: 847 additions & 45 deletions

.github/workflows/test.yml

Lines changed: 3 additions & 0 deletions
@@ -29,6 +29,9 @@ env:
 jobs:
   test-matrix:
     runs-on: ubuntu-latest
+    # Backstop: a hung multiprocessing worker (e.g. during a pickle regression)
+    # should not block CI longer than this.
+    timeout-minutes: 30
     strategy:
       fail-fast: false
       matrix:

crates/core/src/codec.rs

Lines changed: 137 additions & 20 deletions
@@ -232,16 +232,39 @@ fn strip_wire_header<'a>(
 #[derive(Debug)]
 pub struct PythonLogicalCodec {
     inner: Arc<dyn LogicalExtensionCodec>,
+    python_udf_inlining: bool,
 }

 impl PythonLogicalCodec {
     pub fn new(inner: Arc<dyn LogicalExtensionCodec>) -> Self {
-        Self { inner }
+        Self {
+            inner,
+            python_udf_inlining: true,
+        }
     }

     pub fn inner(&self) -> &Arc<dyn LogicalExtensionCodec> {
         &self.inner
     }
+
+    /// Toggle inline encoding of Python UDFs. See
+    /// `SessionContext.with_python_udf_inlining` (Python) for full
+    /// behavior and use cases.
+    ///
+    /// Security scope: strict mode (`false`) narrows only the codec
+    /// layer — it stops `Expr::from_bytes` from invoking
+    /// `cloudpickle.loads` on the inline `DFPY*` payload. It does
+    /// **not** make `pickle.loads(untrusted_bytes)` safe; treat every
+    /// `pickle.loads` on untrusted input as unsafe regardless of this
+    /// setting.
+    pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
+        self.python_udf_inlining = enabled;
+        self
+    }
+
+    pub fn python_udf_inlining(&self) -> bool {
+        self.python_udf_inlining
+    }
 }

 impl Default for PythonLogicalCodec {
@@ -301,48 +324,104 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
     }

     fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_scalar_udf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udf(node, buf)
     }

     fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
-        if let Some(udf) = try_decode_python_scalar_udf(buf)? {
-            return Ok(udf);
+        if self.python_udf_inlining {
+            if let Some(udf) = try_decode_python_scalar_udf(buf)? {
+                return Ok(udf);
+            }
+        } else {
+            refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?;
         }
         self.inner.try_decode_udf(name, buf)
     }

     fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_udaf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_udaf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udaf(node, buf)
     }

     fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
-        if let Some(udaf) = try_decode_python_udaf(buf)? {
-            return Ok(udaf);
+        if self.python_udf_inlining {
+            if let Some(udaf) = try_decode_python_udaf(buf)? {
+                return Ok(udaf);
+            }
+        } else {
+            refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?;
         }
         self.inner.try_decode_udaf(name, buf)
     }

     fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_udwf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_udwf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udwf(node, buf)
     }

     fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
-        if let Some(udwf) = try_decode_python_udwf(buf)? {
-            return Ok(udwf);
+        if self.python_udf_inlining {
+            if let Some(udwf) = try_decode_python_udwf(buf)? {
+                return Ok(udwf);
+            }
+        } else {
+            refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?;
         }
         self.inner.try_decode_udwf(name, buf)
     }
 }

+/// Strict-mode gate: if `buf` is a well-framed inline payload for
+/// `family`, return the strict-refusal error; otherwise return
+/// `Ok(())` so the caller can delegate to its `inner` codec.
+///
+/// Routing through [`read_framed_payload`] (rather than a bare
+/// `starts_with` probe) means malformed inline bytes — wrong
+/// wire-format version, mismatched Python version, truncated header —
+/// surface *their* diagnostic instead of the strict-mode message.
+/// The strict message implies sender intent ("inlining is disabled"),
+/// so it should fire only when the bytes really would have decoded.
+///
+/// Fast path: short-circuit on the family-magic prefix before
+/// acquiring the GIL. Plans with many non-Python UDFs would otherwise
+/// pay a GIL acquisition per decode call just to confirm "not a
+/// Python UDF". `read_framed_payload` itself rejects buffers that
+/// don't start with `family`, so this is purely an optimization.
+fn refuse_if_inline(buf: &[u8], family: &[u8], kind: &str, name: &str) -> Result<()> {
+    if !buf.starts_with(family) {
+        return Ok(());
+    }
+    Python::attach(|py| match read_framed_payload(py, buf, family, kind)? {
+        Some(_) => Err(refuse_inline_payload(kind, name)),
+        None => Ok(()),
+    })
+}
+
+/// Build the error returned by a strict codec when it receives an
+/// inline Python-UDF payload it has been told not to deserialize.
+fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusionError {
+    // `Execution`, not `Plan`: this is a wire-format decode refusal at
+    // codec time, not a planner-stage failure. Downstream error
+    // classification keys off the variant — surfacing this as a planner
+    // error would mis-route it into "fix your SQL" buckets.
+    datafusion::error::DataFusionError::Execution(format!(
+        "Refusing to deserialize inline Python {kind} '{name}': Python UDF \
+         inlining is disabled on this session. Two remediations: \
+         (1) ask the sender to re-encode with inlining disabled so '{name}' \
+         travels by name, and register '{name}' on this receiver; or \
+         (2) enable inlining on this receiver (accepts the cloudpickle \
+         execution risk on inbound payloads). Receivers cannot re-encode \
+         bytes they did not produce."
+    ))
+}
+
 /// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`] parked
 /// on the same `SessionContext`. Carries the Python-aware encoding
 /// hooks for physical-layer types (`ExecutionPlan`, `PhysicalExpr`)
@@ -358,16 +437,33 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
 #[derive(Debug)]
 pub struct PythonPhysicalCodec {
     inner: Arc<dyn PhysicalExtensionCodec>,
+    python_udf_inlining: bool,
 }

 impl PythonPhysicalCodec {
     pub fn new(inner: Arc<dyn PhysicalExtensionCodec>) -> Self {
-        Self { inner }
+        Self {
+            inner,
+            python_udf_inlining: true,
+        }
     }

     pub fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
         &self.inner
     }
+
+    /// Toggle inline encoding of Python UDFs on this physical codec.
+    ///
+    /// Mirrors [`PythonLogicalCodec::with_python_udf_inlining`]; see
+    /// that method for the full security and portability discussion.
+    pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
+        self.python_udf_inlining = enabled;
+        self
+    }
+
+    pub fn python_udf_inlining(&self) -> bool {
+        self.python_udf_inlining
+    }
 }

 impl Default for PythonPhysicalCodec {
@@ -391,15 +487,19 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
     }

     fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_scalar_udf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udf(node, buf)
     }

     fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
-        if let Some(udf) = try_decode_python_scalar_udf(buf)? {
-            return Ok(udf);
+        if self.python_udf_inlining {
+            if let Some(udf) = try_decode_python_scalar_udf(buf)? {
+                return Ok(udf);
+            }
+        } else {
+            refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?;
         }
         self.inner.try_decode_udf(name, buf)
     }
@@ -417,29 +517,37 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
     }

     fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_udaf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_udaf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udaf(node, buf)
     }

     fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
-        if let Some(udaf) = try_decode_python_udaf(buf)? {
-            return Ok(udaf);
+        if self.python_udf_inlining {
+            if let Some(udaf) = try_decode_python_udaf(buf)? {
+                return Ok(udaf);
+            }
+        } else {
+            refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?;
         }
         self.inner.try_decode_udaf(name, buf)
     }

     fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_udwf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_udwf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udwf(node, buf)
     }

     fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
-        if let Some(udwf) = try_decode_python_udwf(buf)? {
-            return Ok(udwf);
+        if self.python_udf_inlining {
+            if let Some(udwf) = try_decode_python_udwf(buf)? {
+                return Ok(udwf);
+            }
+        } else {
+            refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?;
         }
         self.inner.try_decode_udwf(name, buf)
     }
@@ -476,6 +584,9 @@ pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec<u8>)
 /// the caller to delegate to its `inner` codec (and eventually the
 /// `FunctionRegistry`).
 pub(crate) fn try_decode_python_scalar_udf(buf: &[u8]) -> Result<Option<Arc<ScalarUDF>>> {
+    if !buf.starts_with(PY_SCALAR_UDF_FAMILY) {
+        return Ok(None);
+    }
     Python::attach(|py| -> Result<Option<Arc<ScalarUDF>>> {
         let Some(payload) = read_framed_payload(py, buf, PY_SCALAR_UDF_FAMILY, "scalar UDF")?
         else {
@@ -732,6 +843,9 @@ pub(crate) fn try_encode_python_udwf(node: &WindowUDF, buf: &mut Vec<u8>) -> Res
 }

 pub(crate) fn try_decode_python_udwf(buf: &[u8]) -> Result<Option<Arc<WindowUDF>>> {
+    if !buf.starts_with(PY_WINDOW_UDF_FAMILY) {
+        return Ok(None);
+    }
     Python::attach(|py| -> Result<Option<Arc<WindowUDF>>> {
         let Some(payload) = read_framed_payload(py, buf, PY_WINDOW_UDF_FAMILY, "window UDF")?
         else {
@@ -814,6 +928,9 @@ pub(crate) fn try_encode_python_udaf(node: &AggregateUDF, buf: &mut Vec<u8>) ->
 }

 pub(crate) fn try_decode_python_udaf(buf: &[u8]) -> Result<Option<Arc<AggregateUDF>>> {
+    if !buf.starts_with(PY_AGG_UDF_FAMILY) {
+        return Ok(None);
+    }
     Python::attach(|py| -> Result<Option<Arc<AggregateUDF>>> {
         let Some(payload) = read_framed_payload(py, buf, PY_AGG_UDF_FAMILY, "aggregate UDF")?
         else {
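The decode-side control flow in the codec.rs changes above (cheap prefix check, then frame validation, then either refusal or delegation) can be modelled in a few lines of plain Python. This is a sketch under stated assumptions, not the real wire format: the frame layout (family magic followed by a single version byte), `HYPOTHETICAL_VERSION`, the exception classes, and the `registry` dict are all invented for illustration. Only the `DFPYUDF` prefix and the function names `read_framed_payload` and `refuse_if_inline` come from the commit.

```python
FAMILY = b"DFPYUDF"          # family prefix named in the commit message
HYPOTHETICAL_VERSION = 1     # assumed: one version byte follows the magic


class MalformedFrame(Exception):
    """Inline bytes that carry the magic but would not have decoded."""


class RefusedInline(Exception):
    """Well-framed inline payload rejected because inlining is disabled."""


def read_framed_payload(buf, family):
    """Return the payload, None if buf is not in this family, or raise if malformed."""
    if not buf.startswith(family):
        return None
    header_len = len(family) + 1
    if len(buf) < header_len:
        raise MalformedFrame("truncated header")
    version = buf[len(family)]
    if version != HYPOTHETICAL_VERSION:
        raise MalformedFrame(f"unsupported wire version {version}")
    return buf[header_len:]


def refuse_if_inline(buf, family, kind, name):
    """Strict-mode gate: raise only when the bytes really would have decoded."""
    if not buf.startswith(family):
        return  # fast path: not a Python UDF, nothing to refuse
    if read_framed_payload(buf, family) is not None:
        raise RefusedInline(f"Refusing to deserialize inline Python {kind} '{name}'")


def decode_udf(buf, name, *, inlining, registry):
    """Mirror of the try_decode_udf shape: inline path, strict gate, then by-name lookup."""
    if inlining:
        payload = read_framed_payload(buf, FAMILY)
        if payload is not None:
            return ("inline", payload)
    else:
        refuse_if_inline(buf, FAMILY, "scalar UDF", name)
    # Stand-in for delegating to the inner codec and function registry.
    return ("by-name", registry[name])
```

In this model a strict receiver that gets a bad version byte raises `MalformedFrame` rather than `RefusedInline`, which is the ordering the Rust doc comment argues for: the "inlining is disabled" message should describe sender intent, not mask a corrupt payload.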

crates/core/src/context.rs

Lines changed: 16 additions & 0 deletions
@@ -1453,6 +1453,22 @@ impl PySessionContext {
             physical_codec,
         })
     }
+
+    pub fn with_python_udf_inlining(&self, enabled: bool) -> Self {
+        let logical_codec = Arc::new(
+            PythonLogicalCodec::new(Arc::clone(self.logical_codec.inner()))
+                .with_python_udf_inlining(enabled),
+        );
+        let physical_codec = Arc::new(
+            PythonPhysicalCodec::new(Arc::clone(self.physical_codec.inner()))
+                .with_python_udf_inlining(enabled),
+        );
+        Self {
+            ctx: Arc::clone(&self.ctx),
+            logical_codec,
+            physical_codec,
+        }
+    }
 }

 impl PySessionContext {

pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -203,6 +203,7 @@ dev = [
     "pyarrow>=19.0.0",
     "pygithub==2.5.0",
     "pytest-asyncio>=0.23.3",
+    "pytest-timeout>=2.3.1",
     "pytest>=7.4.4",
     "pyyaml>=6.0.3",
     "ruff>=0.15.1",

python/datafusion/context.py

Lines changed: 58 additions & 0 deletions
@@ -1963,3 +1963,61 @@ def with_physical_extension_codec(
         new = SessionContext.__new__(SessionContext)
         new.ctx = new_internal
         return new
+
+    def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext:
+        """Control whether Python UDFs are embedded in serialized expressions.
+
+        ``enabled`` is keyword-only and required: callers must pick a
+        mode explicitly. Fresh sessions inline UDFs (``enabled=True``
+        behavior) until this method overrides the toggle.
+
+        With ``enabled=True``, serialized expressions carry the Python
+        code for any scalar, aggregate, or window UDFs they reference.
+        The receiver rebuilds the UDFs from those bytes and does not
+        need to register them first.
+
+        With ``enabled=False``, serialized expressions store only the
+        UDF names. This has two uses:
+
+        * **Cross-language portability.** The bytes can be decoded by a
+          non-Python receiver, which must already have UDFs registered
+          under matching names.
+        * **Safer deserialization.** :meth:`Expr.from_bytes` will refuse
+          to rebuild Python UDFs rather than call ``cloudpickle.loads``
+          on untrusted input.
+
+        The setting affects :meth:`Expr.to_bytes` and
+        :meth:`Expr.from_bytes` whenever this session is passed as the
+        ``ctx`` argument. :func:`pickle.dumps` and :func:`pickle.loads`
+        do not pass a context, so to apply the setting through pickle,
+        register this session with
+        :func:`datafusion.ipc.set_sender_ctx` on the sender and
+        :func:`datafusion.ipc.set_worker_ctx` on the receiver.
+
+        .. warning:: Security
+            This setting narrows only :meth:`Expr.from_bytes`. Calling
+            :func:`pickle.loads` on untrusted bytes remains unsafe
+            regardless of the toggle.
+
+        Returns a new :class:`SessionContext` with the toggle applied;
+        the original session is unchanged.
+
+        Examples:
+            >>> import pyarrow as pa
+            >>> from datafusion import SessionContext, Expr, col, udf
+            >>> ctx = SessionContext()
+            >>> identity = udf(lambda a: a, [pa.int64()], pa.int64(),
+            ...     volatility="immutable", name="identity_demo")
+            >>> ctx.register_udf(identity)
+            >>> blob = identity(col("x")).to_bytes(ctx)
+            >>> strict = SessionContext().with_python_udf_inlining(enabled=False)
+            >>> try:
+            ...     Expr.from_bytes(blob, strict)
+            ... except Exception as e:
+            ...     print("Refusing to deserialize" in str(e))
+            True
+        """
+        new_internal = self.ctx.with_python_udf_inlining(enabled)
+        new = SessionContext.__new__(SessionContext)
+        new.ctx = new_internal
+        return new
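Two properties of the method above are easy to miss when reading the diff: it returns a new session instead of mutating the receiver, and the bare `*` makes `enabled` keyword-only. A minimal standard-library sketch of the same shape follows; `ToySession`, its `registry` field, and `with_inlining` are hypothetical names, and the shared dict only loosely stands in for the session state that the Rust side reuses via `Arc::clone`.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class ToySession:
    """Hypothetical session: shared registrations plus one per-instance flag."""

    registry: dict = field(default_factory=dict)  # shared between copies, not cloned
    inlining: bool = True                         # fresh sessions inline by default

    def with_inlining(self, *, enabled: bool) -> "ToySession":
        # replace() builds a new instance, so the receiver is left untouched,
        # while the registry object itself is carried over by reference.
        return replace(self, inlining=enabled)
```

Calling `with_inlining(False)` raises `TypeError` because of the bare `*`, so the call site has to spell out `enabled=False`. The commit message gives this as the reason for the design: a positional boolean reads as opaque.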
