Skip to content

Use in-memory comms if both workers involved in an exchange happen to be the same #427

Merged
gabotechs merged 3 commits into
mainfrom
gabrielmusat/local-worker-connections
May 26, 2026
Merged

Use in-memory comms if both workers involved in an exchange happen to be the same #427
gabotechs merged 3 commits into
mainfrom
gabrielmusat/local-worker-connections

Conversation

@gabotechs
Copy link
Copy Markdown
Collaborator

@gabotechs gabotechs commented May 4, 2026

This is one PR from the following stack of PRs:

This is a preparatory step towards:

This is an optimization that allows workers to communicate in-memory avoiding network calls and serialization in case it needs to communicate to itself.

This optimization shows good improvements if using a small number of workers, and fades away as more workers are used.

Even if this shows improvements today, it will become very meaningful in adaptative query execution: there will be times when two consecutive stages are assigned 1 task each, and as that was done dynamically, we cannot eagerly do the optimization that collapses those two stages into one, so instead, we assigned both stages to the same worker, and the optimization in this PR kicks in, executing the plan fully locally.

@gabotechs gabotechs changed the base branch from main to gabrielmusat/refactor-coordinator-module May 4, 2026 08:59
JSOD11 pushed a commit that referenced this pull request May 5, 2026
Cherry picks the `LocalWorkerContext` struct propagation from:
- #427
@gabotechs gabotechs force-pushed the gabrielmusat/refactor-coordinator-module branch from 89f0103 to 4a4f699 Compare May 6, 2026 19:46
@gabotechs gabotechs force-pushed the gabrielmusat/local-worker-connections branch from 024e0f1 to 5751a15 Compare May 6, 2026 19:46
@gabotechs gabotechs force-pushed the gabrielmusat/refactor-coordinator-module branch from 4a4f699 to de62b53 Compare May 6, 2026 19:51
@gabotechs gabotechs force-pushed the gabrielmusat/local-worker-connections branch from 5751a15 to d2a57e1 Compare May 6, 2026 19:52
gabotechs added a commit that referenced this pull request May 11, 2026
PR factored out from
#416.

This is one PR from the following stack of PRs:
- #422
<- you are here
- #424
- #416
- #425
- #426
- #427
- #432

Previously, we where force-propagating a max task count assignation
below
the NetworkBroadcast so that the remote build side has never more tasks
than the stage above.

In a dynamic task count assignation context, we can no longer do this,
as by the time
you realize a remote build side is going to have more tasks than the
stage above, the build side might have
already started executing, and by that time its task count is set in
stone.

This is fine, build side in broadcast should have an arbitrarily more
expensive
build side. What matters there is not that the build side is cheap to
execute, but that it returns little amount of data. A build side can
return
very little data (just a couple of rows) and still be very expensive to
execute

This is actually the reason why there is a small speedup in benchmarks.

---

<details><summary>tpch_sf10 1.09 faster ✔</summary>

```text
=== Comparing tpch_sf10 results from engine 'datafusion-distributed-main' [prev] with 'datafusion-distributed-dynamic-task-allocation' [new] ===
      q1: prev=1269 ms, new=1286 ms, diff=1.01 slower ✖
      q2: prev= 390 ms, new= 395 ms, diff=1.01 slower ✖
      q3: prev= 784 ms, new= 826 ms, diff=1.05 slower ✖
      q4: prev= 413 ms, new= 392 ms, diff=1.05 faster ✔
      q5: prev=1306 ms, new=1242 ms, diff=1.05 faster ✔
      q6: prev= 534 ms, new= 528 ms, diff=1.01 faster ✔
      q7: prev=1483 ms, new=1420 ms, diff=1.04 faster ✔
      q8: prev=3001 ms, new=1585 ms, diff=1.89 faster ✅
      q9: prev=2054 ms, new=2009 ms, diff=1.02 faster ✔
     q10: prev= 951 ms, new= 921 ms, diff=1.03 faster ✔
     q11: prev= 322 ms, new= 304 ms, diff=1.06 faster ✔
     q12: prev= 670 ms, new= 676 ms, diff=1.01 slower ✖
     q13: prev= 624 ms, new= 613 ms, diff=1.02 faster ✔
     q14: prev= 594 ms, new= 546 ms, diff=1.09 faster ✔
     q15: prev= 778 ms, new= 756 ms, diff=1.03 faster ✔
     q16: prev= 223 ms, new= 219 ms, diff=1.02 faster ✔
     q17: prev=1644 ms, new=1733 ms, diff=1.05 slower ✖
     q18: prev=1884 ms, new=1966 ms, diff=1.04 slower ✖
     q19: prev= 802 ms, new= 727 ms, diff=1.10 faster ✔
     q20: prev= 784 ms, new= 706 ms, diff=1.11 faster ✔
     q21: prev=2112 ms, new=1925 ms, diff=1.10 faster ✔
     q22: prev= 251 ms, new= 261 ms, diff=1.04 slower ✖
   TOTAL: prev=68651.703894 ms, new=63144.566305999986 ms, diff=1.09 faster ✔
```

</details>

<details><summary>tpcds_sf1 1.02 faster ✔</summary>

```text
=== Comparing tpcds_sf1 results from engine 'datafusion-distributed-dynamic-task-allocation' [prev] with 'datafusion-distributed-dynamic-task-allocation' [new] ===
      q1: prev= 260 ms, new= 336 ms, diff=1.29 slower ❌
      q2: prev= 290 ms, new= 321 ms, diff=1.11 slower ✖
      q3: prev= 181 ms, new= 215 ms, diff=1.19 slower ✖
      q4: prev=2039 ms, new=2184 ms, diff=1.07 slower ✖
      q5: prev= 333 ms, new= 325 ms, diff=1.02 faster ✔
      q6: prev= 622 ms, new= 676 ms, diff=1.09 slower ✖
      q7: prev= 225 ms, new= 225 ms, diff=1.00 slower ✖
      q8: prev= 312 ms, new= 200 ms, diff=1.56 faster ✅
      q9: prev= 242 ms, new= 189 ms, diff=1.28 faster ✅
     q10: prev= 480 ms, new= 494 ms, diff=1.03 slower ✖
     q11: prev=1511 ms, new=1382 ms, diff=1.09 faster ✔
     q12: prev= 262 ms, new= 292 ms, diff=1.11 slower ✖
     q13: prev= 477 ms, new= 487 ms, diff=1.02 slower ✖
     q14: prev= 637 ms, new= 782 ms, diff=1.23 slower ❌
     q15: prev= 170 ms, new= 144 ms, diff=1.18 faster ✔
     q16: prev= 350 ms, new= 379 ms, diff=1.08 slower ✖
     q17: prev= 229 ms, new= 250 ms, diff=1.09 slower ✖
     q18: prev= 295 ms, new= 281 ms, diff=1.05 faster ✔
     q19: prev= 286 ms, new= 254 ms, diff=1.13 faster ✔
     q20: prev= 206 ms, new= 143 ms, diff=1.44 faster ✅
     q21: prev= 305 ms, new= 282 ms, diff=1.08 faster ✔
     q22: prev= 390 ms, new= 401 ms, diff=1.03 slower ✖
     q23: prev= 672 ms, new= 640 ms, diff=1.05 faster ✔
     q24: prev= 368 ms, new= 376 ms, diff=1.02 slower ✖
     q25: prev= 203 ms, new= 279 ms, diff=1.37 slower ❌
     q26: prev= 147 ms, new= 198 ms, diff=1.35 slower ❌
     q27: prev= 406 ms, new= 358 ms, diff=1.13 faster ✔
     q28: prev= 195 ms, new= 161 ms, diff=1.21 faster ✅
     q29: prev= 237 ms, new= 219 ms, diff=1.08 faster ✔
     q31: prev= 343 ms, new= 327 ms, diff=1.05 faster ✔
     q32: prev= 142 ms, new= 152 ms, diff=1.07 slower ✖
     q33: prev= 277 ms, new= 211 ms, diff=1.31 faster ✅
     q34: prev= 199 ms, new= 188 ms, diff=1.06 faster ✔
     q35: prev= 514 ms, new= 498 ms, diff=1.03 faster ✔
     q36: prev= 341 ms, new= 311 ms, diff=1.10 faster ✔
     q37: prev= 256 ms, new= 302 ms, diff=1.18 slower ✖
     q38: prev= 228 ms, new= 245 ms, diff=1.07 slower ✖
     q39: prev= 259 ms, new= 266 ms, diff=1.03 slower ✖
     q40: prev= 281 ms, new= 325 ms, diff=1.16 slower ✖
     q41: prev=  87 ms, new=  90 ms, diff=1.03 slower ✖
     q42: prev= 116 ms, new= 124 ms, diff=1.07 slower ✖
     q43: prev= 190 ms, new= 132 ms, diff=1.44 faster ✅
     q44: prev= 214 ms, new= 144 ms, diff=1.49 faster ✅
     q45: prev= 244 ms, new= 186 ms, diff=1.31 faster ✅
     q46: prev= 355 ms, new= 288 ms, diff=1.23 faster ✅
     q47: prev= 374 ms, new= 387 ms, diff=1.03 slower ✖
     q48: prev= 384 ms, new= 360 ms, diff=1.07 faster ✔
     q49: prev= 285 ms, new= 229 ms, diff=1.24 faster ✅
     q50: prev= 352 ms, new= 343 ms, diff=1.03 faster ✔
     q51: prev= 305 ms, new= 224 ms, diff=1.36 faster ✅
     q52: prev= 138 ms, new= 127 ms, diff=1.09 faster ✔
     q53: prev= 143 ms, new= 158 ms, diff=1.10 slower ✖
     q54: prev= 331 ms, new= 271 ms, diff=1.22 faster ✅
     q55: prev= 132 ms, new= 145 ms, diff=1.10 slower ✖
     q56: prev= 298 ms, new= 233 ms, diff=1.28 faster ✅
     q57: prev= 335 ms, new= 354 ms, diff=1.06 slower ✖
     q58: prev= 280 ms, new= 284 ms, diff=1.01 slower ✖
     q59: prev= 293 ms, new= 270 ms, diff=1.09 faster ✔
     q60: prev= 361 ms, new= 311 ms, diff=1.16 faster ✔
     q61: prev= 856 ms, new= 849 ms, diff=1.01 faster ✔
     q62: prev= 639 ms, new= 665 ms, diff=1.04 slower ✖
     q63: prev= 224 ms, new= 148 ms, diff=1.51 faster ✅
     q64: prev=1159 ms, new=1193 ms, diff=1.03 slower ✖
     q65: prev= 229 ms, new= 228 ms, diff=1.00 faster ✔
     q66: prev= 730 ms, new= 714 ms, diff=1.02 faster ✔
     q67: prev= 406 ms, new= 420 ms, diff=1.03 slower ✖
     q68: prev= 289 ms, new= 320 ms, diff=1.11 slower ✖
     q69: prev= 513 ms, new= 570 ms, diff=1.11 slower ✖
     q70: prev= 394 ms, new= 386 ms, diff=1.02 faster ✔
     q71: prev= 250 ms, new= 329 ms, diff=1.32 slower ❌
     q72: prev=6644 ms, new=6609 ms, diff=1.01 faster ✔
     q73: prev= 201 ms, new= 210 ms, diff=1.04 slower ✖
     q74: prev= 797 ms, new= 743 ms, diff=1.07 faster ✔
     q75: prev= 375 ms, new= 452 ms, diff=1.21 slower ❌
     q76: prev= 165 ms, new= 230 ms, diff=1.39 slower ❌
     q77: prev= 232 ms, new= 271 ms, diff=1.17 slower ✖
     q78: prev= 341 ms, new= 353 ms, diff=1.04 slower ✖
     q79: prev= 226 ms, new= 228 ms, diff=1.01 slower ✖
     q80: prev= 332 ms, new= 336 ms, diff=1.01 slower ✖
     q81: prev= 216 ms, new= 191 ms, diff=1.13 faster ✔
     q82: prev= 258 ms, new= 262 ms, diff=1.02 slower ✖
     q83: prev= 240 ms, new= 287 ms, diff=1.20 slower ✖
     q84: prev= 240 ms, new= 228 ms, diff=1.05 faster ✔
     q85: prev= 455 ms, new= 364 ms, diff=1.25 faster ✅
     q86: prev= 124 ms, new= 138 ms, diff=1.11 slower ✖
     q87: prev= 203 ms, new= 208 ms, diff=1.02 slower ✖
     q88: prev= 404 ms, new= 350 ms, diff=1.15 faster ✔
     q89: prev= 237 ms, new= 167 ms, diff=1.42 faster ✅
     q90: prev= 189 ms, new= 187 ms, diff=1.01 faster ✔
     q91: prev= 377 ms, new= 328 ms, diff=1.15 faster ✔
     q92: prev= 284 ms, new= 131 ms, diff=2.17 faster ✅
     q93: prev= 154 ms, new= 142 ms, diff=1.08 faster ✔
     q94: prev= 302 ms, new= 308 ms, diff=1.02 slower ✖
     q95: prev= 365 ms, new= 290 ms, diff=1.26 faster ✅
     q96: prev= 177 ms, new= 157 ms, diff=1.13 faster ✔
     q97: prev= 235 ms, new= 170 ms, diff=1.38 faster ✅
     q98: prev= 165 ms, new= 159 ms, diff=1.04 faster ✔
     q99: prev= 951 ms, new= 995 ms, diff=1.05 slower ✖
   TOTAL: prev=123029.07797800002 ms, new=120962.55825200005 ms, diff=1.02 faster ✔
```

