Skip to content

Commit 4665983

Browse files
committed
Merge branch 'main' into pr3-toggle-sender-strict
2 parents a41bf17 + fa021fe commit 4665983

14 files changed

Lines changed: 515 additions & 35 deletions

File tree

.ai/skills/check-upstream/SKILL.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,17 @@ The user may specify an area via `$ARGUMENTS`. If no area is specified or "all"
6666
- Python API: `python/datafusion/functions.py` — each function wraps a call to `datafusion._internal.functions`
6767
- Rust bindings: `crates/core/src/functions.rs``#[pyfunction]` definitions registered via `init_module()`
6868

69+
**Evaluated and not requiring separate Python exposure:**
70+
- `get_field_path` — already covered by `get_field(expr, *names)`, which takes a
71+
variadic field path and dispatches to the same underlying
72+
`functions::core::get_field` UDF as the upstream `get_field_path` helper.
73+
6974
**How to check:**
7075
1. Fetch the upstream scalar function documentation page
7176
2. Compare against functions listed in `python/datafusion/functions.py` (check the `__all__` list and function definitions)
7277
3. A function is covered if it exists in the Python API — it does NOT need a dedicated Rust `#[pyfunction]`. Many functions are aliases that reuse another function's Rust binding.
73-
4. Only report functions that are missing from the Python `__all__` list / function definitions
78+
4. Check against the "evaluated and not requiring exposure" list before flagging as a gap
79+
5. Only report functions that are missing from the Python `__all__` list / function definitions
7480

7581
### 2. Aggregate Functions
7682

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
repos:
1919
- repo: https://github.com/rhysd/actionlint
20-
rev: v1.7.6
20+
rev: v1.7.12
2121
hooks:
2222
- id: actionlint-docker
2323
- repo: https://github.com/astral-sh/ruff-pre-commit

crates/core/src/context.rs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use datafusion::datasource::listing::{
3535
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
3636
};
3737
use datafusion::datasource::{MemTable, TableProvider};
38-
use datafusion::execution::TaskContextProvider;
3938
use datafusion::execution::context::{
4039
DataFilePaths, SQLOptions, SessionConfig, SessionContext, TaskContext,
4140
};
@@ -44,6 +43,7 @@ use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, Unboun
4443
use datafusion::execution::options::{ArrowReadOptions, ReadOptions};
4544
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
4645
use datafusion::execution::session_state::SessionStateBuilder;
46+
use datafusion::execution::{FunctionRegistry, TaskContextProvider};
4747
use datafusion::prelude::{
4848
AvroReadOptions, CsvReadOptions, DataFrame, JsonReadOptions, ParquetReadOptions,
4949
};
@@ -847,6 +847,13 @@ impl PySessionContext {
847847
Ok(())
848848
}
849849

850+
pub fn read_batches(
851+
&self,
852+
batches: PyArrowType<Vec<RecordBatch>>,
853+
) -> PyDataFusionResult<PyDataFrame> {
854+
Ok(PyDataFrame::new(self.ctx.read_batches(batches.0)?))
855+
}
856+
850857
#[allow(clippy::too_many_arguments)]
851858
#[pyo3(signature = (name, path, table_partition_cols=vec![],
852859
parquet_pruning=true,
@@ -1065,6 +1072,48 @@ impl PySessionContext {
10651072
self.ctx.deregister_udwf(name);
10661073
}
10671074

