Introduce DistributedLeafExec#467

Open
gabotechs wants to merge 3 commits into gabrielmusat/factor-out-distributed-recursion-fixed from gabrielmusat/remove-partition-isolator

@gabotechs gabotechs commented May 26, 2026

This is one PR from the following stack of PRs:

Motivation

Part of #460 (DataFusion v54 upgrade).

DataFusion #21351 introduced work-stealing for FileScanConfig: instead of each partition scanning a fixed, pre-assigned set of files, idle partitions can steal pending file work from a shared queue at execution time. This means the mapping partition → files is no longer determined at planning time.

PartitionIsolatorExec was built on exactly that assumption: it split the leaf DataSourceExec into task_count × original_partitions file groups at planning time and selected the right slice for each task via task_index at execution. With dynamic file assignment, this selection is no longer valid.

Solution — DistributedLeafExec

Replace PartitionIsolatorExec with DistributedLeafExec, a new wrapper that bundles one pre-built per-task variant of the leaf plan for each distributed task. The task spawner swaps in the correct variant before the plan is serialised and sent to a worker — no runtime selection by task_index needed.

Before (PartitionIsolatorExec)

planning time                    worker (task 1 of 3)
─────────────────────────────    ─────────────────────────────
DataSourceExec                   DataSourceExec
  file_groups = [f0, f1, f2,       file_groups = [f0, f1, f2,
                 f3, f4, f5,                        f3, f4, f5,
                 f6, f7, f8]                        f6, f7, f8]
↓ wrapped by                     ↓ selects slice [f3..f5]
PartitionIsolatorExec              at execution via task_index
  tasks=3 partitions=3

PartitionIsolatorExec used task_index at execution to pick the right slice. With work-stealing this slice is meaningless: partition 0 may end up scanning f7, not f3.
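The slice selection described above can be illustrated with a minimal sketch. It does not reproduce the removed PartitionIsolatorExec code: `FileGroup` and `slice_for_task` are hypothetical names, and a file group is modelled as a plain list of file names.

```rust
/// Hypothetical stand-in for a file group: just a list of file names.
type FileGroup = Vec<String>;

/// Returns the window of file groups a task would select at execution time under
/// the old scheme: `groups` holds `task_count * partitions` entries, and task
/// `task_index` owns the `partitions` entries starting at `task_index * partitions`.
fn slice_for_task(groups: &[FileGroup], partitions: usize, task_index: usize) -> &[FileGroup] {
    let start = task_index * partitions;
    &groups[start..start + partitions]
}
```

With nine groups, `partitions = 3` and `task_index = 1`, this selects the groups holding f3, f4 and f5. Once files can be stolen across partitions at execution time, a fixed index window like this no longer describes what each partition actually scans.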

After (DistributedLeafExec)

planning time                    task spawner (before send)   worker (task 1 of 3)
─────────────────────────────    ──────────────────────────   ─────────────────────
DistributedLeafExec              pick variants[1]             DataSourceExec
  original: DataSourceExec   ─────────────────────────────►    file_groups =
    file_groups = [f0..f8]                                       [f3, f4, f5]
  variants:
    [0]: DataSourceExec
           file_groups=[f0,f1,f2]
    [1]: DataSourceExec
           file_groups=[f3,f4,f5]
    [2]: DataSourceExec
           file_groups=[f6,f7,f8]

Each per-task DataSourceExec has its own, isolated file groups decided at planning time. The task spawner replaces DistributedLeafExec with the right variant before serialising, so the worker receives a plain DataSourceExec — compatible with DataFusion's work-stealing scheduler.
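As a rough illustration of the shape described above, the following sketch models the wrapper with plain structs instead of real execution plans. `LeafScan`, `leaf_scan`, `DistributedLeaf` and `variant_for_task` are hypothetical names used only for this example; the actual DistributedLeafExec in this PR wraps `ExecutionPlan` nodes and may differ in its details.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a leaf scan node; only the file groups matter here.
#[derive(Debug, Clone, PartialEq)]
struct LeafScan {
    file_groups: Vec<Vec<String>>,
}

/// Hypothetical helper: builds a leaf with one single-file group per name.
fn leaf_scan(files: &[&str]) -> LeafScan {
    LeafScan {
        file_groups: files.iter().map(|f| vec![f.to_string()]).collect(),
    }
}

/// Simplified model of the wrapper: the original leaf plus one pre-built variant per task.
struct DistributedLeaf {
    /// Kept for planning-time properties.
    original: Arc<LeafScan>,
    /// `variants[i]` is the leaf that task `i` should execute.
    variants: Vec<Arc<LeafScan>>,
}

impl DistributedLeaf {
    /// Models the spawner step: pick the plain per-task leaf before serialising.
    fn variant_for_task(&self, task_index: usize) -> Option<Arc<LeafScan>> {
        self.variants.get(task_index).cloned()
    }
}
```

In this model the worker only ever sees the `LeafScan` returned by `variant_for_task`, never the wrapper, which mirrors the point that the worker receives a plain DataSourceExec.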

What changed

  • PartitionIsolatorExec removed entirely.
  • DistributedLeafExec introduced: holds original (for planning-time properties and single-task execution) + variants (one per task).
  • FileScanConfigTaskEstimator::scale_up_leaf_node now returns a DistributedLeafExec wrapping the pre-split variants instead of a PartitionIsolatorExec.
  • Task spawner replaces DistributedLeafExec with the per-task variant before serialisation (same location that already handled ChildrenIsolatorUnionExec).
  • Stage metrics rewriter updated to traverse ChildrenIsolatorUnionExec correctly (per-task node offsets) and to make DistributedLeafExec transparent for metrics lookups.
  • All snapshot tests and integration tests updated.

Performance

No measurable impact on TPC-H and ClickBench, but some good numbers on TPC-DS:

TOTAL: prev=264135.349226 ms, new=231281.42632199993 ms, diff=1.14 faster ✔
      q1: prev= 677 ms, new= 489 ms, diff=1.38 faster ✅
      q2: prev= 455 ms, new= 433 ms, diff=1.05 faster ✔
      q3: prev= 267 ms, new= 287 ms, diff=1.07 slower ✖
      q4: prev=1418 ms, new=1360 ms, diff=1.04 faster ✔
      q5: prev= 658 ms, new= 481 ms, diff=1.37 faster ✅
      q6: prev=1013 ms, new= 751 ms, diff=1.35 faster ✅
      q7: prev= 394 ms, new= 343 ms, diff=1.15 faster ✔
      q8: prev= 417 ms, new= 459 ms, diff=1.10 slower ✖
      q9: prev= 369 ms, new= 370 ms, diff=1.00 slower ✖
     q10: prev=1028 ms, new= 785 ms, diff=1.31 faster ✅
     q11: prev=1142 ms, new=1125 ms, diff=1.02 faster ✔
     q12: prev= 248 ms, new= 216 ms, diff=1.15 faster ✔
     q13: prev= 739 ms, new= 612 ms, diff=1.21 faster ✅
     q14: prev= 990 ms, new= 813 ms, diff=1.22 faster ✅
     q15: prev= 262 ms, new= 159 ms, diff=1.65 faster ✅
     q16: prev= 741 ms, new= 467 ms, diff=1.59 faster ✅
     q17: prev= 412 ms, new= 280 ms, diff=1.47 faster ✅
     q18: prev= 354 ms, new= 317 ms, diff=1.12 faster ✔
     q19: prev= 421 ms, new= 282 ms, diff=1.49 faster ✅
     q20: prev= 366 ms, new= 167 ms, diff=2.19 faster ✅
     q21: prev= 560 ms, new= 339 ms, diff=1.65 faster ✅
     q22: prev= 685 ms, new= 556 ms, diff=1.23 faster ✅
     q23: prev= 823 ms, new= 634 ms, diff=1.30 faster ✅
     q24: prev= 759 ms, new= 518 ms, diff=1.47 faster ✅
     q25: prev= 304 ms, new= 221 ms, diff=1.38 faster ✅
     q26: prev= 201 ms, new= 184 ms, diff=1.09 faster ✔
     q27: prev= 378 ms, new= 481 ms, diff=1.27 slower ❌
     q28: prev= 288 ms, new= 198 ms, diff=1.45 faster ✅
     q29: prev= 347 ms, new= 314 ms, diff=1.11 faster ✔
     q31: prev= 381 ms, new= 266 ms, diff=1.43 faster ✅
     q32: prev= 304 ms, new= 225 ms, diff=1.35 faster ✅
     q33: prev= 370 ms, new= 319 ms, diff=1.16 faster ✔
     q34: prev= 433 ms, new= 291 ms, diff=1.49 faster ✅
     q35: prev= 814 ms, new= 709 ms, diff=1.15 faster ✔
     q36: prev=1104 ms, new= 507 ms, diff=2.18 faster ✅
     q37: prev= 505 ms, new= 437 ms, diff=1.16 faster ✔
     q38: prev= 342 ms, new= 386 ms, diff=1.13 slower ✖
     q39: prev= 367 ms, new= 417 ms, diff=1.14 slower ✖
     q40: prev= 448 ms, new= 452 ms, diff=1.01 slower ✖
     q41: prev= 110 ms, new= 130 ms, diff=1.18 slower ✖
     q42: prev= 148 ms, new= 183 ms, diff=1.24 slower ❌
     q43: prev= 208 ms, new= 189 ms, diff=1.10 faster ✔
     q44: prev= 234 ms, new= 271 ms, diff=1.16 slower ✖
     q45: prev= 332 ms, new= 229 ms, diff=1.45 faster ✅
     q46: prev= 656 ms, new= 486 ms, diff=1.35 faster ✅
     q47: prev= 491 ms, new= 413 ms, diff=1.19 faster ✔
     q48: prev= 570 ms, new= 480 ms, diff=1.19 faster ✔
     q49: prev= 393 ms, new= 344 ms, diff=1.14 faster ✔
     q50: prev= 538 ms, new= 405 ms, diff=1.33 faster ✅
     q51: prev= 265 ms, new= 246 ms, diff=1.08 faster ✔
     q52: prev= 139 ms, new= 178 ms, diff=1.28 slower ❌
     q53: prev= 354 ms, new= 239 ms, diff=1.48 faster ✅
     q54: prev= 497 ms, new= 408 ms, diff=1.22 faster ✅
     q55: prev= 221 ms, new= 155 ms, diff=1.43 faster ✅
     q56: prev= 438 ms, new= 359 ms, diff=1.22 faster ✅
     q57: prev= 422 ms, new= 363 ms, diff=1.16 faster ✔
     q58: prev= 340 ms, new= 389 ms, diff=1.14 slower ✖
     q59: prev= 392 ms, new= 392 ms, diff=1.00 slower ✖
     q60: prev= 446 ms, new= 350 ms, diff=1.27 faster ✅
     q61: prev=1610 ms, new= 958 ms, diff=1.68 faster ✅
     q62: prev= 713 ms, new= 722 ms, diff=1.01 slower ✖
     q63: prev= 243 ms, new= 213 ms, diff=1.14 faster ✔
     q64: prev=1411 ms, new=1253 ms, diff=1.13 faster ✔
     q65: prev= 291 ms, new= 315 ms, diff=1.08 slower ✖
     q66: prev= 796 ms, new= 716 ms, diff=1.11 faster ✔
     q67: prev= 459 ms, new= 511 ms, diff=1.11 slower ✖
     q68: prev= 481 ms, new= 441 ms, diff=1.09 faster ✔
     q69: prev= 746 ms, new= 558 ms, diff=1.34 faster ✅
     q70: prev= 471 ms, new= 497 ms, diff=1.06 slower ✖
     q71: prev= 472 ms, new= 307 ms, diff=1.54 faster ✅
     q72: prev=5412 ms, new=5622 ms, diff=1.04 slower ✖
     q73: prev= 239 ms, new= 319 ms, diff=1.33 slower ❌
     q74: prev= 660 ms, new= 687 ms, diff=1.04 slower ✖
     q75: prev= 599 ms, new= 482 ms, diff=1.24 faster ✅
     q76: prev=1002 ms, new= 341 ms, diff=2.94 faster ✅
     q77: prev= 442 ms, new= 366 ms, diff=1.21 faster ✅
     q78: prev= 310 ms, new= 361 ms, diff=1.16 slower ✖
     q79: prev= 325 ms, new= 292 ms, diff=1.11 faster ✔
     q80: prev= 438 ms, new= 420 ms, diff=1.04 faster ✔
     q81: prev= 315 ms, new= 306 ms, diff=1.03 faster ✔
     q82: prev= 346 ms, new= 340 ms, diff=1.02 faster ✔
     q83: prev= 332 ms, new= 352 ms, diff=1.06 slower ✖
     q84: prev= 292 ms, new= 277 ms, diff=1.05 faster ✔
     q85: prev= 531 ms, new= 547 ms, diff=1.03 slower ✖
     q86: prev= 214 ms, new= 156 ms, diff=1.37 faster ✅
     q87: prev= 267 ms, new= 368 ms, diff=1.38 slower ❌
     q88: prev= 468 ms, new= 504 ms, diff=1.08 slower ✖
     q89: prev= 228 ms, new= 279 ms, diff=1.22 slower ❌
     q90: prev= 212 ms, new= 268 ms, diff=1.26 slower ❌
     q91: prev= 535 ms, new= 498 ms, diff=1.07 faster ✔
     q92: prev= 254 ms, new= 291 ms, diff=1.15 slower ✖
     q93: prev= 210 ms, new= 270 ms, diff=1.29 slower ❌
     q94: prev= 407 ms, new= 355 ms, diff=1.15 faster ✔
     q95: prev= 412 ms, new= 364 ms, diff=1.13 faster ✔
     q96: prev= 266 ms, new= 330 ms, diff=1.24 slower ❌
     q97: prev= 208 ms, new= 243 ms, diff=1.17 slower ✖
     q98: prev= 213 ms, new= 213 ms, diff=1.00 slower ✖
     q99: prev= 964 ms, new=1008 ms, diff=1.05 slower ✖
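For reading the table, the `diff` column appears to be the ratio of the larger timing to the smaller one, labelled by direction. The sketch below shows that arithmetic; `speedup` is a hypothetical helper, not the benchmark tool's code, and treating equal timings as "slower" is an assumption inferred from rows such as q59.

```rust
/// Returns `(ratio, is_faster)` for a prev/new timing pair, with `ratio >= 1.0`.
/// Equal timings are reported as not faster (an assumption based on the table).
fn speedup(prev_ms: f64, new_ms: f64) -> (f64, bool) {
    if new_ms < prev_ms {
        (prev_ms / new_ms, true)
    } else {
        (new_ms / prev_ms, false)
    }
}
```

For q1, `speedup(677.0, 489.0)` gives a ratio of about 1.38 in the faster direction, and for q3, `speedup(267.0, 287.0)` gives about 1.07 in the slower direction, matching the rows above.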

@gabotechs gabotechs marked this pull request as ready for review May 26, 2026 11:14
@gabotechs gabotechs force-pushed the gabrielmusat/remove-partition-isolator branch 2 times, most recently from 03f6905 to 0357def May 26, 2026 11:28
Comment on lines -319 to -323
/// Test that verifies PartitionIsolatorExec nodes are preserved during metrics collection.
/// This tests the corner case where PartitionIsolatorExec nodes (which have no metrics)
/// must still be included in the metrics collection to maintain correct node-to-metric mapping.
#[tokio::test]
async fn test_metrics_collection_with_partition_isolator() {
Collaborator Author

This test becomes irrelevant: PartitionIsolatorExec no longer exists.

Comment on lines -216 to +226
let mut node_idx = 0;
// Phase 1 — accumulate per-task metrics into a map keyed by node identity.
//
// For each task, the plan is traversed with `apply_with_ctx`, which visits nodes in pre-order
// traversal, ignoring branches that do not belong to the recursed DistributedTaskContext
// (e.g., because of the presence of ChildrenIsolatorUnionExec).
//
// The raw allocation address of each `Arc<dyn ExecutionPlan>` as the node key.
// The planning plan is not modified between traversals, so these addresses are stable.
let mut node_metrics_map: HashMap<usize, MetricsSet> = HashMap::new();
Collaborator Author

This needed to change to account for the fact that different tasks will have different plans. Before, it was a bit easier because all the plans were the same across tasks inside the same stage, so recursing once was enough.

Now, however, depending on the DistributedTaskContext, the plan that gets executed in the different tasks might be completely different, and therefore we need to recurse once per task index.

Collaborator

Ack. This looks ok.

Collaborator

I see you've updated the collection side in src/worker/impl_execute_task.rs

Collaborator Author

Yeah, the collection side and this code need to follow the same recursion path through the plan, as the collected metrics assume a match with the pre-order indexes gathered while recursing.

Pretty much the same as before, but now per task rather than globally.
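The per-task recursion discussed in this thread can be sketched with a toy plan tree. This is a simplified model, not the code in this PR: `Node`, `visit` and `accumulate` are hypothetical names, `only_task` stands in for the branch filtering that ChildrenIsolatorUnionExec causes, and each metric is reduced to a single `u64`.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical plan node. `only_task` marks a branch that belongs to a single
/// task; `None` means the node is visited by every task.
struct Node {
    only_task: Option<usize>,
    children: Vec<Arc<Node>>,
}

/// Pre-order walk for one task, skipping branches owned by other tasks.
fn visit<'a>(node: &'a Arc<Node>, task: usize, out: &mut Vec<&'a Arc<Node>>) {
    if node.only_task.is_some_and(|t| t != task) {
        return;
    }
    out.push(node);
    for child in &node.children {
        visit(child, task, out);
    }
}

/// Accumulates per-task metrics (one value per visited node, in pre-order)
/// into a map keyed by the node's allocation address.
fn accumulate(root: &Arc<Node>, per_task_metrics: &[Vec<u64>]) -> HashMap<usize, u64> {
    let mut map: HashMap<usize, u64> = HashMap::new();
    for (task, metrics) in per_task_metrics.iter().enumerate() {
        let mut nodes = Vec::new();
        visit(root, task, &mut nodes);
        // The i-th metric of this task belongs to the i-th node of its own traversal.
        for (node, value) in nodes.iter().zip(metrics) {
            let key = Arc::as_ptr(node) as usize;
            *map.entry(key).or_insert(0) += value;
        }
    }
    map
}
```

Because the tree is not modified between traversals, the addresses stay stable, so a node shared by several tasks accumulates all of their values while a task-specific branch only receives its own.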

@gabotechs gabotechs force-pushed the gabrielmusat/factor-out-distributed-recursion branch 2 times, most recently from e777f40 to 071dcc0 May 26, 2026 11:40
@gabotechs gabotechs force-pushed the gabrielmusat/remove-partition-isolator branch from 0357def to 21b7689 May 26, 2026 11:40
@gabotechs gabotechs force-pushed the gabrielmusat/factor-out-distributed-recursion branch from 071dcc0 to b01c6fc May 26, 2026 13:20
Base automatically changed from gabrielmusat/factor-out-distributed-recursion to gabrielmusat/stage-metrics May 26, 2026 13:21
@gabotechs gabotechs force-pushed the gabrielmusat/remove-partition-isolator branch 4 times, most recently from 9196264 to 8eb191c May 26, 2026 13:47
@gabotechs gabotechs force-pushed the gabrielmusat/stage-metrics branch from aedad16 to af69055 May 26, 2026 13:53
@gabotechs gabotechs changed the base branch from gabrielmusat/stage-metrics to gabrielmusat/factor-out-distributed-recursion-fixed May 26, 2026 13:57
@gabotechs gabotechs closed this May 26, 2026
@gabotechs gabotechs reopened this May 26, 2026
@gabotechs gabotechs force-pushed the gabrielmusat/remove-partition-isolator branch from 8eb191c to 249311e May 26, 2026 14:28
@gabotechs gabotechs force-pushed the gabrielmusat/remove-partition-isolator branch from 249311e to 613e8db May 26, 2026 14:53
@gabotechs gabotechs force-pushed the gabrielmusat/factor-out-distributed-recursion-fixed branch from 12a761b to c5e321e May 26, 2026 18:03
@gabotechs gabotechs force-pushed the gabrielmusat/remove-partition-isolator branch from 613e8db to 7175232 May 26, 2026 18:04
@jayshrivastava jayshrivastava left a comment

Nice change. This is so much more natural than PartitionIsolatorExec. Perf improvements look awesome as well!

Closes #449

Let's close this too #395 :)

Collaborator

Glad to see this change!

let mut file_groups = VecDeque::with_capacity(file_scan.file_groups.len() * task_count);
for file_group in file_scan_template.file_groups.drain(..) {
    file_groups.extend(file_group.split_files(task_count));
}
Collaborator

Off topic but I always felt that this should flatten out the files and generate new file groups from those flat files.

Say you have 3 tasks and 2 file groups:
group1: 1 file
group2: 12 files

This code will result in 4 file groups with 1, 4, 4, 4 files respectively. Ideally you want 6 file groups with 2, 2, 2, 2, 2, 3.

Collaborator

Ah I see you push the empty file group FileGroup::new(vec![]) to avoid this problem.

Collaborator

If it's easy to do, we could avoid putting empty file groups by distributing the files better.

Collaborator Author

Yeah, I think this can be improved. There's some ongoing work for that in #450.

For this PR, leaving it as it was is probably better: that way we measure the performance impact comparing apples to apples. It is definitely worth improving, though.
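The flatten-and-redistribute idea raised in this thread could look roughly like the sketch below. It is not the code from this PR or from #450: `split_evenly` and `flatten_and_split` are hypothetical helpers, files are modelled as generic values, and the sketch ignores any ordering the original file groups may carry.

```rust
/// Splits `files` into `n` groups whose sizes differ by at most one.
/// The larger groups come last. Returns no groups when `n` is zero.
fn split_evenly<T: Clone>(files: &[T], n: usize) -> Vec<Vec<T>> {
    if n == 0 {
        return Vec::new();
    }
    let (base, extra) = (files.len() / n, files.len() % n);
    let mut out = Vec::with_capacity(n);
    let mut start = 0;
    for i in 0..n {
        // The last `extra` groups take one additional file.
        let len = base + usize::from(i >= n - extra);
        out.push(files[start..start + len].to_vec());
        start += len;
    }
    out
}

/// Flattens all groups and redistributes the files into
/// `groups.len() * task_count` balanced groups.
fn flatten_and_split<T: Clone>(groups: &[Vec<T>], task_count: usize) -> Vec<Vec<T>> {
    let flat: Vec<T> = groups.iter().flatten().cloned().collect();
    split_evenly(&flat, groups.len() * task_count)
}
```

For the example in the thread (3 tasks, one group with 1 file and one with 12 files), this yields 6 groups of sizes 2, 2, 2, 2, 2 and 3, with no empty groups.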

Comment thread tests/tpcds_plans_test.rs
Collaborator

nit: the diff is too big for GitHub, but in custom_routing_union_variant, having c0 ordered below c1 feels unnatural to me

│     DistributedUnionExec: t0:[c0(0/4)] t1:[c0(1/4)] t2:[c0(2/4)] t3:[c0(3/4)] t4:[c1]
    │       DistributedLeafExec: URLEmitterExec: tasks=8 partitions=1 tag=left
    │       DistributedLeafExec: URLEmitterExec: tasks=2 partitions=4 tag=right

Collaborator Author

🤔 I'm not sure what you mean, isn't c0 on the side within the union rather than above or below?

.iter()
.enumerate()
.map(
|(child_i, plan)| match children_to_keep.contains(&child_i) {
Collaborator

Probably worth using a map to look up the child faster? The union may have many children.

Collaborator Author

I wouldn't expect there to be too many children, so the overhead of hashing + indexing might actually outweigh the benefits.

Let's do it though: from a semantic standpoint it is cleaner.
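A set-based lookup along the lines suggested here might look like the following sketch. `isolate` is a hypothetical function, and mapping dropped children to `None` is an assumption made for illustration; the snippet under review does not show what the real code substitutes for them.

```rust
use std::collections::HashSet;

/// Keeps the children whose index is in `children_to_keep` and maps the rest to `None`.
/// `HashSet::contains` is O(1) on average, versus a linear scan for a slice or `Vec`.
fn isolate<T: Clone>(children: &[T], children_to_keep: &HashSet<usize>) -> Vec<Option<T>> {
    children
        .iter()
        .enumerate()
        .map(|(child_i, plan)| match children_to_keep.contains(&child_i) {
            true => Some(plan.clone()),
            false => None,
        })
        .collect()
}
```

For a handful of children the difference is unlikely to be measurable, which matches the trade-off noted in the reply; the set mainly makes the intent of a membership test explicit.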


2 participants