</details>
gabotechs added a commit that referenced this pull request May 11, 2026
An independent refactor factored out from
#416

This is one PR from the following stack of PRs:
- #422
- #424
<- you are here
- #416
- #425
- #426
- #427
- #432


Previously the stage struct was a "hidden" state machine that could have
two states:

1. A state where the Stage contains the input plan and is locally
accessible and traversible.

```rust
pub struct Stage {
    query_id: ...
    num: ...
    plan: Some(plan),
    tasks: vec![None, None, None]
}
```

2. A state where the input plan is serialized, and the worker URLs are
assigned. This happens in `DistributedExec` right before execution on
`prepare_plan()`

```rust
pub struct Stage {
    query_id: ...
    num: ...
    plan: None,
    tasks: vec![Some("http://1"), Some("http://2"), Some("http://3")]
}
```

This PR makes this behavior explicit, and represented with an `enum`:

```rust
pub enum Stage {
    Local(LocalStage),
    Remote(RemoteStage),
}

pub struct LocalStage {
    pub query_id: Uuid,
    pub num: usize,
    pub plan: Arc<dyn ExecutionPlan>,
    pub tasks: usize,
}

pub struct RemoteStage {
    pub query_id: Uuid,
    pub num: usize,
    pub workers: Vec<Url>,
}
```
@gabotechs gabotechs force-pushed the gabrielmusat/refactor-coordinator-module branch from de62b53 to a748a8b Compare May 11, 2026 17:41
@gabotechs gabotechs force-pushed the gabrielmusat/local-worker-connections branch 2 times, most recently from a147fd7 to f643281 Compare May 11, 2026 17:42
@gabotechs gabotechs force-pushed the gabrielmusat/refactor-coordinator-module branch 2 times, most recently from 1de0c5c to c278b27 Compare May 11, 2026 20:58
@gabotechs gabotechs force-pushed the gabrielmusat/local-worker-connections branch from f643281 to b0cb082 Compare May 11, 2026 21:00
Comment thread src/worker/impl_execute_task.rs
Comment thread src/worker/impl_execute_task.rs Outdated
Comment thread src/worker/impl_execute_task.rs
Comment thread src/worker/worker_connection_pool.rs Outdated
fn execute(&self, partition: usize) -> Result<BoxStream<'static, Result<RecordBatch>>> {
let mut request = self.request_template.clone();
request.target_partition_start = partition as u64;
request.target_partition_end = (partition + 1) as u64;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this request have a range if we only execute 1 partition in the remote and local case?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the remote case we execute multiple partitions:

The range is passed here:

target_partition_range: Range<usize>,

And one of the places where the range is given is here:

off..(off + self.properties.partitioning.partition_count()),

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the range matters for remote because one request owns multiple target partitions, but I think that the local request to just the partition being executed makes it so unrequested local partitions never get the drop wrapper that decrements num_partitions_remaining.

Could we either make the local connection own the full range or mark the unstarted local partitions done on connection drop?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 Good suggestion, trying this now...

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented this here bf7cadc, let me know what you think

@gabotechs gabotechs force-pushed the gabrielmusat/local-worker-connections branch from bfa0326 to aea9cc4 Compare May 14, 2026 18:52
gabotechs added a commit that referenced this pull request May 14, 2026
… planner (#416)

This is a preparatory step towards:
-
#377

This is one PR from the following stack of PRs:
- #422
- #424
- #416
<- you are here
- #425
- #426
- #427
- #432

The main purpose of this PR is to make distributed planning in a single
pass, rather than the current two that communicate each other via an
intermediate struct (`AnnotatedPlan`). This change cascades into several
other changes that produce a nicer public API for building custom
distributed plans, but also produce a big diff.

## Dropping the two-step annotation + NB injection

On a dynamic task assignation context, choosing the task count for a
stage based on the previous one can no longer
be done statically. 

After "annotating" a stage, and before "annotating" the
one above, we need to be able to send it for execution, collect runtime
metrics, and based on that decide the task count for the stage above.

This means that the stage below should be good to be sent for execution
before the full annotation process has finished, meaning that we need to
do everything there is to be done in the "annotation" process, we can no
longer divide the distribution process in several steps that recurse the
whole plan.

## Network boundaries no longer mutate their children

In order for network boundaries to know what mutations to apply to their
children, they need to now how many consumer tasks are they going to be 
running, but this might not be know until execution time, so if we want
to
dynamically assign tasks to stages, there's no way at planning time that
we can know how to mutate the children.

For example, we do not now how to scale up a `RepartitionExec` if we
don't
know how many `NetworkShuffleExec`s are going to be consuming it.

The responsibility of preparing network boundaries inputs (e.g., scaling
RepartitionExec)
is now factor out into a separate `network_boundary_scale_input()`
function
that can be called either at planning time or at execution time.

Right now, it's still just called at planning time.
@gabotechs gabotechs force-pushed the gabrielmusat/refactor-coordinator-module branch from c278b27 to 479ddc5 Compare May 15, 2026 11:41
gabotechs added a commit that referenced this pull request May 15, 2026
This is a preparatory step towards:
-
#377

This is one PR from the following stack of PRs:
- #422
- #424
- #416 
- #425
<- you are here
- #426
- #427
- #432


Removes `impl_set_plan.rs` in favor of just inlining its contents to
`impl_coordinator_channel.rs`.

In future changes, the relationship between `impl_set_plan.rs` and
`impl_coordinator_channel.rs` will get more complex, increasing the
function signature `impl_set_plan.rs` exposes to
`impl_coordinator_channel.rs`. This proves that the split between those
two files does not make sense, as they have never been able to evolve
independently, so we may as well just not pay the price of a complex
function signature in between.
@gabotechs gabotechs force-pushed the gabrielmusat/refactor-coordinator-module branch from 4673b41 to f3e3e50 Compare May 15, 2026 11:51
@gabotechs gabotechs force-pushed the gabrielmusat/local-worker-connections branch from 00eeb5b to 62e00ac Compare May 15, 2026 15:13
gabotechs added a commit that referenced this pull request May 16, 2026
This is a preparatory step towards:
-
#377

This is one PR from the following stack of PRs:
- #422
- #424
- #416 
- #425
- #426
<- you are here
- #427
- #432

`distributed.rs` contains the `DistributedExec` node, which has evolved
towards acting as a "coordinator". It's in charge of assigning tasks to
worker URLs, setting the subplans in the appropriate workers, collecting
metrics, streaming work units, etc...

Soon, it will evolve even more as we prepare for adaptative query
execution.

This PR ships two things:
- A refactor that dismantles the old `distributed.rs` into smaller
reusable modules in the `coordinator/` module.
- Bypass the metrics collection machinery if metrics collection is
disabled
Copy link
Copy Markdown
Collaborator

@gene-bordegaray gene-bordegaray left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just one follow up on a thread but I am using this in a brnach haha 😆

Comment thread src/worker/worker_connection_pool.rs Outdated
fn execute(&self, partition: usize) -> Result<BoxStream<'static, Result<RecordBatch>>> {
let mut request = self.request_template.clone();
request.target_partition_start = partition as u64;
request.target_partition_end = (partition + 1) as u64;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the range matters for remote because one request owns multiple target partitions, but I think that the local request to just the partition being executed makes it so unrequested local partitions never get the drop wrapper that decrements num_partitions_remaining.

Could we either make the local connection own the full range or mark the unstarted local partitions done on connection drop?

@gabotechs gabotechs force-pushed the gabrielmusat/local-worker-connections branch from bf7cadc to ad92147 Compare May 26, 2026 13:46
Copy link
Copy Markdown
Collaborator

@gene-bordegaray gene-bordegaray left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good

Comment thread src/worker/worker_connection_pool.rs Outdated
@gabotechs gabotechs merged commit fe9f04d into main May 26, 2026
17 checks passed
@gabotechs gabotechs deleted the gabrielmusat/local-worker-connections branch May 26, 2026 18:02
gabotechs added a commit that referenced this pull request May 27, 2026
This is one PR from the following stack of PRs:
- #427
- #469
<- you are here
- #461
- #462
- #463
- #464
- #432

This PR factors out common recursion patterns used in the distributed
planner into reusable helpers, reducing complexity in critical parts of
the codebase. By centralizing recursion logic, we improve code
maintainability and reduce the risk of subtle bugs in recursive tree
traversals.
gabotechs added a commit that referenced this pull request May 27, 2026
This is one PR from the following stack of PRs:
- #427
- #469
- #467
<- you are here

## Motivation

Part of #460 (DataFusion v54 upgrade).

DataFusion [#21351](apache/datafusion#21351)
introduced **work-stealing** for `FileScanConfig`: instead of each
partition scanning a fixed, pre-assigned set of files, idle partitions
can steal pending file work from a shared queue at execution time. This
means the mapping *partition → files* is no longer determined at
planning time.

`PartitionIsolatorExec` was built on exactly that assumption: it split
the leaf `DataSourceExec` into `task_count × original_partitions` file
groups at planning time and selected the right slice for each task via
`task_index` at execution. With dynamic file assignment, this selection
is no longer valid.

## Solution — `DistributedLeafExec`

Replace `PartitionIsolatorExec` with `DistributedLeafExec`, a new
wrapper that bundles one **pre-built per-task variant** of the leaf plan
for each distributed task. The task spawner swaps in the correct variant
before the plan is serialised and sent to a worker — no runtime
selection by `task_index` needed.

### Before (`PartitionIsolatorExec`)

```
planning time                    worker (task 1 of 3)
─────────────────────────────    ─────────────────────────────
DataSourceExec                   DataSourceExec
  file_groups = [f0, f1, f2,       file_groups = [f0, f1, f2,
                 f3, f4, f5,                        f3, f4, f5,
                 f6, f7, f8]                        f6, f7, f8]
↓ wrapped by                     ↓ selects slice [f3..f5]
PartitionIsolatorExec              at execution via task_index
  tasks=3 partitions=3
```

`PartitionIsolatorExec` used `task_index` at execution to pick the right
slice. With work-stealing this slice is meaningless: partition 0 may end
up scanning `f7`, not `f3`.

### After (`DistributedLeafExec`)

```
planning time                    task spawner (before send)   worker (task 1 of 3)
─────────────────────────────    ──────────────────────────   ─────────────────────
DistributedLeafExec              pick variants[1]             DataSourceExec
  original: DataSourceExec   ─────────────────────────────►    file_groups =
    file_groups = [f0..f8]                                       [f3, f4, f5]
  variants:
    [0]: DataSourceExec
           file_groups=[f0,f1,f2]
    [1]: DataSourceExec
           file_groups=[f3,f4,f5]
    [2]: DataSourceExec
           file_groups=[f6,f7,f8]
```

Each per-task `DataSourceExec` has its own, isolated file groups decided
at planning time. The task spawner replaces `DistributedLeafExec` with
the right variant before serialising, so the worker receives a plain
`DataSourceExec` — compatible with DataFusion's work-stealing scheduler.

## What changed

- **`PartitionIsolatorExec`** removed entirely.
- **`DistributedLeafExec`** introduced: holds `original` (for
planning-time properties and single-task execution) + `variants` (one
per task).
- **`FileScanConfigTaskEstimator::scale_up_leaf_node`** now returns a
`DistributedLeafExec` wrapping the pre-split variants instead of a
`PartitionIsolatorExec`.
- Task spawner replaces `DistributedLeafExec` with the per-task variant
before serialisation (same location that already handled
`ChildrenIsolatorUnionExec`).
- Stage metrics rewriter updated to traverse `ChildrenIsolatorUnionExec`
correctly (per-task node offsets) and to make `DistributedLeafExec`
transparent for metrics lookups.
- All snapshot tests and integration tests updated.

## Performance

0 Impact on TPCH and Clickbench, but getting some good numbers on
TPC-DS:

<details><summary>TOTAL: prev=264135.349226 ms, new=231281.42632199993
ms, diff=1.14 faster ✔</summary>

```
      q1: prev= 677 ms, new= 489 ms, diff=1.38 faster ✅
      q2: prev= 455 ms, new= 433 ms, diff=1.05 faster ✔
      q3: prev= 267 ms, new= 287 ms, diff=1.07 slower ✖
      q4: prev=1418 ms, new=1360 ms, diff=1.04 faster ✔
      q5: prev= 658 ms, new= 481 ms, diff=1.37 faster ✅
      q6: prev=1013 ms, new= 751 ms, diff=1.35 faster ✅
      q7: prev= 394 ms, new= 343 ms, diff=1.15 faster ✔
      q8: prev= 417 ms, new= 459 ms, diff=1.10 slower ✖
      q9: prev= 369 ms, new= 370 ms, diff=1.00 slower ✖
     q10: prev=1028 ms, new= 785 ms, diff=1.31 faster ✅
     q11: prev=1142 ms, new=1125 ms, diff=1.02 faster ✔
     q12: prev= 248 ms, new= 216 ms, diff=1.15 faster ✔
     q13: prev= 739 ms, new= 612 ms, diff=1.21 faster ✅
     q14: prev= 990 ms, new= 813 ms, diff=1.22 faster ✅
     q15: prev= 262 ms, new= 159 ms, diff=1.65 faster ✅
     q16: prev= 741 ms, new= 467 ms, diff=1.59 faster ✅
     q17: prev= 412 ms, new= 280 ms, diff=1.47 faster ✅
     q18: prev= 354 ms, new= 317 ms, diff=1.12 faster ✔
     q19: prev= 421 ms, new= 282 ms, diff=1.49 faster ✅
     q20: prev= 366 ms, new= 167 ms, diff=2.19 faster ✅
     q21: prev= 560 ms, new= 339 ms, diff=1.65 faster ✅
     q22: prev= 685 ms, new= 556 ms, diff=1.23 faster ✅
     q23: prev= 823 ms, new= 634 ms, diff=1.30 faster ✅
     q24: prev= 759 ms, new= 518 ms, diff=1.47 faster ✅
     q25: prev= 304 ms, new= 221 ms, diff=1.38 faster ✅
     q26: prev= 201 ms, new= 184 ms, diff=1.09 faster ✔
     q27: prev= 378 ms, new= 481 ms, diff=1.27 slower ❌
     q28: prev= 288 ms, new= 198 ms, diff=1.45 faster ✅
     q29: prev= 347 ms, new= 314 ms, diff=1.11 faster ✔
     q31: prev= 381 ms, new= 266 ms, diff=1.43 faster ✅
     q32: prev= 304 ms, new= 225 ms, diff=1.35 faster ✅
     q33: prev= 370 ms, new= 319 ms, diff=1.16 faster ✔
     q34: prev= 433 ms, new= 291 ms, diff=1.49 faster ✅
     q35: prev= 814 ms, new= 709 ms, diff=1.15 faster ✔
     q36: prev=1104 ms, new= 507 ms, diff=2.18 faster ✅
     q37: prev= 505 ms, new= 437 ms, diff=1.16 faster ✔
     q38: prev= 342 ms, new= 386 ms, diff=1.13 slower ✖
     q39: prev= 367 ms, new= 417 ms, diff=1.14 slower ✖
     q40: prev= 448 ms, new= 452 ms, diff=1.01 slower ✖
     q41: prev= 110 ms, new= 130 ms, diff=1.18 slower ✖
     q42: prev= 148 ms, new= 183 ms, diff=1.24 slower ❌
     q43: prev= 208 ms, new= 189 ms, diff=1.10 faster ✔
     q44: prev= 234 ms, new= 271 ms, diff=1.16 slower ✖
     q45: prev= 332 ms, new= 229 ms, diff=1.45 faster ✅
     q46: prev= 656 ms, new= 486 ms, diff=1.35 faster ✅
     q47: prev= 491 ms, new= 413 ms, diff=1.19 faster ✔
     q48: prev= 570 ms, new= 480 ms, diff=1.19 faster ✔
     q49: prev= 393 ms, new= 344 ms, diff=1.14 faster ✔
     q50: prev= 538 ms, new= 405 ms, diff=1.33 faster ✅
     q51: prev= 265 ms, new= 246 ms, diff=1.08 faster ✔
     q52: prev= 139 ms, new= 178 ms, diff=1.28 slower ❌
     q53: prev= 354 ms, new= 239 ms, diff=1.48 faster ✅
     q54: prev= 497 ms, new= 408 ms, diff=1.22 faster ✅
     q55: prev= 221 ms, new= 155 ms, diff=1.43 faster ✅
     q56: prev= 438 ms, new= 359 ms, diff=1.22 faster ✅
     q57: prev= 422 ms, new= 363 ms, diff=1.16 faster ✔
     q58: prev= 340 ms, new= 389 ms, diff=1.14 slower ✖
     q59: prev= 392 ms, new= 392 ms, diff=1.00 slower ✖
     q60: prev= 446 ms, new= 350 ms, diff=1.27 faster ✅
     q61: prev=1610 ms, new= 958 ms, diff=1.68 faster ✅
     q62: prev= 713 ms, new= 722 ms, diff=1.01 slower ✖
     q63: prev= 243 ms, new= 213 ms, diff=1.14 faster ✔
     q64: prev=1411 ms, new=1253 ms, diff=1.13 faster ✔
     q65: prev= 291 ms, new= 315 ms, diff=1.08 slower ✖
     q66: prev= 796 ms, new= 716 ms, diff=1.11 faster ✔
     q67: prev= 459 ms, new= 511 ms, diff=1.11 slower ✖
     q68: prev= 481 ms, new= 441 ms, diff=1.09 faster ✔
     q69: prev= 746 ms, new= 558 ms, diff=1.34 faster ✅
     q70: prev= 471 ms, new= 497 ms, diff=1.06 slower ✖
     q71: prev= 472 ms, new= 307 ms, diff=1.54 faster ✅
     q72: prev=5412 ms, new=5622 ms, diff=1.04 slower ✖
     q73: prev= 239 ms, new= 319 ms, diff=1.33 slower ❌
     q74: prev= 660 ms, new= 687 ms, diff=1.04 slower ✖
     q75: prev= 599 ms, new= 482 ms, diff=1.24 faster ✅
     q76: prev=1002 ms, new= 341 ms, diff=2.94 faster ✅
     q77: prev= 442 ms, new= 366 ms, diff=1.21 faster ✅
     q78: prev= 310 ms, new= 361 ms, diff=1.16 slower ✖
     q79: prev= 325 ms, new= 292 ms, diff=1.11 faster ✔
     q80: prev= 438 ms, new= 420 ms, diff=1.04 faster ✔
     q81: prev= 315 ms, new= 306 ms, diff=1.03 faster ✔
     q82: prev= 346 ms, new= 340 ms, diff=1.02 faster ✔
     q83: prev= 332 ms, new= 352 ms, diff=1.06 slower ✖
     q84: prev= 292 ms, new= 277 ms, diff=1.05 faster ✔
     q85: prev= 531 ms, new= 547 ms, diff=1.03 slower ✖
     q86: prev= 214 ms, new= 156 ms, diff=1.37 faster ✅
     q87: prev= 267 ms, new= 368 ms, diff=1.38 slower ❌
     q88: prev= 468 ms, new= 504 ms, diff=1.08 slower ✖
     q89: prev= 228 ms, new= 279 ms, diff=1.22 slower ❌
     q90: prev= 212 ms, new= 268 ms, diff=1.26 slower ❌
     q91: prev= 535 ms, new= 498 ms, diff=1.07 faster ✔
     q92: prev= 254 ms, new= 291 ms, diff=1.15 slower ✖
     q93: prev= 210 ms, new= 270 ms, diff=1.29 slower ❌
     q94: prev= 407 ms, new= 355 ms, diff=1.15 faster ✔
     q95: prev= 412 ms, new= 364 ms, diff=1.13 faster ✔
     q96: prev= 266 ms, new= 330 ms, diff=1.24 slower ❌
     q97: prev= 208 ms, new= 243 ms, diff=1.17 slower ✖
     q98: prev= 213 ms, new= 213 ms, diff=1.00 slower ✖
     q99: prev= 964 ms, new=1008 ms, diff=1.05 slower ✖
```

</details>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants