
Add Python bindings for accessing ExecutionMetrics#1381

Open
ShreyeshArangath wants to merge 2 commits intoapache:mainfrom
ShreyeshArangath:feat/support-metrics

Conversation


@ShreyeshArangath ShreyeshArangath commented Feb 15, 2026

Which issue does this PR close?

Closes #1379

Rationale for this change

Today, DataFusion Python only exposes execution metrics through formatted console output via explain(analyze=True). This makes it difficult to programmatically inspect execution behavior.

There is currently no structured Python API to access per-operator metrics such as output_rows, elapsed_compute, spill_count, and other runtime metrics collected during execution.

This PR introduces APIs to surface the execution metrics, mirroring the Rust API in datafusion::physical_plan::metrics.

What changes are included in this PR?

  • Added plan caching to PyDataFrame so the physical plan used during execution is retained and available for metrics access.
  • Kept the metrics() method and added a collect_metrics() helper that walks the execution plan tree and aggregates metrics from all operators.

Are there any user-facing changes?

Users can now programmatically access execution metrics:

  df = ctx.sql("SELECT * FROM t WHERE x > 1")
  df.collect()
  plan = df.execution_plan()
  metrics = plan.collect_metrics()
  for operator_name, metrics_set in metrics:
      print(f"{operator_name}: {metrics_set.output_rows} rows")

@ShreyeshArangath ShreyeshArangath changed the title feat: add Python bindings for accessing ExecutionMetrics Add Python bindings for accessing ExecutionMetrics Feb 15, 2026
@ShreyeshArangath ShreyeshArangath marked this pull request as ready for review February 15, 2026 01:53
Member

@timsaucer timsaucer left a comment


At a high level, I think this could bring a lot of value. Thank you for putting in the work!

From an implementation perspective, did you consider instead of caching the prior execution plan that instead we simply add the collect() and execute_stream() and so forth on PyExecutionPlan? It seems like that would more closely mirror the upstream repo and simplify the code. I haven't spent a lot of time going through the details of why you're caching the prior plan, so it's very possible I missed something.

Author

ShreyeshArangath commented Feb 20, 2026

@timsaucer Thanks for the suggestion! When I initially designed the change, I did consider moving collect() / execute_*() onto the plan object. The reason I didn’t go that route was more about how observability fits into real usage patterns (from the cases that I have seen).

Today, users naturally treat a dataframe as the primary handle for a query:

df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
batches = df.collect()

Requiring metrics to go through ExecutionPlan would effectively change the model to something like this:

df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
plan = df.execution_plan()
batches = plan.collect()
metrics = plan.collect_metrics()

I thought this would require users to restructure pipelines and thread a plan object through call chains purely to get access to metrics. The level of effort required to get people to use it seemed high to me.

My goal was to add metrics support with minimal changes, without changing how users run queries:

df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
batches = df.collect()
plan = df.execution_plan()
metrics = plan.collect_metrics()

I’m happy to switch to the plan-based approach if we prefer stronger alignment with the upstream API, but I leaned toward this design to make observability easier to adopt without disrupting current usage patterns. Let me know what you think!

Member

@timsaucer timsaucer left a comment


First off, I love this PR!

I've become convinced that your approach is better than what I was suggesting with regard to making users create a plan and execute it!

One area I am concerned about is that display() bypasses this mechanism entirely. That is good and bad. The good is that the metrics are definitely going to be different for the smaller collection that happens when we display, because it ends early. The bad is that, as a user, it's probably confusing to see the data but then be told that we don't have the metrics for the data in front of them. What do you think?
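As a minimal, self-contained sketch of the point above (plain Python with a stand-in class, not the actual bindings): an early-terminating "display" path drives the same operator as a full collect() but stops after a few rows, so any counters it accumulates diverge.

```python
from itertools import islice

class CountingScan:
    """Stand-in for an operator that counts the rows it emits."""
    def __init__(self, rows):
        self.rows = rows
        self.output_rows = 0  # analogous to the output_rows metric

    def __iter__(self):
        for row in self.rows:
            self.output_rows += 1
            yield row

full = CountingScan(range(1000))
_ = list(full)                  # "collect": consumes everything
preview = CountingScan(range(1000))
_ = list(islice(preview, 10))   # "display": stops after 10 rows

print(full.output_rows, preview.output_rows)  # 1000 10
```

The same operator reports very different numbers depending on which path ran it, which is why surfacing metrics after a display could confuse users.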

The biggest area that I think is really necessary is user-facing documentation. I'm willing to chip in and help with this if you need. I think we want to tell users how to use these metrics, both mechanically (like the fact that you have to have executed the dataframe first) and what information they provide. Plus there are differences between which stage of the plan you get them from, and the fact that some metrics come from individual partitions as opposed to aggregate values.

Comment on lines +157 to +162
def metrics(self) -> MetricsSet | None:
    """Return metrics for this plan node after execution, or None if unavailable."""
    raw = self._raw_plan.metrics()
    if raw is None:
        return None
    return MetricsSet(raw)
Member


This is leading me to think we should have some high level documentation, probably in the DataFrame page (or a subpage under it). Some of the things it would be good to do are to explain to a user what kinds of information they could find under these metrics and why that data are not available until after the DataFrame has been executed.

Comment on lines +165 to +167
"""Walk the plan tree and collect metrics from all operators.

Returns a list of (operator_name, MetricsSet) tuples.
Member


"Walk the plan tree and collect metrics" probably does not make a lot of sense to someone other than a developer. I think we can make this more user focused.

Member


I haven't dug in, but is operator_name the name of the execution plan?

Comment on lines +185 to +186
Provides both individual metric access and convenience aggregations
across partitions.
Member


A bit of an explanation is probably useful here. Again, I don't think we can assume the user understands that there are both individual execution plan metrics as well as aggregate. I think that some operators have metrics that cannot be aggregated. In general I suspect we really do need some high level documentation with examples we can point to that makes all of this more concrete.

Member


On second read I now see this is aggregating across partitions. So does that mean the metrics() fn is returning per partition metrics for one ExecutionPlan? Asking for my understanding mostly.
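To make the per-partition vs. aggregate distinction concrete, here is a hypothetical sketch (the dicts and names are illustrative, not the real API): each partition reports its own counters, and the convenience properties sum them across partitions.

```python
# Illustrative per-partition metrics for one operator (not real API output).
per_partition = [
    {"partition": 0, "output_rows": 40, "elapsed_compute": 1_200},
    {"partition": 1, "output_rows": 60, "elapsed_compute": 1_800},
]

# The convenience aggregations simply sum over partitions.
total_rows = sum(p["output_rows"] for p in per_partition)
total_elapsed = sum(p["elapsed_compute"] for p in per_partition)

print(total_rows, total_elapsed)  # 100 3000
```

Summation works for additive counters like row counts; as noted above, some operator metrics may not aggregate meaningfully this way.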


@property
def elapsed_compute(self) -> int | None:
    """Sum of elapsed_compute across all partitions, in nanoseconds."""
Member


We probably want to describe what elapsed_compute is rather than assume user knowledge.
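Since the docstring above reports elapsed_compute in nanoseconds (as I understand it, roughly the CPU time an operator spends computing, rather than waiting on input), a small conversion helper makes the unit concrete. The helper name is illustrative, not part of the proposed API.

```python
NS_PER_MS = 1_000_000

def ns_to_ms(ns: int) -> float:
    """Convert nanoseconds (as reported by elapsed_compute) to milliseconds."""
    return ns / NS_PER_MS

print(ns_to_ms(2_500_000))  # 2.5
```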


@property
def spill_count(self) -> int | None:
    """Sum of spill_count across all partitions."""
Member


Same with spill count. Do you know what units it has?

Comment on lines +640 to +645
let df = self.df.as_ref().clone();
let plan = wait_for_future(py, df.create_physical_plan())?
    .map_err(PyDataFusionError::from)?;
*self.last_plan.lock() = Some(Arc::clone(&plan));
let task_ctx = Arc::new(self.df.as_ref().task_ctx());
let batches = wait_for_future(py, df_collect(plan, task_ctx))?
Member


If I run collect() twice on a DF, should we instead just take the lock on the last plan and clone it? I suspect there's not a huge performance difference the vast majority of the time compared to how you have it.
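A toy sketch of the caching pattern being discussed (plain Python, not the PyO3 code; class and method names are stand-ins): reuse the last plan under a lock instead of re-planning on every collect().

```python
import threading

class DataFrameLike:
    """Toy stand-in for PyDataFrame's plan caching."""
    def __init__(self):
        self._last_plan = None
        self._lock = threading.Lock()
        self.plans_created = 0

    def _create_physical_plan(self):
        self.plans_created += 1
        return object()  # stand-in for an Arc<dyn ExecutionPlan>

    def collect(self):
        # Reuse the cached plan if present; otherwise plan once and cache it.
        with self._lock:
            if self._last_plan is None:
                self._last_plan = self._create_physical_plan()
            return self._last_plan

df = DataFrameLike()
p1 = df.collect()
p2 = df.collect()
print(df.plans_created, p1 is p2)  # 1 True
```

With this shape, a second collect() on the same dataframe reuses the cached plan, which is also what lets collect_metrics() see the plan that actually ran.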

Comment on lines +830 to 834
if let Some(plan) = self.last_plan.lock().as_ref() {
    return Ok(PyExecutionPlan::new(Arc::clone(plan)));
}
let plan = wait_for_future(py, self.df.as_ref().clone().create_physical_plan())??;
Ok(plan.into())
Member


If you go the route of using the existing last_plan for collect() like in my other comment then I think you could set it here just like you do in collect().

Comment on lines +1166 to +1169
let plan = wait_for_future(py, df.create_physical_plan())?
    .map_err(PyDataFusionError::from)?;
*self.last_plan.lock() = Some(Arc::clone(&plan));
let task_ctx = Arc::new(self.df.as_ref().task_ctx());
Member


It feels like we're doing this in a bunch of places, so maybe make a private helper function.

self.metrics.output_rows()
}

/// Returns the sum of all `elapsed_compute` metrics in nanoseconds, or None if not present.
Member


There are a lot of boilerplate comments like this where the function is self-explanatory and not exposed to the end user.


/// Returns the numeric value of this metric, or None for non-numeric types.
#[getter]
fn value(&self) -> Option<usize> {
Member


It feels like we could return Option<Py<PyAny>> and try casting the value appropriately.
