Skip to content

Factor out distributed recursion#469

Merged
gabotechs merged 2 commits into
mainfrom
gabrielmusat/factor-out-distributed-recursion-fixed
May 27, 2026
Merged

Factor out distributed recursion#469
gabotechs merged 2 commits into
mainfrom
gabrielmusat/factor-out-distributed-recursion-fixed

Conversation

@gabotechs
Copy link
Copy Markdown
Collaborator

@gabotechs gabotechs commented May 26, 2026

This is one PR from the following stack of PRs:

This PR factors out common recursion patterns used in the distributed planner into reusable helpers, reducing complexity in critical parts of the codebase. By centralizing recursion logic, we improve code maintainability and reduce the risk of subtle bugs in recursive tree traversals.

@gabotechs gabotechs force-pushed the gabrielmusat/local-worker-connections branch from bf7cadc to ad92147 Compare May 26, 2026 13:46
@gabotechs gabotechs force-pushed the gabrielmusat/factor-out-distributed-recursion-fixed branch from e28d614 to 12a761b Compare May 26, 2026 13:53
gabotechs added a commit that referenced this pull request May 26, 2026
… be the same (#427)

This is one PR from the following stack of PRs:
- #427
<- you are here
- #469
- #461
- #462
- #463
- #464
- #432

This is a preparatory step towards:
-
#377

This is an optimization that allows workers to communicate in-memory
avoiding network calls and serialization in case it needs to communicate
to itself.

This optimization shows good improvements if using a small number of
workers, and fades away as more workers are used.

Even if this shows improvements today, it will become very meaningful in
adaptative query execution: there will be times when two consecutive
stages are assigned 1 task each, and as that was done dynamically, we
cannot eagerly do the optimization that collapses those two stages into
one, so instead, we assigned both stages to the same worker, and the
optimization in this PR kicks in, executing the plan fully locally.
Base automatically changed from gabrielmusat/local-worker-connections to main May 26, 2026 18:02
@gabotechs gabotechs force-pushed the gabrielmusat/factor-out-distributed-recursion-fixed branch from 12a761b to c5e321e Compare May 26, 2026 18:03
@gabotechs gabotechs requested a review from jayshrivastava May 26, 2026 20:03
Comment thread src/common/recursion.rs Outdated
Comment thread src/common/recursion.rs
Comment thread src/common/recursion.rs
},
)
} else if plan.is_network_boundary() {
Ok(TreeNodeRecursion::Continue)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd expect this to be TreeNodeRecursion::Jump because the trait method says "This function does not recurse into the input of network boundaries.". Network Boundaries have no children, so this doesn't really matter I guess.... Unless you call this during planning, in which case the NB has a LocalStage, meaning this will recurse into the children.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have some better comments on LocalStage and RemoteStage. They are basically PlanningStage and ExecutionStage at this point.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 PlanningStage and ExecutionStage are actually good names...

About the TreeNodeRecursion::Jump: this function never recurses into network boundaries, regardless of whether they are LocalStage or RemoteStage. Now that I see it it's a bit confusing, but even if we are returning here TreeNodeRecursion::Continue, we are not calling apply_until_stop, so we don't recurse.

This is being called within TreeNodeRecursion::visit_children(), and TreeNodeRecursion::Continue here does not mean "keep recursing down", like in the .transform_down() API, it means instead "keep visiting children", and we do want to move to the next children without recursing into NBs.

It's a bit unfortunate that upstream API decided to reuse the same TreeNodeRecursion for two different things...

Comment thread src/common/recursion.rs Outdated
{
// None = skip this subtree (irrelevant CIU child for our task index).
let stack = RefCell::new(vec![Some(dt_ctx)]);
self.transform_down_up(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use transform_down_up and pass |node| Ok(Transformed::no(node)), to the up function? Isn't transform_down simpler?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap! the only reason why this is transform_down_up is because that's how I implemented every method on the first place, but I'm realizing the RefCell + transform_down_up trick is only needed for implementing transform_up variants.

Comment thread src/common/recursion.rs Outdated
};
let transformed = f(node, dt_ctx.clone())?;
if transformed.tnr != TreeNodeRecursion::Continue
|| transformed.data.is_network_boundary()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok so we don't traverse into network boundaries. Should all the trait methods say " /// This function does not recurse into the input of network boundaries."?

When I started reading this PR, it seemed like you only wanted one method to skip traversing into them because only one had that comment.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the *_with_dt_ctx methods we cannot traverse into network boundaries because the DistributedTaskContext can no longer be propagated there.

However, in the *_with_task_count methods we do can traverse into network boundaries, and we do. I just expanded a bit the doc comments to reflect this.

Comment thread src/common/recursion.rs Outdated
Comment thread src/common/recursion.rs
)
}

fn transform_up_with_task_count<F: FnMut(Self, usize) -> Result<Transformed<Self>>>(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this and transform_down_with_task_count, whether or not we recurse into the children will depend if the NB has a LocalStage or RemoteStage. In other words, behavior will depend on when you use this recursion helper (execution or planning).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap, same as with upstream's .transform_down and .transform_up. This is expected though: if there is a RemoteStage, the input plan was already sent over the wire to another worker, so there's nothing to keep recursing on.

Comment thread src/common/recursion.rs
Comment thread src/common/recursion.rs
Comment thread src/common/recursion.rs
let inner0 = ciu(vec![leaf(), leaf()], vec![1, 1], 2).unwrap();
let inner1 = ciu(vec![leaf(), leaf()], vec![1, 1], 2).unwrap();
let plan = ciu(vec![inner0, inner1], vec![2, 2], 4).unwrap();
assert_snapshot!(trace_apply(&plan, ctx(0, 4)), @r"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you don't differentiate between the two children CIUs

        assert_snapshot!(trace_apply(&plan, ctx(0, 4)), @r"
        CIU [ctx=0/4]
        CIU [ctx=0/2]
        Leaf [ctx=0/1]
        ");
        assert_snapshot!(trace_apply(&plan, ctx(1, 4)), @r"
        CIU [ctx=1/4]
        CIU [ctx=1/2] -> this looks identical to the one below
        Leaf [ctx=0/1]
        ");
        assert_snapshot!(trace_apply(&plan, ctx(2, 4)), @r"
        CIU [ctx=2/4]
        CIU [ctx=0/2]
        Leaf [ctx=0/1]
        ");
        assert_snapshot!(trace_apply(&plan, ctx(3, 4)), @r"
        CIU [ctx=3/4]
        CIU [ctx=1/2] -> this one
        Leaf [ctx=0/1]
        ");

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 but I think it's not a big deal. Good coverage for CIU's is already provided in children_isolator_union.rs, and here I think the snapshots show the necessary info (the [ctx=X/Y]).

On one iteration I tried having a HashMap from Arc<dyn ExecutionPlan> pointer address to pre-order index so that it also gets rendered here, but I removed it because it seemed like an overkill that did not add a lot of value to the tests.

@gabotechs gabotechs merged commit 070bfc9 into main May 27, 2026
17 checks passed
@gabotechs gabotechs deleted the gabrielmusat/factor-out-distributed-recursion-fixed branch May 27, 2026 13:13
gabotechs added a commit that referenced this pull request May 27, 2026
This is one PR from the following stack of PRs:
- #427
- #469
- #467
<- you are here

## Motivation

Part of #460 (DataFusion v54 upgrade).

DataFusion [#21351](apache/datafusion#21351)
introduced **work-stealing** for `FileScanConfig`: instead of each
partition scanning a fixed, pre-assigned set of files, idle partitions
can steal pending file work from a shared queue at execution time. This
means the mapping *partition → files* is no longer determined at
planning time.

`PartitionIsolatorExec` was built on exactly that assumption: it split
the leaf `DataSourceExec` into `task_count × original_partitions` file
groups at planning time and selected the right slice for each task via
`task_index` at execution. With dynamic file assignment, this selection
is no longer valid.

## Solution — `DistributedLeafExec`

Replace `PartitionIsolatorExec` with `DistributedLeafExec`, a new
wrapper that bundles one **pre-built per-task variant** of the leaf plan
for each distributed task. The task spawner swaps in the correct variant
before the plan is serialised and sent to a worker — no runtime
selection by `task_index` needed.

### Before (`PartitionIsolatorExec`)

```
planning time                    worker (task 1 of 3)
─────────────────────────────    ─────────────────────────────
DataSourceExec                   DataSourceExec
  file_groups = [f0, f1, f2,       file_groups = [f0, f1, f2,
                 f3, f4, f5,                        f3, f4, f5,
                 f6, f7, f8]                        f6, f7, f8]
↓ wrapped by                     ↓ selects slice [f3..f5]
PartitionIsolatorExec              at execution via task_index
  tasks=3 partitions=3
```

`PartitionIsolatorExec` used `task_index` at execution to pick the right
slice. With work-stealing this slice is meaningless: partition 0 may end
up scanning `f7`, not `f3`.

### After (`DistributedLeafExec`)

```
planning time                    task spawner (before send)   worker (task 1 of 3)
─────────────────────────────    ──────────────────────────   ─────────────────────
DistributedLeafExec              pick variants[1]             DataSourceExec
  original: DataSourceExec   ─────────────────────────────►    file_groups =
    file_groups = [f0..f8]                                       [f3, f4, f5]
  variants:
    [0]: DataSourceExec
           file_groups=[f0,f1,f2]
    [1]: DataSourceExec
           file_groups=[f3,f4,f5]
    [2]: DataSourceExec
           file_groups=[f6,f7,f8]
```

Each per-task `DataSourceExec` has its own, isolated file groups decided
at planning time. The task spawner replaces `DistributedLeafExec` with
the right variant before serialising, so the worker receives a plain
`DataSourceExec` — compatible with DataFusion's work-stealing scheduler.

## What changed

- **`PartitionIsolatorExec`** removed entirely.
- **`DistributedLeafExec`** introduced: holds `original` (for
planning-time properties and single-task execution) + `variants` (one
per task).
- **`FileScanConfigTaskEstimator::scale_up_leaf_node`** now returns a
`DistributedLeafExec` wrapping the pre-split variants instead of a
`PartitionIsolatorExec`.
- Task spawner replaces `DistributedLeafExec` with the per-task variant
before serialisation (same location that already handled
`ChildrenIsolatorUnionExec`).
- Stage metrics rewriter updated to traverse `ChildrenIsolatorUnionExec`
correctly (per-task node offsets) and to make `DistributedLeafExec`
transparent for metrics lookups.
- All snapshot tests and integration tests updated.

## Performance

0 Impact on TPCH and Clickbench, but getting some good numbers on
TPC-DS:

<details><summary>TOTAL: prev=264135.349226 ms, new=231281.42632199993
ms, diff=1.14 faster ✔</summary>

```
      q1: prev= 677 ms, new= 489 ms, diff=1.38 faster ✅
      q2: prev= 455 ms, new= 433 ms, diff=1.05 faster ✔
      q3: prev= 267 ms, new= 287 ms, diff=1.07 slower ✖
      q4: prev=1418 ms, new=1360 ms, diff=1.04 faster ✔
      q5: prev= 658 ms, new= 481 ms, diff=1.37 faster ✅
      q6: prev=1013 ms, new= 751 ms, diff=1.35 faster ✅
      q7: prev= 394 ms, new= 343 ms, diff=1.15 faster ✔
      q8: prev= 417 ms, new= 459 ms, diff=1.10 slower ✖
      q9: prev= 369 ms, new= 370 ms, diff=1.00 slower ✖
     q10: prev=1028 ms, new= 785 ms, diff=1.31 faster ✅
     q11: prev=1142 ms, new=1125 ms, diff=1.02 faster ✔
     q12: prev= 248 ms, new= 216 ms, diff=1.15 faster ✔
     q13: prev= 739 ms, new= 612 ms, diff=1.21 faster ✅
     q14: prev= 990 ms, new= 813 ms, diff=1.22 faster ✅
     q15: prev= 262 ms, new= 159 ms, diff=1.65 faster ✅
     q16: prev= 741 ms, new= 467 ms, diff=1.59 faster ✅
     q17: prev= 412 ms, new= 280 ms, diff=1.47 faster ✅
     q18: prev= 354 ms, new= 317 ms, diff=1.12 faster ✔
     q19: prev= 421 ms, new= 282 ms, diff=1.49 faster ✅
     q20: prev= 366 ms, new= 167 ms, diff=2.19 faster ✅
     q21: prev= 560 ms, new= 339 ms, diff=1.65 faster ✅
     q22: prev= 685 ms, new= 556 ms, diff=1.23 faster ✅
     q23: prev= 823 ms, new= 634 ms, diff=1.30 faster ✅
     q24: prev= 759 ms, new= 518 ms, diff=1.47 faster ✅
     q25: prev= 304 ms, new= 221 ms, diff=1.38 faster ✅
     q26: prev= 201 ms, new= 184 ms, diff=1.09 faster ✔
     q27: prev= 378 ms, new= 481 ms, diff=1.27 slower ❌
     q28: prev= 288 ms, new= 198 ms, diff=1.45 faster ✅
     q29: prev= 347 ms, new= 314 ms, diff=1.11 faster ✔
     q31: prev= 381 ms, new= 266 ms, diff=1.43 faster ✅
     q32: prev= 304 ms, new= 225 ms, diff=1.35 faster ✅
     q33: prev= 370 ms, new= 319 ms, diff=1.16 faster ✔
     q34: prev= 433 ms, new= 291 ms, diff=1.49 faster ✅
     q35: prev= 814 ms, new= 709 ms, diff=1.15 faster ✔
     q36: prev=1104 ms, new= 507 ms, diff=2.18 faster ✅
     q37: prev= 505 ms, new= 437 ms, diff=1.16 faster ✔
     q38: prev= 342 ms, new= 386 ms, diff=1.13 slower ✖
     q39: prev= 367 ms, new= 417 ms, diff=1.14 slower ✖
     q40: prev= 448 ms, new= 452 ms, diff=1.01 slower ✖
     q41: prev= 110 ms, new= 130 ms, diff=1.18 slower ✖
     q42: prev= 148 ms, new= 183 ms, diff=1.24 slower ❌
     q43: prev= 208 ms, new= 189 ms, diff=1.10 faster ✔
     q44: prev= 234 ms, new= 271 ms, diff=1.16 slower ✖
     q45: prev= 332 ms, new= 229 ms, diff=1.45 faster ✅
     q46: prev= 656 ms, new= 486 ms, diff=1.35 faster ✅
     q47: prev= 491 ms, new= 413 ms, diff=1.19 faster ✔
     q48: prev= 570 ms, new= 480 ms, diff=1.19 faster ✔
     q49: prev= 393 ms, new= 344 ms, diff=1.14 faster ✔
     q50: prev= 538 ms, new= 405 ms, diff=1.33 faster ✅
     q51: prev= 265 ms, new= 246 ms, diff=1.08 faster ✔
     q52: prev= 139 ms, new= 178 ms, diff=1.28 slower ❌
     q53: prev= 354 ms, new= 239 ms, diff=1.48 faster ✅
     q54: prev= 497 ms, new= 408 ms, diff=1.22 faster ✅
     q55: prev= 221 ms, new= 155 ms, diff=1.43 faster ✅
     q56: prev= 438 ms, new= 359 ms, diff=1.22 faster ✅
     q57: prev= 422 ms, new= 363 ms, diff=1.16 faster ✔
     q58: prev= 340 ms, new= 389 ms, diff=1.14 slower ✖
     q59: prev= 392 ms, new= 392 ms, diff=1.00 slower ✖
     q60: prev= 446 ms, new= 350 ms, diff=1.27 faster ✅
     q61: prev=1610 ms, new= 958 ms, diff=1.68 faster ✅
     q62: prev= 713 ms, new= 722 ms, diff=1.01 slower ✖
     q63: prev= 243 ms, new= 213 ms, diff=1.14 faster ✔
     q64: prev=1411 ms, new=1253 ms, diff=1.13 faster ✔
     q65: prev= 291 ms, new= 315 ms, diff=1.08 slower ✖
     q66: prev= 796 ms, new= 716 ms, diff=1.11 faster ✔
     q67: prev= 459 ms, new= 511 ms, diff=1.11 slower ✖
     q68: prev= 481 ms, new= 441 ms, diff=1.09 faster ✔
     q69: prev= 746 ms, new= 558 ms, diff=1.34 faster ✅
     q70: prev= 471 ms, new= 497 ms, diff=1.06 slower ✖
     q71: prev= 472 ms, new= 307 ms, diff=1.54 faster ✅
     q72: prev=5412 ms, new=5622 ms, diff=1.04 slower ✖
     q73: prev= 239 ms, new= 319 ms, diff=1.33 slower ❌
     q74: prev= 660 ms, new= 687 ms, diff=1.04 slower ✖
     q75: prev= 599 ms, new= 482 ms, diff=1.24 faster ✅
     q76: prev=1002 ms, new= 341 ms, diff=2.94 faster ✅
     q77: prev= 442 ms, new= 366 ms, diff=1.21 faster ✅
     q78: prev= 310 ms, new= 361 ms, diff=1.16 slower ✖
     q79: prev= 325 ms, new= 292 ms, diff=1.11 faster ✔
     q80: prev= 438 ms, new= 420 ms, diff=1.04 faster ✔
     q81: prev= 315 ms, new= 306 ms, diff=1.03 faster ✔
     q82: prev= 346 ms, new= 340 ms, diff=1.02 faster ✔
     q83: prev= 332 ms, new= 352 ms, diff=1.06 slower ✖
     q84: prev= 292 ms, new= 277 ms, diff=1.05 faster ✔
     q85: prev= 531 ms, new= 547 ms, diff=1.03 slower ✖
     q86: prev= 214 ms, new= 156 ms, diff=1.37 faster ✅
     q87: prev= 267 ms, new= 368 ms, diff=1.38 slower ❌
     q88: prev= 468 ms, new= 504 ms, diff=1.08 slower ✖
     q89: prev= 228 ms, new= 279 ms, diff=1.22 slower ❌
     q90: prev= 212 ms, new= 268 ms, diff=1.26 slower ❌
     q91: prev= 535 ms, new= 498 ms, diff=1.07 faster ✔
     q92: prev= 254 ms, new= 291 ms, diff=1.15 slower ✖
     q93: prev= 210 ms, new= 270 ms, diff=1.29 slower ❌
     q94: prev= 407 ms, new= 355 ms, diff=1.15 faster ✔
     q95: prev= 412 ms, new= 364 ms, diff=1.13 faster ✔
     q96: prev= 266 ms, new= 330 ms, diff=1.24 slower ❌
     q97: prev= 208 ms, new= 243 ms, diff=1.17 slower ✖
     q98: prev= 213 ms, new= 213 ms, diff=1.00 slower ✖
     q99: prev= 964 ms, new=1008 ms, diff=1.05 slower ✖
```

</details>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants