Factor out distributed recursion#469
Conversation
bf7cadc to
ad92147
Compare
e28d614 to
12a761b
Compare
… be the same (#427) This is one PR from the following stack of PRs: - #427 <- you are here - #469 - #461 - #462 - #463 - #464 - #432 This is a preparatory step towards: - #377 This is an optimization that allows workers to communicate in-memory avoiding network calls and serialization in case it needs to communicate to itself. This optimization shows good improvements if using a small number of workers, and fades away as more workers are used. Even if this shows improvements today, it will become very meaningful in adaptative query execution: there will be times when two consecutive stages are assigned 1 task each, and as that was done dynamically, we cannot eagerly do the optimization that collapses those two stages into one, so instead, we assigned both stages to the same worker, and the optimization in this PR kicks in, executing the plan fully locally.
12a761b to
c5e321e
Compare
| }, | ||
| ) | ||
| } else if plan.is_network_boundary() { | ||
| Ok(TreeNodeRecursion::Continue) |
There was a problem hiding this comment.
I'd expect this to be TreeNodeRecursion::Jump because the trait method says "This function does not recurse into the input of network boundaries.". Network Boundaries have no children, so this doesn't really matter I guess.... Unless you call this during planning, in which case the NB has a LocalStage, meaning this will recurse into the children.
There was a problem hiding this comment.
We should have some better comments on LocalStage and RemoteStage. They are basically PlanningStage and ExecutionStage at this point.
There was a problem hiding this comment.
🤔 PlanningStage and ExecutionStage are actually good names...
About the TreeNodeRecursion::Jump: this function never recurses into network boundaries, regardless of whether they are LocalStage or RemoteStage. Now that I see it it's a bit confusing, but even if we are returning here TreeNodeRecursion::Continue, we are not calling apply_until_stop, so we don't recurse.
This is being called within TreeNodeRecursion::visit_children(), and TreeNodeRecursion::Continue here does not mean "keep recursing down", like in the .transform_down() API, it means instead "keep visiting children", and we do want to move to the next children without recursing into NBs.
It's a bit unfortunate that upstream API decided to reuse the same TreeNodeRecursion for two different things...
| { | ||
| // None = skip this subtree (irrelevant CIU child for our task index). | ||
| let stack = RefCell::new(vec![Some(dt_ctx)]); | ||
| self.transform_down_up( |
There was a problem hiding this comment.
Why use transform_down_up and pass |node| Ok(Transformed::no(node)), to the up function? Isn't transform_down simpler?
There was a problem hiding this comment.
Yeap! the only reason why this is transform_down_up is because that's how I implemented every method on the first place, but I'm realizing the RefCell + transform_down_up trick is only needed for implementing transform_up variants.
| }; | ||
| let transformed = f(node, dt_ctx.clone())?; | ||
| if transformed.tnr != TreeNodeRecursion::Continue | ||
| || transformed.data.is_network_boundary() |
There was a problem hiding this comment.
Ok so we don't traverse into network boundaries. Should all the trait methods say " /// This function does not recurse into the input of network boundaries."?
When I started reading this PR, it seemed like you only wanted one method to skip traversing into them because only one had that comment.
There was a problem hiding this comment.
In the *_with_dt_ctx methods we cannot traverse into network boundaries because the DistributedTaskContext can no longer be propagated there.
However, in the *_with_task_count methods we do can traverse into network boundaries, and we do. I just expanded a bit the doc comments to reflect this.
| ) | ||
| } | ||
|
|
||
| fn transform_up_with_task_count<F: FnMut(Self, usize) -> Result<Transformed<Self>>>( |
There was a problem hiding this comment.
For this and transform_down_with_task_count, whether or not we recurse into the children will depend if the NB has a LocalStage or RemoteStage. In other words, behavior will depend on when you use this recursion helper (execution or planning).
There was a problem hiding this comment.
Yeap, same as with upstream's .transform_down and .transform_up. This is expected though: if there is a RemoteStage, the input plan was already sent over the wire to another worker, so there's nothing to keep recursing on.
| let inner0 = ciu(vec![leaf(), leaf()], vec![1, 1], 2).unwrap(); | ||
| let inner1 = ciu(vec![leaf(), leaf()], vec![1, 1], 2).unwrap(); | ||
| let plan = ciu(vec![inner0, inner1], vec![2, 2], 4).unwrap(); | ||
| assert_snapshot!(trace_apply(&plan, ctx(0, 4)), @r" |
There was a problem hiding this comment.
nit: you don't differentiate between the two children CIUs
assert_snapshot!(trace_apply(&plan, ctx(0, 4)), @r"
CIU [ctx=0/4]
CIU [ctx=0/2]
Leaf [ctx=0/1]
");
assert_snapshot!(trace_apply(&plan, ctx(1, 4)), @r"
CIU [ctx=1/4]
CIU [ctx=1/2] -> this looks identical to the one below
Leaf [ctx=0/1]
");
assert_snapshot!(trace_apply(&plan, ctx(2, 4)), @r"
CIU [ctx=2/4]
CIU [ctx=0/2]
Leaf [ctx=0/1]
");
assert_snapshot!(trace_apply(&plan, ctx(3, 4)), @r"
CIU [ctx=3/4]
CIU [ctx=1/2] -> this one
Leaf [ctx=0/1]
");
There was a problem hiding this comment.
🤔 but I think it's not a big deal. Good coverage for CIU's is already provided in children_isolator_union.rs, and here I think the snapshots show the necessary info (the [ctx=X/Y]).
On one iteration I tried having a HashMap from Arc<dyn ExecutionPlan> pointer address to pre-order index so that it also gets rendered here, but I removed it because it seemed like an overkill that did not add a lot of value to the tests.
This is one PR from the following stack of PRs: - #427 - #469 - #467 <- you are here ## Motivation Part of #460 (DataFusion v54 upgrade). DataFusion [#21351](apache/datafusion#21351) introduced **work-stealing** for `FileScanConfig`: instead of each partition scanning a fixed, pre-assigned set of files, idle partitions can steal pending file work from a shared queue at execution time. This means the mapping *partition → files* is no longer determined at planning time. `PartitionIsolatorExec` was built on exactly that assumption: it split the leaf `DataSourceExec` into `task_count × original_partitions` file groups at planning time and selected the right slice for each task via `task_index` at execution. With dynamic file assignment, this selection is no longer valid. ## Solution — `DistributedLeafExec` Replace `PartitionIsolatorExec` with `DistributedLeafExec`, a new wrapper that bundles one **pre-built per-task variant** of the leaf plan for each distributed task. The task spawner swaps in the correct variant before the plan is serialised and sent to a worker — no runtime selection by `task_index` needed. ### Before (`PartitionIsolatorExec`) ``` planning time worker (task 1 of 3) ───────────────────────────── ───────────────────────────── DataSourceExec DataSourceExec file_groups = [f0, f1, f2, file_groups = [f0, f1, f2, f3, f4, f5, f3, f4, f5, f6, f7, f8] f6, f7, f8] ↓ wrapped by ↓ selects slice [f3..f5] PartitionIsolatorExec at execution via task_index tasks=3 partitions=3 ``` `PartitionIsolatorExec` used `task_index` at execution to pick the right slice. With work-stealing this slice is meaningless: partition 0 may end up scanning `f7`, not `f3`. ### After (`DistributedLeafExec`) ``` planning time task spawner (before send) worker (task 1 of 3) ───────────────────────────── ────────────────────────── ───────────────────── DistributedLeafExec pick variants[1] DataSourceExec original: DataSourceExec ─────────────────────────────► file_groups = file_groups = [f0..f8] [f3, f4, f5] variants: [0]: DataSourceExec file_groups=[f0,f1,f2] [1]: DataSourceExec file_groups=[f3,f4,f5] [2]: DataSourceExec file_groups=[f6,f7,f8] ``` Each per-task `DataSourceExec` has its own, isolated file groups decided at planning time. The task spawner replaces `DistributedLeafExec` with the right variant before serialising, so the worker receives a plain `DataSourceExec` — compatible with DataFusion's work-stealing scheduler. ## What changed - **`PartitionIsolatorExec`** removed entirely. - **`DistributedLeafExec`** introduced: holds `original` (for planning-time properties and single-task execution) + `variants` (one per task). - **`FileScanConfigTaskEstimator::scale_up_leaf_node`** now returns a `DistributedLeafExec` wrapping the pre-split variants instead of a `PartitionIsolatorExec`. - Task spawner replaces `DistributedLeafExec` with the per-task variant before serialisation (same location that already handled `ChildrenIsolatorUnionExec`). - Stage metrics rewriter updated to traverse `ChildrenIsolatorUnionExec` correctly (per-task node offsets) and to make `DistributedLeafExec` transparent for metrics lookups. - All snapshot tests and integration tests updated. ## Performance 0 Impact on TPCH and Clickbench, but getting some good numbers on TPC-DS: <details><summary>TOTAL: prev=264135.349226 ms, new=231281.42632199993 ms, diff=1.14 faster ✔</summary> ``` q1: prev= 677 ms, new= 489 ms, diff=1.38 faster ✅ q2: prev= 455 ms, new= 433 ms, diff=1.05 faster ✔ q3: prev= 267 ms, new= 287 ms, diff=1.07 slower ✖ q4: prev=1418 ms, new=1360 ms, diff=1.04 faster ✔ q5: prev= 658 ms, new= 481 ms, diff=1.37 faster ✅ q6: prev=1013 ms, new= 751 ms, diff=1.35 faster ✅ q7: prev= 394 ms, new= 343 ms, diff=1.15 faster ✔ q8: prev= 417 ms, new= 459 ms, diff=1.10 slower ✖ q9: prev= 369 ms, new= 370 ms, diff=1.00 slower ✖ q10: prev=1028 ms, new= 785 ms, diff=1.31 faster ✅ q11: prev=1142 ms, new=1125 ms, diff=1.02 faster ✔ q12: prev= 248 ms, new= 216 ms, diff=1.15 faster ✔ q13: prev= 739 ms, new= 612 ms, diff=1.21 faster ✅ q14: prev= 990 ms, new= 813 ms, diff=1.22 faster ✅ q15: prev= 262 ms, new= 159 ms, diff=1.65 faster ✅ q16: prev= 741 ms, new= 467 ms, diff=1.59 faster ✅ q17: prev= 412 ms, new= 280 ms, diff=1.47 faster ✅ q18: prev= 354 ms, new= 317 ms, diff=1.12 faster ✔ q19: prev= 421 ms, new= 282 ms, diff=1.49 faster ✅ q20: prev= 366 ms, new= 167 ms, diff=2.19 faster ✅ q21: prev= 560 ms, new= 339 ms, diff=1.65 faster ✅ q22: prev= 685 ms, new= 556 ms, diff=1.23 faster ✅ q23: prev= 823 ms, new= 634 ms, diff=1.30 faster ✅ q24: prev= 759 ms, new= 518 ms, diff=1.47 faster ✅ q25: prev= 304 ms, new= 221 ms, diff=1.38 faster ✅ q26: prev= 201 ms, new= 184 ms, diff=1.09 faster ✔ q27: prev= 378 ms, new= 481 ms, diff=1.27 slower ❌ q28: prev= 288 ms, new= 198 ms, diff=1.45 faster ✅ q29: prev= 347 ms, new= 314 ms, diff=1.11 faster ✔ q31: prev= 381 ms, new= 266 ms, diff=1.43 faster ✅ q32: prev= 304 ms, new= 225 ms, diff=1.35 faster ✅ q33: prev= 370 ms, new= 319 ms, diff=1.16 faster ✔ q34: prev= 433 ms, new= 291 ms, diff=1.49 faster ✅ q35: prev= 814 ms, new= 709 ms, diff=1.15 faster ✔ q36: prev=1104 ms, new= 507 ms, diff=2.18 faster ✅ q37: prev= 505 ms, new= 437 ms, diff=1.16 faster ✔ q38: prev= 342 ms, new= 386 ms, diff=1.13 slower ✖ q39: prev= 367 ms, new= 417 ms, diff=1.14 slower ✖ q40: prev= 448 ms, new= 452 ms, diff=1.01 slower ✖ q41: prev= 110 ms, new= 130 ms, diff=1.18 slower ✖ q42: prev= 148 ms, new= 183 ms, diff=1.24 slower ❌ q43: prev= 208 ms, new= 189 ms, diff=1.10 faster ✔ q44: prev= 234 ms, new= 271 ms, diff=1.16 slower ✖ q45: prev= 332 ms, new= 229 ms, diff=1.45 faster ✅ q46: prev= 656 ms, new= 486 ms, diff=1.35 faster ✅ q47: prev= 491 ms, new= 413 ms, diff=1.19 faster ✔ q48: prev= 570 ms, new= 480 ms, diff=1.19 faster ✔ q49: prev= 393 ms, new= 344 ms, diff=1.14 faster ✔ q50: prev= 538 ms, new= 405 ms, diff=1.33 faster ✅ q51: prev= 265 ms, new= 246 ms, diff=1.08 faster ✔ q52: prev= 139 ms, new= 178 ms, diff=1.28 slower ❌ q53: prev= 354 ms, new= 239 ms, diff=1.48 faster ✅ q54: prev= 497 ms, new= 408 ms, diff=1.22 faster ✅ q55: prev= 221 ms, new= 155 ms, diff=1.43 faster ✅ q56: prev= 438 ms, new= 359 ms, diff=1.22 faster ✅ q57: prev= 422 ms, new= 363 ms, diff=1.16 faster ✔ q58: prev= 340 ms, new= 389 ms, diff=1.14 slower ✖ q59: prev= 392 ms, new= 392 ms, diff=1.00 slower ✖ q60: prev= 446 ms, new= 350 ms, diff=1.27 faster ✅ q61: prev=1610 ms, new= 958 ms, diff=1.68 faster ✅ q62: prev= 713 ms, new= 722 ms, diff=1.01 slower ✖ q63: prev= 243 ms, new= 213 ms, diff=1.14 faster ✔ q64: prev=1411 ms, new=1253 ms, diff=1.13 faster ✔ q65: prev= 291 ms, new= 315 ms, diff=1.08 slower ✖ q66: prev= 796 ms, new= 716 ms, diff=1.11 faster ✔ q67: prev= 459 ms, new= 511 ms, diff=1.11 slower ✖ q68: prev= 481 ms, new= 441 ms, diff=1.09 faster ✔ q69: prev= 746 ms, new= 558 ms, diff=1.34 faster ✅ q70: prev= 471 ms, new= 497 ms, diff=1.06 slower ✖ q71: prev= 472 ms, new= 307 ms, diff=1.54 faster ✅ q72: prev=5412 ms, new=5622 ms, diff=1.04 slower ✖ q73: prev= 239 ms, new= 319 ms, diff=1.33 slower ❌ q74: prev= 660 ms, new= 687 ms, diff=1.04 slower ✖ q75: prev= 599 ms, new= 482 ms, diff=1.24 faster ✅ q76: prev=1002 ms, new= 341 ms, diff=2.94 faster ✅ q77: prev= 442 ms, new= 366 ms, diff=1.21 faster ✅ q78: prev= 310 ms, new= 361 ms, diff=1.16 slower ✖ q79: prev= 325 ms, new= 292 ms, diff=1.11 faster ✔ q80: prev= 438 ms, new= 420 ms, diff=1.04 faster ✔ q81: prev= 315 ms, new= 306 ms, diff=1.03 faster ✔ q82: prev= 346 ms, new= 340 ms, diff=1.02 faster ✔ q83: prev= 332 ms, new= 352 ms, diff=1.06 slower ✖ q84: prev= 292 ms, new= 277 ms, diff=1.05 faster ✔ q85: prev= 531 ms, new= 547 ms, diff=1.03 slower ✖ q86: prev= 214 ms, new= 156 ms, diff=1.37 faster ✅ q87: prev= 267 ms, new= 368 ms, diff=1.38 slower ❌ q88: prev= 468 ms, new= 504 ms, diff=1.08 slower ✖ q89: prev= 228 ms, new= 279 ms, diff=1.22 slower ❌ q90: prev= 212 ms, new= 268 ms, diff=1.26 slower ❌ q91: prev= 535 ms, new= 498 ms, diff=1.07 faster ✔ q92: prev= 254 ms, new= 291 ms, diff=1.15 slower ✖ q93: prev= 210 ms, new= 270 ms, diff=1.29 slower ❌ q94: prev= 407 ms, new= 355 ms, diff=1.15 faster ✔ q95: prev= 412 ms, new= 364 ms, diff=1.13 faster ✔ q96: prev= 266 ms, new= 330 ms, diff=1.24 slower ❌ q97: prev= 208 ms, new= 243 ms, diff=1.17 slower ✖ q98: prev= 213 ms, new= 213 ms, diff=1.00 slower ✖ q99: prev= 964 ms, new=1008 ms, diff=1.05 slower ✖ ``` </details>
This is one PR from the following stack of PRs:
This PR factors out common recursion patterns used in the distributed planner into reusable helpers, reducing complexity in critical parts of the codebase. By centralizing recursion logic, we improve code maintainability and reduce the risk of subtle bugs in recursive tree traversals.