Add Python bindings for accessing ExecutionMetrics#1381
ShreyeshArangath wants to merge 2 commits into apache:main
Conversation
timsaucer
left a comment
At a high level, I think this could bring a lot of value. Thank you for putting in the work!
From an implementation perspective, did you consider, instead of caching the prior execution plan, simply adding collect() and execute_stream() and so forth on PyExecutionPlan? It seems like that would more closely mirror the upstream repo and simplify the code. I haven't spent a lot of time going through the details of why you're caching the prior plan, so it's very possible I missed something.
@timsaucer Thanks for the suggestion! When I first designed the change, I did consider that approach. Today, I think users naturally treat a DataFrame as the primary handle for a query:

```python
df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
batches = df.collect()
```

Requiring metrics to go through `ExecutionPlan` would effectively change the model to look something like:

```python
df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
plan = df.execution_plan()
batches = plan.collect()
metrics = plan.collect_metrics()
```

I thought this would require users to restructure pipelines and thread a plan object through call chains purely to have access to metrics; the effort required to get people to use it seemed high to me. My goal was to add metrics support with minimal changes to how users run queries:

```python
df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
batches = df.collect()
plan = df.execution_plan()
metrics = plan.collect_metrics()
```

I'm happy to switch to the plan-based approach if we prefer stronger alignment with the upstream API, but I leaned toward this design to make observability easier to adopt without disrupting current usage patterns. Let me know what you think.
075e1ec to 0a57da6 (force-pushed)
timsaucer
left a comment
First off, I love this PR!
I've become convinced that your approach is better than what I was suggesting, i.e. making users create a plan and execute it!
One area I am concerned about is that display() bypasses this mechanism entirely. That is both good and bad. The good: the metrics genuinely would be different, because display() does a smaller collection that ends early. The bad: as a user it's probably confusing to see the data but then be told we don't have metrics for the data in front of them. What do you think?
The biggest area that I think is really necessary is around user facing documentation. I'm willing to chip in and help with this if you need. I think we want to tell the users how to use these metrics, both mechanically (like how you have to have executed the dataframe) and what information they provide. Plus there are differences between which stage of the plan you get them from and the fact that some metrics come from the different partitions as opposed to aggregate values.
```python
def metrics(self) -> MetricsSet | None:
    """Return metrics for this plan node after execution, or None if unavailable."""
    raw = self._raw_plan.metrics()
    if raw is None:
        return None
    return MetricsSet(raw)
```
This is leading me to think we should have some high level documentation, probably in the DataFrame page (or a subpage under it). Some of the things it would be good to do are to explain to a user what kinds of information they could find under these metrics and why that data are not available until after the DataFrame has been executed.
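To make that documentation point concrete, here is a minimal pure-Python sketch of the behavior being described — metrics are absent until the plan has actually been executed. The class and metric names here are stand-ins for illustration, not the PR's actual types:

```python
class PlanNode:
    """Toy stand-in for an execution plan node that records metrics when run."""

    def __init__(self) -> None:
        self._metrics = None

    def execute(self) -> None:
        # Running the node is what populates its metrics.
        self._metrics = {"output_rows": 128, "elapsed_compute": 5_000}

    def metrics(self):
        # Mirrors the proposed API shape: None until execution has happened.
        return self._metrics


node = PlanNode()
assert node.metrics() is None  # before execution: no metrics available
node.execute()
assert node.metrics()["output_rows"] == 128
```

Documentation built around this "None before execution" contract would let users understand why they must collect() first.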
```python
"""Walk the plan tree and collect metrics from all operators.

Returns a list of (operator_name, MetricsSet) tuples.
```
"Walk the plan tree and collect metrics" probably does not make a lot of sense to someone other than a developer. I think we can make this more user focused.
I haven't dug in, but is operator_name the name of the execution plan?
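For reference, a plan-tree walk producing (operator_name, metrics) pairs could be sketched like this. Everything here is a hypothetical toy model, not the PR's implementation, though `FilterExec`/`ParquetExec` mirror real DataFusion operator names:

```python
from dataclasses import dataclass, field


@dataclass
class Operator:
    """Toy stand-in for an execution plan node."""
    name: str                     # e.g. "FilterExec", "ParquetExec"
    metrics: dict
    children: list = field(default_factory=list)


def collect_metrics(op):
    """Pre-order walk: the root operator first, then its children."""
    pairs = [(op.name, op.metrics)]
    for child in op.children:
        pairs.extend(collect_metrics(child))
    return pairs


scan = Operator("ParquetExec", {"output_rows": 1000})
filt = Operator("FilterExec", {"output_rows": 10}, [scan])
names = [name for name, _ in collect_metrics(filt)]
# names == ["FilterExec", "ParquetExec"]
```

If the real `operator_name` is indeed the execution plan node's name, an example like this in the docs would answer the question directly.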
```python
Provides both individual metric access and convenience aggregations
across partitions.
```
A bit of an explanation is probably useful here. Again, I don't think we can assume the user understands that there are both individual execution plan metrics as well as aggregate. I think that some operators have metrics that cannot be aggregated. In general I suspect we really do need some high level documentation with examples we can point to that makes all of this more concrete.
On second read I now see this is aggregating across partitions. So does that mean the metrics() fn is returning per partition metrics for one ExecutionPlan? Asking for my understanding mostly.
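The aggregation being discussed could be modeled roughly like this — summing one metric across partitions, where a partition may not have reported a value at all. The helper name is hypothetical:

```python
def sum_across_partitions(per_partition):
    """Sum one metric over partitions; None if no partition reported it."""
    values = [v for v in per_partition if v is not None]
    return sum(values) if values else None


# Three partitions reported output_rows; the aggregate is their sum.
assert sum_across_partitions([100, 250, 50]) == 400
# If no partition reported the metric, the aggregate is None rather than 0.
assert sum_across_partitions([None, None]) is None
```

A worked example like this in the docs would make the per-partition vs. aggregate distinction explicit.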
```python
@property
def elapsed_compute(self) -> int | None:
    """Sum of elapsed_compute across all partitions, in nanoseconds."""
```
We probably want to describe what elapsed_compute is rather than assume user knowledge.
```python
@property
def spill_count(self) -> int | None:
    """Sum of spill_count across all partitions."""
```
Same with spill count. Do you know what units it has?
```rust
let df = self.df.as_ref().clone();
let plan = wait_for_future(py, df.create_physical_plan())?
    .map_err(PyDataFusionError::from)?;
*self.last_plan.lock() = Some(Arc::clone(&plan));
let task_ctx = Arc::new(self.df.as_ref().task_ctx());
let batches = wait_for_future(py, df_collect(plan, task_ctx))?
```
If I run collect() twice on a DF, should we instead just do the lock on the last plan and clone it? I suspect there's not a huge performance difference the vast majority of the time as opposed to how you have it.
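The suggestion could be modeled in Python roughly like this — a toy stand-in for the Rust code, where a second collect() reuses the cached plan instead of recreating it:

```python
class CachedPlanFrame:
    """Toy model: collect() creates a plan once and reuses it afterwards."""

    def __init__(self):
        self._last_plan = None      # analogue of the PR's last_plan slot
        self.plans_created = 0      # instrumentation for the sketch only

    def _create_physical_plan(self):
        self.plans_created += 1
        return object()             # placeholder for a real physical plan

    def collect(self):
        if self._last_plan is None:
            self._last_plan = self._create_physical_plan()
        # Execute using the cached plan (execution itself is elided here).
        return self._last_plan


df = CachedPlanFrame()
df.collect()
df.collect()
assert df.plans_created == 1  # the second collect() reused the cached plan
```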
```rust
if let Some(plan) = self.last_plan.lock().as_ref() {
    return Ok(PyExecutionPlan::new(Arc::clone(plan)));
}
let plan = wait_for_future(py, self.df.as_ref().clone().create_physical_plan())??;
Ok(plan.into())
```
If you go the route of using the existing last_plan for collect() like in my other comment then I think you could set it here just like you do in collect().
```rust
let plan = wait_for_future(py, df.create_physical_plan())?
    .map_err(PyDataFusionError::from)?;
*self.last_plan.lock() = Some(Arc::clone(&plan));
let task_ctx = Arc::new(self.df.as_ref().task_ctx());
```
It feels like we're doing this in a bunch of places, so maybe make a private helper function.
```rust
    self.metrics.output_rows()
}

/// Returns the sum of all `elapsed_compute` metrics in nanoseconds, or None if not present.
```
There are a lot of boilerplate comments like this where the function is self-explanatory and not exposed to the end user.
```rust
/// Returns the numeric value of this metric, or None for non-numeric types.
#[getter]
fn value(&self) -> Option<usize> {
```
It feels like we could return `Option<Py<PyAny>>` and try casting the value appropriately.
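From the Python caller's side, "casting the value appropriately" might look like this sketch — returning the most natural native type per metric. The function name and the type mapping are assumptions for illustration, not the PR's design:

```python
def metric_value(raw):
    """Return a metric value as the most natural Python type.

    Counts and nanosecond durations stay ints, ratios stay floats,
    and anything non-numeric (timestamps, labels) falls back to str.
    """
    if raw is None:
        return None                 # metric not recorded
    if isinstance(raw, bool):
        return raw                  # guard: bool is a subclass of int
    if isinstance(raw, (int, float)):
        return raw                  # output_rows, elapsed_compute, ratios
    return str(raw)                 # start_timestamp and other non-numerics


assert metric_value(42) == 42
assert metric_value(0.5) == 0.5
assert metric_value(None) is None
```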
Which issue does this PR close?
Closes #1379
Rationale for this change
Today, DataFusion Python only exposes execution metrics through formatted console output via `explain(analyze=True)`. This makes it difficult to programmatically inspect execution behavior. There is currently no structured Python API to access per-operator metrics such as `output_rows`, `elapsed_compute`, `spill_count`, and other runtime metrics collected during execution. This PR introduces APIs to surface the execution metrics, mirroring the Rust API in `datafusion::physical_plan::metrics`.
What changes are included in this PR?
- Updated `PyDataFrame` so the physical plan used during execution is retained and available for metrics access.
- Exposed a `metrics()` method and added a `collect_metrics()` helper to walk the execution plan tree and aggregate metrics from all operators.
Are there any user-facing changes?
Users can now programmatically access execution metrics.