Introduce DistributedLeafExec#467
Conversation
03f6905 to
0357def
Compare
| /// Test that verifies PartitionIsolatorExec nodes are preserved during metrics collection. | ||
| /// This tests the corner case where PartitionIsolatorExec nodes (which have no metrics) | ||
| /// must still be included in the metrics collection to maintain correct node-to-metric mapping. | ||
| #[tokio::test] | ||
| async fn test_metrics_collection_with_partition_isolator() { |
There was a problem hiding this comment.
Test becomes irrelevant, partition isolator no longer exists
| let mut node_idx = 0; | ||
| // Phase 1 — accumulate per-task metrics into a map keyed by node identity. | ||
| // | ||
| // For each task, the plan is traversed with `apply_with_ctx`, which visits nodes in pre-order | ||
| // traversal, ignoring branches that do not belong to the recursed DistributedTaskContext | ||
| // (e.g., because of the presence of ChildrenIsolatorUnionExec). | ||
| // | ||
| // The raw allocation address of each `Arc<dyn ExecutionPlan>` as the node key. | ||
| // The planning plan is not modified between traversals, so these addresses are stable. | ||
| let mut node_metrics_map: HashMap<usize, MetricsSet> = HashMap::new(); |
There was a problem hiding this comment.
This needed to change to adhere to the fact that different tasks will have different plans. Before it was a bit easier, because all the plans were the same across tasks inside the same stage, so recursing it once was enough.
However, now, depending on the DistributedTaskContext the plan that gets executed in the different tasks might be completely different, and therefore we need to recurse once per task index.
There was a problem hiding this comment.
Ack. This looks ok.
There was a problem hiding this comment.
I see you've updated the collection side in src/worker/impl_execute_task.rs
There was a problem hiding this comment.
Yeah, the collection side and this code need to follow the same recursion path through the plan, as collected metrics assumes there's a match in the pre-order indexes collected while recursing.
Pretty much same as before, but now per-task rather than globally.
e777f40 to
071dcc0
Compare
0357def to
21b7689
Compare
071dcc0 to
b01c6fc
Compare
9196264 to
8eb191c
Compare
aedad16 to
af69055
Compare
8eb191c to
249311e
Compare
249311e to
613e8db
Compare
12a761b to
c5e321e
Compare
613e8db to
7175232
Compare
There was a problem hiding this comment.
Glad to see this change!
| let mut file_groups = VecDeque::with_capacity(file_scan.file_groups.len() * task_count); | ||
| for file_group in file_scan_template.file_groups.drain(..) { | ||
| file_groups.extend(file_group.split_files(task_count)); | ||
| } |
There was a problem hiding this comment.
Off topic but I always felt that this should flatten out the files and generate new file groups from those flat files.
Say you have 3 tasks and 2 file groups:
group1: 1 file
group2: 12 files
This code will result in 4 file groups with 1, 4, 4, 4 files respectively. Ideally you want 6 file groups with 2, 2, 2, 2, 2, 3.
There was a problem hiding this comment.
Ah I see you push the empty file group FileGroup::new(vec![]) to avoid this problem.
There was a problem hiding this comment.
If it's easy to do, we could avoid putting empty file groups by distributing the files better.
There was a problem hiding this comment.
Yeah, I think this can be improved. There's some on-going work for that in #450.
For this PR, probably leaving it as it was is better, that way we qualify performance impact comparing apples to apples, but definitely worth improving.
There was a problem hiding this comment.
nit: diff is too big for github but in custom_routing_union_variant, having c0 ordered below c1 feels unnatural to me
│ DistributedUnionExec: t0:[c0(0/4)] t1:[c0(1/4)] t2:[c0(2/4)] t3:[c0(3/4)] t4:[c1]
│ DistributedLeafExec: URLEmitterExec: tasks=8 partitions=1 tag=left
│ DistributedLeafExec: URLEmitterExec: tasks=2 partitions=4 tag=right
There was a problem hiding this comment.
🤔 I'm not sure what you mean, isn't c0 on the side within the union rather than above or below?
| .iter() | ||
| .enumerate() | ||
| .map( | ||
| |(child_i, plan)| match children_to_keep.contains(&child_i) { |
There was a problem hiding this comment.
Probably worth using a map to loop up the child faster? The union may have many children.
There was a problem hiding this comment.
I'd not expect there to be too many children, so the overhead of hashing + indexing might actually outweight the benefits.
Let's do it though, from a semantic standpoint is cleaner.
| let mut node_idx = 0; | ||
| // Phase 1 — accumulate per-task metrics into a map keyed by node identity. | ||
| // | ||
| // For each task, the plan is traversed with `apply_with_ctx`, which visits nodes in pre-order | ||
| // traversal, ignoring branches that do not belong to the recursed DistributedTaskContext | ||
| // (e.g., because of the presence of ChildrenIsolatorUnionExec). | ||
| // | ||
| // The raw allocation address of each `Arc<dyn ExecutionPlan>` as the node key. | ||
| // The planning plan is not modified between traversals, so these addresses are stable. | ||
| let mut node_metrics_map: HashMap<usize, MetricsSet> = HashMap::new(); |
There was a problem hiding this comment.
Ack. This looks ok.
| let mut node_idx = 0; | ||
| // Phase 1 — accumulate per-task metrics into a map keyed by node identity. | ||
| // | ||
| // For each task, the plan is traversed with `apply_with_ctx`, which visits nodes in pre-order | ||
| // traversal, ignoring branches that do not belong to the recursed DistributedTaskContext | ||
| // (e.g., because of the presence of ChildrenIsolatorUnionExec). | ||
| // | ||
| // The raw allocation address of each `Arc<dyn ExecutionPlan>` as the node key. | ||
| // The planning plan is not modified between traversals, so these addresses are stable. | ||
| let mut node_metrics_map: HashMap<usize, MetricsSet> = HashMap::new(); |
There was a problem hiding this comment.
I see you've updated the collection side in src/worker/impl_execute_task.rs
This is one PR from the following stack of PRs:
Motivation
Part of #460 (DataFusion v54 upgrade).
DataFusion #21351 introduced work-stealing for
FileScanConfig: instead of each partition scanning a fixed, pre-assigned set of files, idle partitions can steal pending file work from a shared queue at execution time. This means the mapping partition → files is no longer determined at planning time.PartitionIsolatorExecwas built on exactly that assumption: it split the leafDataSourceExecintotask_count × original_partitionsfile groups at planning time and selected the right slice for each task viatask_indexat execution. With dynamic file assignment, this selection is no longer valid.Solution —
DistributedLeafExecReplace
PartitionIsolatorExecwithDistributedLeafExec, a new wrapper that bundles one pre-built per-task variant of the leaf plan for each distributed task. The task spawner swaps in the correct variant before the plan is serialised and sent to a worker — no runtime selection bytask_indexneeded.Before (
PartitionIsolatorExec)PartitionIsolatorExecusedtask_indexat execution to pick the right slice. With work-stealing this slice is meaningless: partition 0 may end up scanningf7, notf3.After (
DistributedLeafExec)Each per-task
DataSourceExechas its own, isolated file groups decided at planning time. The task spawner replacesDistributedLeafExecwith the right variant before serialising, so the worker receives a plainDataSourceExec— compatible with DataFusion's work-stealing scheduler.What changed
PartitionIsolatorExecremoved entirely.DistributedLeafExecintroduced: holdsoriginal(for planning-time properties and single-task execution) +variants(one per task).FileScanConfigTaskEstimator::scale_up_leaf_nodenow returns aDistributedLeafExecwrapping the pre-split variants instead of aPartitionIsolatorExec.DistributedLeafExecwith the per-task variant before serialisation (same location that already handledChildrenIsolatorUnionExec).ChildrenIsolatorUnionExeccorrectly (per-task node offsets) and to makeDistributedLeafExectransparent for metrics lookups.Performance
0 Impact on TPCH and Clickbench, but getting some good numbers on TPC-DS:
TOTAL: prev=264135.349226 ms, new=231281.42632199993 ms, diff=1.14 faster ✔