1075+
pub fn udf(&self, name: &str) -> PyResult<PyScalarUDF> {
1076+
if !self.ctx.udfs().contains(name) {
1077+
return Err(PyKeyError::new_err(format!("no UDF named '{name}'")));
1078+
}
1079+
let function = (*self.ctx.udf(name).map_err(py_datafusion_err)?).clone();
1080+
Ok(PyScalarUDF { function })
1081+
}
1082+
1083+
pub fn udaf(&self, name: &str) -> PyResult<PyAggregateUDF> {
1084+
if !self.ctx.udafs().contains(name) {
1085+
return Err(PyKeyError::new_err(format!("no UDAF named '{name}'")));
1086+
}
1087+
let function = (*self.ctx.udaf(name).map_err(py_datafusion_err)?).clone();
1088+
Ok(PyAggregateUDF { function })
1089+
}
1090+
1091+
pub fn udwf(&self, name: &str) -> PyResult<PyWindowUDF> {
1092+
if !self.ctx.udwfs().contains(name) {
1093+
return Err(PyKeyError::new_err(format!("no UDWF named '{name}'")));
1094+
}
1095+
let function = (*self.ctx.udwf(name).map_err(py_datafusion_err)?).clone();
1096+
Ok(PyWindowUDF { function })
1097+
}
1098+
1099+
pub fn udfs(&self) -> Vec<String> {
1100+
let mut names: Vec<String> = self.ctx.udfs().into_iter().collect();
1101+
names.sort();
1102+
names
1103+
}
1104+
1105+
pub fn udafs(&self) -> Vec<String> {
1106+
let mut names: Vec<String> = self.ctx.udafs().into_iter().collect();
1107+
names.sort();
1108+
names
1109+
}
1110+
1111+
pub fn udwfs(&self) -> Vec<String> {
1112+
let mut names: Vec<String> = self.ctx.udwfs().into_iter().collect();
1113+
names.sort();
1114+
names
1115+
}
1116+
10681117
#[pyo3(signature = (name="datafusion"))]
10691118
pub fn catalog(&self, py: Python, name: &str) -> PyResult<Py<PyAny>> {
10701119
let catalog = self.ctx.catalog(name).ok_or(PyKeyError::new_err(format!(

crates/core/src/functions.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -574,10 +574,10 @@ expr_fn!(union_tag, arg1);
574574
expr_fn!(random);
575575

576576
#[pyfunction]
577-
fn get_field(expr: PyExpr, name: PyExpr) -> PyExpr {
578-
functions::core::get_field()
579-
.call(vec![expr.into(), name.into()])
580-
.into()
577+
fn get_field(expr: PyExpr, names: Vec<PyExpr>) -> PyExpr {
578+
let mut args = vec![expr.into()];
579+
args.extend(names.into_iter().map(Into::into));
580+
functions::core::get_field().call(args).into()
581581
}
582582

583583
#[pyfunction]

examples/datafusion-ffi-example/src/table_function.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717

1818
use std::sync::Arc;
1919

20-
use datafusion_catalog::{TableFunctionImpl, TableProvider};
20+
use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider};
2121
use datafusion_common::error::Result as DataFusionResult;
22-
use datafusion_expr::Expr;
2322
use datafusion_ffi::udtf::FFI_TableFunction;
2423
use datafusion_python_util::ffi_logical_codec_from_pycapsule;
2524
use pyo3::types::PyCapsule;
@@ -59,7 +58,7 @@ impl MyTableFunction {
5958
}
6059

6160
impl TableFunctionImpl for MyTableFunction {
62-
fn call(&self, _args: &[Expr]) -> DataFusionResult<Arc<dyn TableProvider>> {
61+
fn call_with_args(&self, _args: TableFunctionArgs) -> DataFusionResult<Arc<dyn TableProvider>> {
6362
let provider = MyTableProvider::new(4, 3, 2).create_table()?;
6463
Ok(Arc::new(provider))
6564
}

python/datafusion/context.py

Lines changed: 201 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,20 @@
8282

8383
if TYPE_CHECKING:
8484
import pathlib
85-
from collections.abc import Sequence
85+
from collections.abc import Iterable, Sequence
8686

8787
import pandas as pd
8888
import polars as pl # type: ignore[import]
89+
from _typeshed import CapsuleType as _PyCapsule
8990

9091
from datafusion.catalog import CatalogProvider, Table
9192
from datafusion.common import DFSchema
9293
from datafusion.expr import Expr, SortKey
9394
from datafusion.plan import ExecutionPlan, LogicalPlan
9495
from datafusion.user_defined import (
9596
AggregateUDF,
97+
LogicalExtensionCodecExportable,
98+
PhysicalExtensionCodecExportable,
9699
ScalarUDF,
97100
TableFunction,
98101
WindowUDF,
@@ -959,6 +962,52 @@ def register_record_batches(
959962
"""
960963
self.ctx.register_record_batches(name, partitions)
961964

965+
def read_batch(self, batch: pa.RecordBatch) -> DataFrame:
966+
"""Return a :py:class:`~datafusion.DataFrame` reading a single batch.
967+
968+
Convenience wrapper around :py:meth:`read_batches` for the single-batch
969+
case. Unlike :py:meth:`register_batch`, this does not register the
970+
batch as a named table; it returns an anonymous
971+
:py:class:`~datafusion.DataFrame` directly.
972+
973+
Args:
974+
batch: Record batch to wrap as a DataFrame.
975+
976+
Examples:
977+
>>> ctx = dfn.SessionContext()
978+
>>> batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3]})
979+
>>> ctx.read_batch(batch).to_pydict()
980+
{'a': [1, 2, 3]}
981+
"""
982+
return self.read_batches([batch])
983+
984+
def read_batches(self, batches: Iterable[pa.RecordBatch]) -> DataFrame:
985+
"""Return a :py:class:`~datafusion.DataFrame` reading the given batches.
986+
987+
All batches must share the same schema. Any iterable of
988+
:py:class:`pa.RecordBatch` is accepted (list, tuple, generator);
989+
it is materialized into a list before being handed to the
990+
underlying Rust binding. Unlike :py:meth:`register_record_batches`,
991+
this does not register the batches as a named table; it returns
992+
an anonymous :py:class:`~datafusion.DataFrame` directly.
993+
994+
Args:
995+
batches: Record batches to wrap as a DataFrame.
996+
997+
Examples:
998+
>>> ctx = dfn.SessionContext()
999+
>>> b1 = pa.RecordBatch.from_pydict({"a": [1, 2]})
1000+
>>> b2 = pa.RecordBatch.from_pydict({"a": [3, 4]})
1001+
>>> ctx.read_batches([b1, b2]).to_pydict()
1002+
{'a': [1, 2, 3, 4]}
1003+
1004+
A generator works too:
1005+
1006+
>>> ctx.read_batches(b for b in [b1, b2]).to_pydict()
1007+
{'a': [1, 2, 3, 4]}
1008+
"""
1009+
return DataFrame(self.ctx.read_batches(list(batches)))
1010+
9621011
def register_parquet(
9631012
self,
9641013
name: str,
@@ -1268,6 +1317,145 @@ def deregister_udwf(self, name: str) -> None:
12681317
"""
12691318
self.ctx.deregister_udwf(name)
12701319

1320+
def udf(self, name: str) -> ScalarUDF:
1321+
"""Look up a registered scalar UDF by name.
1322+
1323+
Returns the same ``ScalarUDF`` wrapper that :py:meth:`register_udf`
1324+
accepts, so it can be invoked as an expression in the DataFrame API
1325+
or re-registered into a different :py:class:`SessionContext`.
1326+
Built-in scalar functions from the session's function registry are
1327+
also looked up.
1328+
1329+
Args:
1330+
name: Name of the registered scalar UDF.
1331+
1332+
Raises:
1333+
KeyError: If no scalar UDF is registered under ``name``.
1334+
1335+
Examples:
1336+
Register a UDF, then look it up by name and use it in the
1337+
DataFrame API:
1338+
1339+
>>> ctx = dfn.SessionContext()
1340+
>>> nullcheck = dfn.udf(
1341+
... lambda x: x.is_null(),
1342+
... [pa.int64()],
1343+
... pa.bool_(),
1344+
... volatility="immutable",
1345+
... name="nullcheck",
1346+
... )
1347+
>>> ctx.register_udf(nullcheck)
1348+
>>> fn = ctx.udf("nullcheck")
1349+
>>> df = ctx.from_pydict({"a": [1, None, 3]})
1350+
>>> df.select(fn(col("a")).alias("is_null")).to_pydict()
1351+
{'is_null': [False, True, False]}
1352+
1353+
Late-binding: the function name can come from configuration
1354+
rather than an imported symbol, which is useful when the set
1355+
of UDFs is plugin-driven or chosen at runtime:
1356+
1357+
>>> config = {"null_check": "nullcheck"}
1358+
>>> fn = ctx.udf(config["null_check"])
1359+
>>> df.select(fn(col("a")).alias("is_null")).to_pydict()
1360+
{'is_null': [False, True, False]}
1361+
"""
1362+
from datafusion.user_defined import ScalarUDF as _ScalarUDF # noqa: PLC0415
1363+
1364+
return _ScalarUDF._from_internal(self.ctx.udf(name))
1365+
1366+
def udaf(self, name: str) -> AggregateUDF:
1367+
"""Look up a registered aggregate UDF by name.
1368+
1369+
Returns the same ``AggregateUDF`` wrapper that :py:meth:`register_udaf`
1370+
accepts. Built-in aggregate functions such as ``sum`` or ``avg`` are
1371+
also discoverable through this lookup. See :py:meth:`udf` for a worked
1372+
late-binding example; the pattern is identical for aggregates.
1373+
1374+
Args:
1375+
name: Name of the registered aggregate UDF.
1376+
1377+
Raises:
1378+
KeyError: If no aggregate UDF is registered under ``name``.
1379+
1380+
Examples:
1381+
Look up a built-in aggregate by name and use it in
1382+
:py:meth:`~datafusion.DataFrame.aggregate`:
1383+
1384+
>>> ctx = dfn.SessionContext()
1385+
>>> sum_fn = ctx.udaf("sum")
1386+
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
1387+
>>> df.aggregate([], [sum_fn(col("a")).alias("total")]).to_pydict()
1388+
{'total': [6]}
1389+
"""
1390+
from datafusion.user_defined import ( # noqa: PLC0415
1391+
AggregateUDF as _AggregateUDF,
1392+
)
1393+
1394+
return _AggregateUDF._from_internal(self.ctx.udaf(name))
1395+
1396+
def udwf(self, name: str) -> WindowUDF:
1397+
"""Look up a registered window UDF by name.
1398+
1399+
Returns the same ``WindowUDF`` wrapper that :py:meth:`register_udwf`
1400+
accepts. Built-in window functions such as ``row_number`` or ``rank``
1401+
are also discoverable through this lookup. See :py:meth:`udf` for a
1402+
worked late-binding example; the pattern is identical for window
1403+
functions.
1404+
1405+
Args:
1406+
name: Name of the registered window UDF.
1407+
1408+
Raises:
1409+
KeyError: If no window UDF is registered under ``name``.
1410+
1411+
Examples:
1412+
Look up a built-in window function by name and use it in
1413+
``select``:
1414+
1415+
>>> ctx = dfn.SessionContext()
1416+
>>> rn = ctx.udwf("row_number")
1417+
>>> df = ctx.from_pydict({"a": [10, 20, 30]})
1418+
>>> df.select(col("a"), rn().alias("rn")).to_pydict()
1419+
{'a': [10, 20, 30], 'rn': [1, 2, 3]}
1420+
"""
1421+
from datafusion.user_defined import WindowUDF as _WindowUDF # noqa: PLC0415
1422+
1423+
return _WindowUDF._from_internal(self.ctx.udwf(name))
1424+
1425+
def udfs(self) -> list[str]:
1426+
"""Return the sorted names of all registered scalar UDFs.
1427+
1428+
Includes both user-registered and built-in scalar functions. Pair
1429+
with :py:meth:`udf` to drive discovery, validation, or config-based
1430+
dispatch.
1431+
1432+
Examples:
1433+
>>> ctx = dfn.SessionContext()
1434+
>>> "abs" in ctx.udfs()
1435+
True
1436+
"""
1437+
return self.ctx.udfs()
1438+
1439+
def udafs(self) -> list[str]:
1440+
"""Return the sorted names of all registered aggregate UDFs.
1441+
1442+
Examples:
1443+
>>> ctx = dfn.SessionContext()
1444+
>>> "sum" in ctx.udafs()
1445+
True
1446+
"""
1447+
return self.ctx.udafs()
1448+
1449+
def udwfs(self) -> list[str]:
1450+
"""Return the sorted names of all registered window UDFs.
1451+
1452+
Examples:
1453+
>>> ctx = dfn.SessionContext()
1454+
>>> "row_number" in ctx.udwfs()
1455+
True
1456+
"""
1457+
return self.ctx.udwfs()
1458+
12711459
def catalog(self, name: str = "datafusion") -> Catalog:
12721460
"""Retrieve a catalog by name."""
12731461
return Catalog(self.ctx.catalog(name))
@@ -1744,11 +1932,14 @@ def __datafusion_logical_extension_codec__(self) -> Any:
17441932
"""Access the PyCapsule FFI_LogicalExtensionCodec."""
17451933
return self.ctx.__datafusion_logical_extension_codec__()
17461934

1747-
def with_logical_extension_codec(self, codec: Any) -> SessionContext:
1935+
def with_logical_extension_codec(
1936+
self, codec: LogicalExtensionCodecExportable | _PyCapsule
1937+
) -> SessionContext:
17481938
"""Create a new session context with specified codec.
17491939
1750-
This only supports codecs that have been implemented using the
1751-
FFI interface.
1940+
Only FFI codecs are supported. Pass any object implementing
1941+
``__datafusion_logical_extension_codec__`` (see
1942+
:py:class:`~datafusion.user_defined.LogicalExtensionCodecExportable`).
17521943
"""
17531944
new_internal = self.ctx.with_logical_extension_codec(codec)
17541945
new = SessionContext.__new__(SessionContext)
@@ -1759,11 +1950,14 @@ def __datafusion_physical_extension_codec__(self) -> Any:
17591950
"""Access the PyCapsule FFI_PhysicalExtensionCodec."""
17601951
return self.ctx.__datafusion_physical_extension_codec__()
17611952

1762-
def with_physical_extension_codec(self, codec: Any) -> SessionContext:
1953+
def with_physical_extension_codec(
1954+
self, codec: PhysicalExtensionCodecExportable | _PyCapsule
1955+
) -> SessionContext:
17631956
"""Create a new session context with the specified physical codec.
17641957
1765-
This only supports codecs that have been implemented using the
1766-
FFI interface.
1958+
Only FFI codecs are supported. Pass any object implementing
1959+
``__datafusion_physical_extension_codec__`` (see
1960+
:py:class:`~datafusion.user_defined.PhysicalExtensionCodecExportable`).
17671961
"""
17681962
new_internal = self.ctx.with_physical_extension_codec(codec)
17691963
new = SessionContext.__new__(SessionContext)

0 commit comments

Comments
 (0)