Skip to content

Commit f7550ec

Browse files
timsaucerclaude
andcommitted
docs: user guide page + runnable examples for distributing expressions
Wraps up the Expr-pickle work with the user-facing material: * docs/source/user-guide/io/distributing_work.rst — new user guide page covering the multiprocessing, Ray, and datafusion-distributed patterns. Includes the Security section that is the canonical home for the cloudpickle / pickle.loads threat model. * docs/source/user-guide/io/index.rst — toctree entry. * examples/multiprocessing_pickle_expr.py — runnable example: a Pool.map of a closure-capturing UDF across processes, with worker context registration in the initializer. * examples/ray_pickle_expr.py — Ray actor analogue. * examples/datafusion-ffi-example/python/tests/_test_pickle_strict_ffi.py — exercises the strict-mode refusal end to end against an FFI capsule scalar UDF (kept under the FFI example crate because the test needs that crate's compiled artifacts). * examples/README.md — index entries for the new files. Also tightens three docstrings that previously duplicated the security warning so they point at the canonical Security section instead: * PythonLogicalCodec::with_python_udf_inlining (rustdoc): one-line summary plus a relative pointer to distributing_work.rst and the upstream Python pickle module security warning. * SessionContext.with_python_udf_inlining: one-sentence summary plus :doc: link to the user guide. * datafusion.ipc module docstring: cross-reference to the user guide for the full pattern. The crate-level codec.rs module rustdoc also updates "pure-Python scalar UDFs" to "scalar / aggregate / window UDFs" now that all three are covered. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 081325a commit f7550ec

7 files changed

Lines changed: 765 additions & 12 deletions

File tree

crates/core/src/codec.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
//!
2020
//! Datafusion-python plans can carry references to Python-defined
2121
//! objects that the upstream protobuf codecs do not know how to
22-
//! serialize: pure-Python scalar UDFs, Python query-planning
23-
//! extensions, and so on. Their state lives inside `Py<PyAny>`
24-
//! callables and closures rather than being recoverable from a name
25-
//! in the receiver's function registry. To ship a plan across a
26-
//! process boundary (pickle, `multiprocessing`, Ray actor,
22+
//! serialize: pure-Python scalar / aggregate / window UDFs, Python
23+
//! query-planning extensions, and so on. Their state lives inside
24+
//! `Py<PyAny>` callables and closures rather than being recoverable
25+
//! from a name in the receiver's function registry. To ship a plan
26+
//! across a process boundary (pickle, `multiprocessing`, Ray actor,
2727
//! `datafusion-distributed`, etc.) those payloads have to be encoded
2828
//! into the proto wire format itself.
2929
//!
@@ -256,7 +256,12 @@ impl PythonLogicalCodec {
256256
/// `cloudpickle.loads` on the inline `DFPY*` payload. It does
257257
/// **not** make `pickle.loads(untrusted_bytes)` safe; treat every
258258
/// `pickle.loads` on untrusted input as unsafe regardless of this
259-
/// setting.
259+
/// setting. See `docs/source/user-guide/io/distributing_work.rst`
260+
/// (Security section) for the full threat model, and Python's
261+
/// [pickle module security warning][1] for why `pickle.loads` is
262+
/// unsafe in general.
263+
///
264+
/// [1]: https://docs.python.org/3/library/pickle.html#module-pickle
260265
pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
261266
self.python_udf_inlining = enabled;
262267
self
@@ -433,7 +438,7 @@ fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusio
433438
/// encoding on this layer too — otherwise a plan with a Python UDF
434439
/// would round-trip at the logical level but break at the physical
435440
/// level. Both layers reuse the shared payload framing
436-
/// ([`PY_SCALAR_UDF_FAMILY`]) so the wire format is identical.
441+
/// ([`PY_SCALAR_UDF_FAMILY`] et al.) so the wire format is identical.
437442
#[derive(Debug)]
438443
pub struct PythonPhysicalCodec {
439444
inner: Arc<dyn PhysicalExtensionCodec>,
@@ -563,10 +568,10 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
563568
// =============================================================================
564569

565570
/// Encode a Python scalar UDF inline if `node` is one. Returns
566-
/// `Ok(true)` when the payload (`DFPYUDF` family prefix, version byte,
567-
/// cloudpickled tuple) was written and the caller should skip its
568-
/// inner codec. Returns `Ok(false)` for any non-Python UDF, signalling
569-
/// the caller to delegate to its `inner`.
571+
/// `Ok(true)` when the payload (`DFPYUDF1` prefix + cloudpickled
572+
/// tuple) was written and the caller should skip its inner codec.
573+
/// Returns `Ok(false)` for any non-Python UDF, signalling the caller
574+
/// to delegate to its `inner`.
570575
pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<bool> {
571576
let Some(py_udf) = node.inner().downcast_ref::<PythonFunctionScalarUDF>() else {
572577
return Ok(false);
@@ -806,7 +811,7 @@ fn read_framed_payload<'a>(
806811

807812
/// Cached handle to the `cloudpickle` module.
808813
///
809-
/// The encode/decode helpers above would otherwise re-resolve the
814+
/// Six encode/decode helpers below would otherwise re-resolve the
810815
/// module on every call. `py.import` is backed by `sys.modules` and
811816
/// therefore cheap, but each call still walks a dict and re-binds the
812817
/// result; a plan with many Python UDFs pays that cost per UDF.

0 commit comments

Comments
 (0)