Add Python bindings for accessing ExecutionMetrics #1381
```diff
@@ -29,6 +29,8 @@
 __all__ = [
     "ExecutionPlan",
     "LogicalPlan",
+    "Metric",
+    "MetricsSet",
 ]
```
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:

```python
        Tables created in memory from record batches are currently not supported.
        """
        return self._raw_plan.to_proto()

    def metrics(self) -> MetricsSet | None:
        """Return metrics for this plan node after execution, or None if unavailable."""
        raw = self._raw_plan.metrics()
        if raw is None:
            return None
        return MetricsSet(raw)

    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
        """Walk the plan tree and collect metrics from all operators.

        Returns a list of (operator_name, MetricsSet) tuples.
```
> **Member** (on lines +165 to +167): "Walk the plan tree and collect metrics" probably does not make a lot of sense to someone other than a developer. I think we can make this more user focused.

> **Member:** I haven't dug in, but is
```python
        """
        result: list[tuple[str, MetricsSet]] = []

        def _walk(node: ExecutionPlan) -> None:
            ms = node.metrics()
            if ms is not None:
                result.append((node.display(), ms))
            for child in node.children():
                _walk(child)

        _walk(self)
        return result
```
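The depth-first traversal inside `collect_metrics` can be sketched standalone, without DataFusion installed. The `Node` class and the operator names below are illustrative stand-ins, not part of this PR:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Node:
    """Illustrative stand-in for an ExecutionPlan node."""

    name: str
    metrics: dict | None = None  # None mimics an operator with no metrics
    children: list["Node"] = field(default_factory=list)


def collect_metrics(node: Node) -> list[tuple[str, dict]]:
    """Depth-first walk, mirroring ExecutionPlan.collect_metrics()."""
    result: list[tuple[str, dict]] = []

    def _walk(n: Node) -> None:
        if n.metrics is not None:
            result.append((n.name, n.metrics))
        for child in n.children:
            _walk(child)

    _walk(node)
    return result


# A tiny mock plan: parent operators appear before their children,
# and nodes without metrics are skipped.
plan = Node(
    "CoalesceBatchesExec",
    {"output_rows": 2},
    [Node("FilterExec", {"output_rows": 2}, [Node("MemoryExec", None)])],
)
names = [name for name, _ in collect_metrics(plan)]
```

The pre-order visit order matches how operators nest in the physical plan, which is why the real method pairs each `MetricsSet` with `node.display()` as a human-readable key.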
```python

class MetricsSet:
    """A set of metrics for a single execution plan operator.

    Provides both individual metric access and convenience aggregations
    across partitions.
```
> **Member** (on lines +185 to +186): A bit of an explanation is probably useful here. Again, I don't think we can assume the user understands that there are both individual execution plan metrics as well as aggregate. I think that some operators have metrics that cannot be aggregated. In general I suspect we really do need some high level documentation with examples we can point to that makes all of this more concrete.

> **Member:** On second read I now see this is aggregating across partitions. So does that mean the
```python
    """

    def __init__(self, raw: df_internal.MetricsSet) -> None:
        """This constructor should not be called by the end user."""
        self._raw = raw

    def metrics(self) -> list[Metric]:
        """Return all individual metrics in this set."""
        return [Metric(m) for m in self._raw.metrics()]

    @property
    def output_rows(self) -> int | None:
        """Sum of output_rows across all partitions."""
        return self._raw.output_rows()

    @property
    def elapsed_compute(self) -> int | None:
        """Sum of elapsed_compute across all partitions, in nanoseconds."""
```
> **Member:** We probably want to describe what
```python
        return self._raw.elapsed_compute()

    @property
    def spill_count(self) -> int | None:
        """Sum of spill_count across all partitions."""
```
> **Member:** Same with spill count. Do you know what units it has?
```python
        return self._raw.spill_count()

    @property
    def spilled_bytes(self) -> int | None:
        """Sum of spilled_bytes across all partitions."""
        return self._raw.spilled_bytes()

    @property
    def spilled_rows(self) -> int | None:
        """Sum of spilled_rows across all partitions."""
        return self._raw.spilled_rows()

    def sum_by_name(self, name: str) -> int | None:
        """Return the sum of metrics matching the given name."""
```
> **Member:** Sum across partitions, I presume. This is different because it works for an arbitrary metric, right?
```python
        return self._raw.sum_by_name(name)

    def __repr__(self) -> str:
        """Return a string representation of the metrics set."""
        return repr(self._raw)


class Metric:
    """A single execution metric with name, value, partition, and labels."""

    def __init__(self, raw: df_internal.Metric) -> None:
        """This constructor should not be called by the end user."""
        self._raw = raw

    @property
    def name(self) -> str:
        """The name of this metric (e.g. ``output_rows``)."""
        return self._raw.name

    @property
    def value(self) -> int | None:
        """The numeric value of this metric, or None for non-numeric types."""
        return self._raw.value
```
> **Member** (on lines +245 to +246): Is it not possible to give values for non-numeric metrics?
```python

    @property
    def partition(self) -> int | None:
        """The partition this metric applies to, or None if global."""
```
> **Member:** Do you know if this partition is a hash or an index or something else? I am trying to figure out how a user makes use of this information.
```python
        return self._raw.partition

    def labels(self) -> dict[str, str]:
        """Return the labels associated with this metric."""
```
> **Member:** Something to include in user documentation is an example of these labels.
```python
        return self._raw.labels()

    def __repr__(self) -> str:
        """Return a string representation of the metric."""
        return repr(self._raw)
```
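The split between per-partition `Metric` values and the aggregated `MetricsSet` accessors can be illustrated in plain Python. `FakeMetric` and the sample numbers below are invented for the sketch; they are not DataFusion output:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class FakeMetric:
    """Illustrative stand-in for a single per-partition metric."""

    name: str
    partition: int | None
    value: int


# One metric per (name, partition) pair, as an operator might report them.
metrics = [
    FakeMetric("output_rows", 0, 1),
    FakeMetric("output_rows", 1, 2),
    FakeMetric("elapsed_compute", 0, 1_500),  # nanoseconds
    FakeMetric("elapsed_compute", 1, 2_500),
]


def sum_by_name(name: str) -> int | None:
    """Mirror MetricsSet.sum_by_name: sum matching metrics across partitions."""
    matching = [m.value for m in metrics if m.name == name]
    return sum(matching) if matching else None


total_rows = sum_by_name("output_rows")    # aggregated across both partitions
total_ns = sum_by_name("elapsed_compute")  # aggregated nanoseconds
missing = sum_by_name("spill_count")       # no such metric reported -> None
```

Under this reading, `output_rows` and the other named properties are just pre-baked versions of `sum_by_name`, while `metrics()` exposes the raw per-partition values for anything that does not aggregate meaningfully.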
```diff
@@ -16,7 +16,13 @@
 # under the License.

 import pytest
-from datafusion import ExecutionPlan, LogicalPlan, SessionContext
+from datafusion import (
+    ExecutionPlan,
+    LogicalPlan,
+    Metric,
+    MetricsSet,
+    SessionContext,
+)


 # Note: We must use CSV because memory tables are currently not supported for
```
@@ -40,3 +46,101 @@ def test_logical_plan_to_proto(ctx, df) -> None:

```python
    execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)

    assert str(original_execution_plan) == str(execution_plan)


def test_metrics_tree_walk() -> None:
    ctx = SessionContext()
    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
    df.collect()
    plan = df.execution_plan()

    results = plan.collect_metrics()
    assert len(results) >= 1
    found_metrics = False
    for name, ms in results:
        assert isinstance(name, str)
        assert isinstance(ms, MetricsSet)
        if ms.output_rows is not None and ms.output_rows > 0:
            found_metrics = True
    assert found_metrics
```
```python

def test_metric_properties() -> None:
    ctx = SessionContext()
    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
    df.collect()
    plan = df.execution_plan()

    for _, ms in plan.collect_metrics():
        r = repr(ms)
        assert isinstance(r, str)
        for metric in ms.metrics():
            assert isinstance(metric, Metric)
            assert isinstance(metric.name, str)
            assert len(metric.name) > 0
            assert metric.partition is None or isinstance(metric.partition, int)
            assert isinstance(metric.labels(), dict)
            mr = repr(metric)
            assert isinstance(mr, str)
            assert len(mr) > 0
            return
    pytest.skip("No metrics found")
```
> **Member:** I don't think we want a skip here. I think we want to ensure in our unit test that we do generate metrics. I think we should know a priori what metrics we're getting in this test. While some values we can't use, some parts of it we should be able to test directly, like the metric name and labels.
```python

def test_no_metrics_before_execution() -> None:
    ctx = SessionContext()
    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
    df = ctx.sql("SELECT * FROM t")
    plan = df.execution_plan()
    ms = plan.metrics()
    assert ms is None or ms.output_rows is None or ms.output_rows == 0
```
> **Member:** We should know exactly which condition should be hit. Do you know when the metrics is None vs returns no useful output?
```python

def test_collect_partitioned_metrics() -> None:
    ctx = SessionContext()
    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")

    df.collect_partitioned()
    plan = df.execution_plan()

    found_metrics = False
    for _, ms in plan.collect_metrics():
        if ms.output_rows is not None and ms.output_rows > 0:
```
> **Member:** We should know exactly how many output rows to expect

> **Member:** Also for the unit tests to follow
```python
            found_metrics = True
    assert found_metrics
```
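As the reviewer notes, the expected counts here are knowable a priori: the fixture has three rows and the predicate `column1 > 1` keeps two of them. A plain-Python check of the fixture data (no DataFusion involved; the variable names are illustrative) makes the expected value explicit:

```python
# The VALUES fixture used throughout these tests.
rows = [(1, "a"), (2, "b"), (3, "c")]

# Rows surviving the WHERE column1 > 1 predicate.
filtered = [r for r in rows if r[0] > 1]

# The filter operator's aggregated output_rows metric should equal this
# count, so the test could assert it directly instead of just > 0.
expected_output_rows = len(filtered)
```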
```python

def test_execute_stream_metrics() -> None:
    ctx = SessionContext()
    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")

    for _ in df.execute_stream():
        pass

    plan = df.execution_plan()
    found_metrics = False
    for _, ms in plan.collect_metrics():
        if ms.output_rows is not None and ms.output_rows > 0:
            found_metrics = True
    assert found_metrics


def test_execute_stream_partitioned_metrics() -> None:
    ctx = SessionContext()
    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")

    for stream in df.execute_stream_partitioned():
        for _ in stream:
            pass

    plan = df.execution_plan()
    found_metrics = False
    for _, ms in plan.collect_metrics():
        if ms.output_rows is not None and ms.output_rows > 0:
            found_metrics = True
    assert found_metrics
```
```diff
@@ -49,6 +49,13 @@ use pyo3::pybacked::PyBackedStr;
 use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};

 use crate::common::data_type::PyScalarValue;
+use datafusion::physical_plan::{
+    ExecutionPlan as DFExecutionPlan,
+    collect as df_collect,
+    collect_partitioned as df_collect_partitioned,
+    execute_stream as df_execute_stream,
+    execute_stream_partitioned as df_execute_stream_partitioned,
+};
 use crate::errors::{PyDataFusionError, PyDataFusionResult, py_datafusion_err};
 use crate::expr::PyExpr;
 use crate::expr::sort_expr::{PySortExpr, to_sort_expressions};
```
```diff
@@ -289,6 +296,9 @@ pub struct PyDataFrame {
     // In IPython environment cache batches between __repr__ and _repr_html_ calls.
     batches: SharedCachedBatches,
+
+    // Cache the last physical plan so that metrics are available after execution.
+    last_plan: Arc<Mutex<Option<Arc<dyn DFExecutionPlan>>>>,
 }

 impl PyDataFrame {
@@ -297,6 +307,7 @@
         Self {
             df: Arc::new(df),
             batches: Arc::new(Mutex::new(None)),
+            last_plan: Arc::new(Mutex::new(None)),
         }
     }
```
```diff
@@ -626,7 +637,12 @@
     /// Unless some order is specified in the plan, there is no
     /// guarantee of the order of the result.
     fn collect<'py>(&self, py: Python<'py>) -> PyResult<Vec<Bound<'py, PyAny>>> {
-        let batches = wait_for_future(py, self.df.as_ref().clone().collect())?
+        let df = self.df.as_ref().clone();
+        let plan = wait_for_future(py, df.create_physical_plan())?
+            .map_err(PyDataFusionError::from)?;
+        *self.last_plan.lock() = Some(Arc::clone(&plan));
+        let task_ctx = Arc::new(self.df.as_ref().task_ctx());
+        let batches = wait_for_future(py, df_collect(plan, task_ctx))?
```
> **Member** (on lines +640 to +645): If I run
```diff
             .map_err(PyDataFusionError::from)?;
         // cannot use PyResult<Vec<RecordBatch>> return type due to
         // https://github.com/PyO3/pyo3/issues/1813
@@ -642,7 +658,12 @@
     /// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
     /// maintaining the input partitioning.
     fn collect_partitioned<'py>(&self, py: Python<'py>) -> PyResult<Vec<Vec<Bound<'py, PyAny>>>> {
-        let batches = wait_for_future(py, self.df.as_ref().clone().collect_partitioned())?
+        let df = self.df.as_ref().clone();
+        let plan = wait_for_future(py, df.create_physical_plan())?
+            .map_err(PyDataFusionError::from)?;
+        *self.last_plan.lock() = Some(Arc::clone(&plan));
+        let task_ctx = Arc::new(self.df.as_ref().task_ctx());
+        let batches = wait_for_future(py, df_collect_partitioned(plan, task_ctx))?
             .map_err(PyDataFusionError::from)?;

         batches
```
```diff
@@ -802,7 +823,13 @@
     }

     /// Get the execution plan for this `DataFrame`
+    ///
+    /// If the DataFrame has already been executed (e.g. via `collect()`),
+    /// returns the cached plan which includes populated metrics.
     fn execution_plan(&self, py: Python) -> PyDataFusionResult<PyExecutionPlan> {
+        if let Some(plan) = self.last_plan.lock().as_ref() {
+            return Ok(PyExecutionPlan::new(Arc::clone(plan)));
+        }
         let plan = wait_for_future(py, self.df.as_ref().clone().create_physical_plan())??;
         Ok(plan.into())
```
> **Member** (on lines +830 to 834): If you go the route of using the existing
```diff
     }
```
```diff
@@ -1127,13 +1154,22 @@
     fn execute_stream(&self, py: Python) -> PyDataFusionResult<PyRecordBatchStream> {
         let df = self.df.as_ref().clone();
-        let stream = spawn_future(py, async move { df.execute_stream().await })?;
+        let plan = wait_for_future(py, df.create_physical_plan())??;
+        *self.last_plan.lock() = Some(Arc::clone(&plan));
+        let task_ctx = Arc::new(self.df.as_ref().task_ctx());
+        let stream = spawn_future(py, async move { df_execute_stream(plan, task_ctx) })?;
         Ok(PyRecordBatchStream::new(stream))
     }
```
```diff

     fn execute_stream_partitioned(&self, py: Python) -> PyResult<Vec<PyRecordBatchStream>> {
         let df = self.df.as_ref().clone();
-        let streams = spawn_future(py, async move { df.execute_stream_partitioned().await })?;
+        let plan = wait_for_future(py, df.create_physical_plan())?
+            .map_err(PyDataFusionError::from)?;
+        *self.last_plan.lock() = Some(Arc::clone(&plan));
+        let task_ctx = Arc::new(self.df.as_ref().task_ctx());
```
> **Member** (on lines +1166 to +1169): It feels like we're doing this in a bunch of places, so maybe make a private helper function.
```diff
+        let streams = spawn_future(py, async move {
+            df_execute_stream_partitioned(plan, task_ctx)
+        })?;
         Ok(streams.into_iter().map(PyRecordBatchStream::new).collect())
     }
```
> **Member:** This is leading me to think we should have some high level documentation, probably in the DataFrame page (or a subpage under it). Some of the things it would be good to do are to explain to a user what kinds of information they could find under these metrics and why that data are not available until after the DataFrame has been executed.