Use in-memory comms if both workers involved in an exchange happen to be the same #427
Conversation
Cherry picks the `LocalWorkerContext` struct propagation from: - #427
89f0103 to
4a4f699
Compare
024e0f1 to
5751a15
Compare
4a4f699 to
de62b53
Compare
5751a15 to
d2a57e1
Compare
PR factored out from #416. This is one PR from the following stack of PRs: - #422 <- you are here - #424 - #416 - #425 - #426 - #427 - #432 Previously, we where force-propagating a max task count assignation below the NetworkBroadcast so that the remote build side has never more tasks than the stage above. In a dynamic task count assignation context, we can no longer do this, as by the time you realize a remote build side is going to have more tasks than the stage above, the build side might have already started executing, and by that time its task count is set in stone. This is fine, build side in broadcast should have an arbitrarily more expensive build side. What matters there is not that the build side is cheap to execute, but that it returns little amount of data. A build side can return very little data (just a couple of rows) and still be very expensive to execute This is actually the reason why there is a small speedup in benchmarks. --- <details><summary>tpch_sf10 1.09 faster ✔</summary> ```text === Comparing tpch_sf10 results from engine 'datafusion-distributed-main' [prev] with 'datafusion-distributed-dynamic-task-allocation' [new] === q1: prev=1269 ms, new=1286 ms, diff=1.01 slower ✖ q2: prev= 390 ms, new= 395 ms, diff=1.01 slower ✖ q3: prev= 784 ms, new= 826 ms, diff=1.05 slower ✖ q4: prev= 413 ms, new= 392 ms, diff=1.05 faster ✔ q5: prev=1306 ms, new=1242 ms, diff=1.05 faster ✔ q6: prev= 534 ms, new= 528 ms, diff=1.01 faster ✔ q7: prev=1483 ms, new=1420 ms, diff=1.04 faster ✔ q8: prev=3001 ms, new=1585 ms, diff=1.89 faster ✅ q9: prev=2054 ms, new=2009 ms, diff=1.02 faster ✔ q10: prev= 951 ms, new= 921 ms, diff=1.03 faster ✔ q11: prev= 322 ms, new= 304 ms, diff=1.06 faster ✔ q12: prev= 670 ms, new= 676 ms, diff=1.01 slower ✖ q13: prev= 624 ms, new= 613 ms, diff=1.02 faster ✔ q14: prev= 594 ms, new= 546 ms, diff=1.09 faster ✔ q15: prev= 778 ms, new= 756 ms, diff=1.03 faster ✔ q16: prev= 223 ms, new= 219 ms, diff=1.02 faster ✔ q17: prev=1644 ms, new=1733 ms, diff=1.05 slower ✖ q18: prev=1884 ms, new=1966 ms, diff=1.04 slower ✖ q19: prev= 802 ms, new= 727 ms, diff=1.10 faster ✔ q20: prev= 784 ms, new= 706 ms, diff=1.11 faster ✔ q21: prev=2112 ms, new=1925 ms, diff=1.10 faster ✔ q22: prev= 251 ms, new= 261 ms, diff=1.04 slower ✖ TOTAL: prev=68651.703894 ms, new=63144.566305999986 ms, diff=1.09 faster ✔ ``` </details> <details><summary>tpcds_sf1 1.02 faster ✔</summary> ```text === Comparing tpcds_sf1 results from engine 'datafusion-distributed-dynamic-task-allocation' [prev] with 'datafusion-distributed-dynamic-task-allocation' [new] === q1: prev= 260 ms, new= 336 ms, diff=1.29 slower ❌ q2: prev= 290 ms, new= 321 ms, diff=1.11 slower ✖ q3: prev= 181 ms, new= 215 ms, diff=1.19 slower ✖ q4: prev=2039 ms, new=2184 ms, diff=1.07 slower ✖ q5: prev= 333 ms, new= 325 ms, diff=1.02 faster ✔ q6: prev= 622 ms, new= 676 ms, diff=1.09 slower ✖ q7: prev= 225 ms, new= 225 ms, diff=1.00 slower ✖ q8: prev= 312 ms, new= 200 ms, diff=1.56 faster ✅ q9: prev= 242 ms, new= 189 ms, diff=1.28 faster ✅ q10: prev= 480 ms, new= 494 ms, diff=1.03 slower ✖ q11: prev=1511 ms, new=1382 ms, diff=1.09 faster ✔ q12: prev= 262 ms, new= 292 ms, diff=1.11 slower ✖ q13: prev= 477 ms, new= 487 ms, diff=1.02 slower ✖ q14: prev= 637 ms, new= 782 ms, diff=1.23 slower ❌ q15: prev= 170 ms, new= 144 ms, diff=1.18 faster ✔ q16: prev= 350 ms, new= 379 ms, diff=1.08 slower ✖ q17: prev= 229 ms, new= 250 ms, diff=1.09 slower ✖ q18: prev= 295 ms, new= 281 ms, diff=1.05 faster ✔ q19: prev= 286 ms, new= 254 ms, diff=1.13 faster ✔ q20: prev= 206 ms, new= 143 ms, diff=1.44 faster ✅ q21: prev= 305 ms, new= 282 ms, diff=1.08 faster ✔ q22: prev= 390 ms, new= 401 ms, diff=1.03 slower ✖ q23: prev= 672 ms, new= 640 ms, diff=1.05 faster ✔ q24: prev= 368 ms, new= 376 ms, diff=1.02 slower ✖ q25: prev= 203 ms, new= 279 ms, diff=1.37 slower ❌ q26: prev= 147 ms, new= 198 ms, diff=1.35 slower ❌ q27: prev= 406 ms, new= 358 ms, diff=1.13 faster ✔ q28: prev= 195 ms, new= 161 ms, diff=1.21 faster ✅ q29: prev= 237 ms, new= 219 ms, diff=1.08 faster ✔ q31: prev= 343 ms, new= 327 ms, diff=1.05 faster ✔ q32: prev= 142 ms, new= 152 ms, diff=1.07 slower ✖ q33: prev= 277 ms, new= 211 ms, diff=1.31 faster ✅ q34: prev= 199 ms, new= 188 ms, diff=1.06 faster ✔ q35: prev= 514 ms, new= 498 ms, diff=1.03 faster ✔ q36: prev= 341 ms, new= 311 ms, diff=1.10 faster ✔ q37: prev= 256 ms, new= 302 ms, diff=1.18 slower ✖ q38: prev= 228 ms, new= 245 ms, diff=1.07 slower ✖ q39: prev= 259 ms, new= 266 ms, diff=1.03 slower ✖ q40: prev= 281 ms, new= 325 ms, diff=1.16 slower ✖ q41: prev= 87 ms, new= 90 ms, diff=1.03 slower ✖ q42: prev= 116 ms, new= 124 ms, diff=1.07 slower ✖ q43: prev= 190 ms, new= 132 ms, diff=1.44 faster ✅ q44: prev= 214 ms, new= 144 ms, diff=1.49 faster ✅ q45: prev= 244 ms, new= 186 ms, diff=1.31 faster ✅ q46: prev= 355 ms, new= 288 ms, diff=1.23 faster ✅ q47: prev= 374 ms, new= 387 ms, diff=1.03 slower ✖ q48: prev= 384 ms, new= 360 ms, diff=1.07 faster ✔ q49: prev= 285 ms, new= 229 ms, diff=1.24 faster ✅ q50: prev= 352 ms, new= 343 ms, diff=1.03 faster ✔ q51: prev= 305 ms, new= 224 ms, diff=1.36 faster ✅ q52: prev= 138 ms, new= 127 ms, diff=1.09 faster ✔ q53: prev= 143 ms, new= 158 ms, diff=1.10 slower ✖ q54: prev= 331 ms, new= 271 ms, diff=1.22 faster ✅ q55: prev= 132 ms, new= 145 ms, diff=1.10 slower ✖ q56: prev= 298 ms, new= 233 ms, diff=1.28 faster ✅ q57: prev= 335 ms, new= 354 ms, diff=1.06 slower ✖ q58: prev= 280 ms, new= 284 ms, diff=1.01 slower ✖ q59: prev= 293 ms, new= 270 ms, diff=1.09 faster ✔ q60: prev= 361 ms, new= 311 ms, diff=1.16 faster ✔ q61: prev= 856 ms, new= 849 ms, diff=1.01 faster ✔ q62: prev= 639 ms, new= 665 ms, diff=1.04 slower ✖ q63: prev= 224 ms, new= 148 ms, diff=1.51 faster ✅ q64: prev=1159 ms, new=1193 ms, diff=1.03 slower ✖ q65: prev= 229 ms, new= 228 ms, diff=1.00 faster ✔ q66: prev= 730 ms, new= 714 ms, diff=1.02 faster ✔ q67: prev= 406 ms, new= 420 ms, diff=1.03 slower ✖ q68: prev= 289 ms, new= 320 ms, diff=1.11 slower ✖ q69: prev= 513 ms, new= 570 ms, diff=1.11 slower ✖ q70: prev= 394 ms, new= 386 ms, diff=1.02 faster ✔ q71: prev= 250 ms, new= 329 ms, diff=1.32 slower ❌ q72: prev=6644 ms, new=6609 ms, diff=1.01 faster ✔ q73: prev= 201 ms, new= 210 ms, diff=1.04 slower ✖ q74: prev= 797 ms, new= 743 ms, diff=1.07 faster ✔ q75: prev= 375 ms, new= 452 ms, diff=1.21 slower ❌ q76: prev= 165 ms, new= 230 ms, diff=1.39 slower ❌ q77: prev= 232 ms, new= 271 ms, diff=1.17 slower ✖ q78: prev= 341 ms, new= 353 ms, diff=1.04 slower ✖ q79: prev= 226 ms, new= 228 ms, diff=1.01 slower ✖ q80: prev= 332 ms, new= 336 ms, diff=1.01 slower ✖ q81: prev= 216 ms, new= 191 ms, diff=1.13 faster ✔ q82: prev= 258 ms, new= 262 ms, diff=1.02 slower ✖ q83: prev= 240 ms, new= 287 ms, diff=1.20 slower ✖ q84: prev= 240 ms, new= 228 ms, diff=1.05 faster ✔ q85: prev= 455 ms, new= 364 ms, diff=1.25 faster ✅ q86: prev= 124 ms, new= 138 ms, diff=1.11 slower ✖ q87: prev= 203 ms, new= 208 ms, diff=1.02 slower ✖ q88: prev= 404 ms, new= 350 ms, diff=1.15 faster ✔ q89: prev= 237 ms, new= 167 ms, diff=1.42 faster ✅ q90: prev= 189 ms, new= 187 ms, diff=1.01 faster ✔ q91: prev= 377 ms, new= 328 ms, diff=1.15 faster ✔ q92: prev= 284 ms, new= 131 ms, diff=2.17 faster ✅ q93: prev= 154 ms, new= 142 ms, diff=1.08 faster ✔ q94: prev= 302 ms, new= 308 ms, diff=1.02 slower ✖ q95: prev= 365 ms, new= 290 ms, diff=1.26 faster ✅ q96: prev= 177 ms, new= 157 ms, diff=1.13 faster ✔ q97: prev= 235 ms, new= 170 ms, diff=1.38 faster ✅ q98: prev= 165 ms, new= 159 ms, diff=1.04 faster ✔ q99: prev= 951 ms, new= 995 ms, diff=1.05 slower ✖ TOTAL: prev=123029.07797800002 ms, new=120962.55825200005 ms, diff=1.02 faster ✔ ``` </details>
An independent refactor factored out from #416 This is one PR from the following stack of PRs: - #422 - #424 <- you are here - #416 - #425 - #426 - #427 - #432 Previously the stage struct was a "hidden" state machine that could have two states: 1. A state where the Stage contains the input plan and is locally accessible and traversible. ```rust pub struct Stage { query_id: ... num: ... plan: Some(plan), tasks: vec![None, None, None] } ``` 2. A state where the input plan is serialized, and the worker URLs are assigned. This happens in `DistributedExec` right before execution on `prepare_plan()` ```rust pub struct Stage { query_id: ... num: ... plan: None, tasks: vec![Some("http://1"), Some("http://2"), Some("http://3")] } ``` This PR makes this behavior explicit, and represented with an `enum`: ```rust pub enum Stage { Local(LocalStage), Remote(RemoteStage), } pub struct LocalStage { pub query_id: Uuid, pub num: usize, pub plan: Arc<dyn ExecutionPlan>, pub tasks: usize, } pub struct RemoteStage { pub query_id: Uuid, pub num: usize, pub workers: Vec<Url>, } ```
de62b53 to
a748a8b
Compare
a147fd7 to
f643281
Compare
1de0c5c to
c278b27
Compare
f643281 to
b0cb082
Compare
| fn execute(&self, partition: usize) -> Result<BoxStream<'static, Result<RecordBatch>>> { | ||
| let mut request = self.request_template.clone(); | ||
| request.target_partition_start = partition as u64; | ||
| request.target_partition_end = (partition + 1) as u64; |
There was a problem hiding this comment.
Why does this request have a range if we only execute 1 partition in the remote and local case?
There was a problem hiding this comment.
In the remote case we execute multiple partitions:
The range is passed here:
And one of the places where the range is given is here:
There was a problem hiding this comment.
Yeah, the range matters for remote because one request owns multiple target partitions, but I think that the local request to just the partition being executed makes it so unrequested local partitions never get the drop wrapper that decrements num_partitions_remaining.
Could we either make the local connection own the full range or mark the unstarted local partitions done on connection drop?
There was a problem hiding this comment.
🤔 Good suggestion, trying this now...
There was a problem hiding this comment.
Implemented this here bf7cadc, let me know what you think
bfa0326 to
aea9cc4
Compare
… planner (#416) This is a preparatory step towards: - #377 This is one PR from the following stack of PRs: - #422 - #424 - #416 <- you are here - #425 - #426 - #427 - #432 The main purpose of this PR is to make distributed planning in a single pass, rather than the current two that communicate each other via an intermediate struct (`AnnotatedPlan`). This change cascades into several other changes that produce a nicer public API for building custom distributed plans, but also produce a big diff. ## Dropping the two-step annotation + NB injection On a dynamic task assignation context, choosing the task count for a stage based on the previous one can no longer be done statically. After "annotating" a stage, and before "annotating" the one above, we need to be able to send it for execution, collect runtime metrics, and based on that decide the task count for the stage above. This means that the stage below should be good to be sent for execution before the full annotation process has finished, meaning that we need to do everything there is to be done in the "annotation" process, we can no longer divide the distribution process in several steps that recurse the whole plan. ## Network boundaries no longer mutate their children In order for network boundaries to know what mutations to apply to their children, they need to now how many consumer tasks are they going to be running, but this might not be know until execution time, so if we want to dynamically assign tasks to stages, there's no way at planning time that we can know how to mutate the children. For example, we do not now how to scale up a `RepartitionExec` if we don't know how many `NetworkShuffleExec`s are going to be consuming it. The responsibility of preparing network boundaries inputs (e.g., scaling RepartitionExec) is now factor out into a separate `network_boundary_scale_input()` function that can be called either at planning time or at execution time. Right now, it's still just called at planning time.
c278b27 to
479ddc5
Compare
This is a preparatory step towards: - #377 This is one PR from the following stack of PRs: - #422 - #424 - #416 - #425 <- you are here - #426 - #427 - #432 Removes `impl_set_plan.rs` in favor of just inlining its contents to `impl_coordinator_channel.rs`. In future changes, the relationship between `impl_set_plan.rs` and `impl_coordinator_channel.rs` will get more complex, increasing the function signature `impl_set_plan.rs` exposes to `impl_coordinator_channel.rs`. This proves that the split between those two files does not make sense, as they have never been able to evolve independently, so we may as well just not pay the price of a complex function signature in between.
4673b41 to
f3e3e50
Compare
00eeb5b to
62e00ac
Compare
This is a preparatory step towards: - #377 This is one PR from the following stack of PRs: - #422 - #424 - #416 - #425 - #426 <- you are here - #427 - #432 `distributed.rs` contains the `DistributedExec` node, which has evolved towards acting as a "coordinator". It's in charge of assigning tasks to worker URLs, setting the subplans in the appropriate workers, collecting metrics, streaming work units, etc... Soon, it will evolve even more as we prepare for adaptative query execution. This PR ships two things: - A refactor that dismantles the old `distributed.rs` into smaller reusable modules in the `coordinator/` module. - Bypass the metrics collection machinery if metrics collection is disabled
ca2d4a9 to
3c7c85e
Compare
gene-bordegaray
left a comment
There was a problem hiding this comment.
just one follow up on a thread but I am using this in a brnach haha 😆
| fn execute(&self, partition: usize) -> Result<BoxStream<'static, Result<RecordBatch>>> { | ||
| let mut request = self.request_template.clone(); | ||
| request.target_partition_start = partition as u64; | ||
| request.target_partition_end = (partition + 1) as u64; |
There was a problem hiding this comment.
Yeah, the range matters for remote because one request owns multiple target partitions, but I think that the local request to just the partition being executed makes it so unrequested local partitions never get the drop wrapper that decrements num_partitions_remaining.
Could we either make the local connection own the full range or mark the unstarted local partitions done on connection drop?
Add local_connections_used metric Factor out OnceLockResult
bf7cadc to
ad92147
Compare
This is one PR from the following stack of PRs: - #427 - #469 <- you are here - #461 - #462 - #463 - #464 - #432 This PR factors out common recursion patterns used in the distributed planner into reusable helpers, reducing complexity in critical parts of the codebase. By centralizing recursion logic, we improve code maintainability and reduce the risk of subtle bugs in recursive tree traversals.
This is one PR from the following stack of PRs: - #427 - #469 - #467 <- you are here ## Motivation Part of #460 (DataFusion v54 upgrade). DataFusion [#21351](apache/datafusion#21351) introduced **work-stealing** for `FileScanConfig`: instead of each partition scanning a fixed, pre-assigned set of files, idle partitions can steal pending file work from a shared queue at execution time. This means the mapping *partition → files* is no longer determined at planning time. `PartitionIsolatorExec` was built on exactly that assumption: it split the leaf `DataSourceExec` into `task_count × original_partitions` file groups at planning time and selected the right slice for each task via `task_index` at execution. With dynamic file assignment, this selection is no longer valid. ## Solution — `DistributedLeafExec` Replace `PartitionIsolatorExec` with `DistributedLeafExec`, a new wrapper that bundles one **pre-built per-task variant** of the leaf plan for each distributed task. The task spawner swaps in the correct variant before the plan is serialised and sent to a worker — no runtime selection by `task_index` needed. ### Before (`PartitionIsolatorExec`) ``` planning time worker (task 1 of 3) ───────────────────────────── ───────────────────────────── DataSourceExec DataSourceExec file_groups = [f0, f1, f2, file_groups = [f0, f1, f2, f3, f4, f5, f3, f4, f5, f6, f7, f8] f6, f7, f8] ↓ wrapped by ↓ selects slice [f3..f5] PartitionIsolatorExec at execution via task_index tasks=3 partitions=3 ``` `PartitionIsolatorExec` used `task_index` at execution to pick the right slice. With work-stealing this slice is meaningless: partition 0 may end up scanning `f7`, not `f3`. ### After (`DistributedLeafExec`) ``` planning time task spawner (before send) worker (task 1 of 3) ───────────────────────────── ────────────────────────── ───────────────────── DistributedLeafExec pick variants[1] DataSourceExec original: DataSourceExec ─────────────────────────────► file_groups = file_groups = [f0..f8] [f3, f4, f5] variants: [0]: DataSourceExec file_groups=[f0,f1,f2] [1]: DataSourceExec file_groups=[f3,f4,f5] [2]: DataSourceExec file_groups=[f6,f7,f8] ``` Each per-task `DataSourceExec` has its own, isolated file groups decided at planning time. The task spawner replaces `DistributedLeafExec` with the right variant before serialising, so the worker receives a plain `DataSourceExec` — compatible with DataFusion's work-stealing scheduler. ## What changed - **`PartitionIsolatorExec`** removed entirely. - **`DistributedLeafExec`** introduced: holds `original` (for planning-time properties and single-task execution) + `variants` (one per task). - **`FileScanConfigTaskEstimator::scale_up_leaf_node`** now returns a `DistributedLeafExec` wrapping the pre-split variants instead of a `PartitionIsolatorExec`. - Task spawner replaces `DistributedLeafExec` with the per-task variant before serialisation (same location that already handled `ChildrenIsolatorUnionExec`). - Stage metrics rewriter updated to traverse `ChildrenIsolatorUnionExec` correctly (per-task node offsets) and to make `DistributedLeafExec` transparent for metrics lookups. - All snapshot tests and integration tests updated. ## Performance 0 Impact on TPCH and Clickbench, but getting some good numbers on TPC-DS: <details><summary>TOTAL: prev=264135.349226 ms, new=231281.42632199993 ms, diff=1.14 faster ✔</summary> ``` q1: prev= 677 ms, new= 489 ms, diff=1.38 faster ✅ q2: prev= 455 ms, new= 433 ms, diff=1.05 faster ✔ q3: prev= 267 ms, new= 287 ms, diff=1.07 slower ✖ q4: prev=1418 ms, new=1360 ms, diff=1.04 faster ✔ q5: prev= 658 ms, new= 481 ms, diff=1.37 faster ✅ q6: prev=1013 ms, new= 751 ms, diff=1.35 faster ✅ q7: prev= 394 ms, new= 343 ms, diff=1.15 faster ✔ q8: prev= 417 ms, new= 459 ms, diff=1.10 slower ✖ q9: prev= 369 ms, new= 370 ms, diff=1.00 slower ✖ q10: prev=1028 ms, new= 785 ms, diff=1.31 faster ✅ q11: prev=1142 ms, new=1125 ms, diff=1.02 faster ✔ q12: prev= 248 ms, new= 216 ms, diff=1.15 faster ✔ q13: prev= 739 ms, new= 612 ms, diff=1.21 faster ✅ q14: prev= 990 ms, new= 813 ms, diff=1.22 faster ✅ q15: prev= 262 ms, new= 159 ms, diff=1.65 faster ✅ q16: prev= 741 ms, new= 467 ms, diff=1.59 faster ✅ q17: prev= 412 ms, new= 280 ms, diff=1.47 faster ✅ q18: prev= 354 ms, new= 317 ms, diff=1.12 faster ✔ q19: prev= 421 ms, new= 282 ms, diff=1.49 faster ✅ q20: prev= 366 ms, new= 167 ms, diff=2.19 faster ✅ q21: prev= 560 ms, new= 339 ms, diff=1.65 faster ✅ q22: prev= 685 ms, new= 556 ms, diff=1.23 faster ✅ q23: prev= 823 ms, new= 634 ms, diff=1.30 faster ✅ q24: prev= 759 ms, new= 518 ms, diff=1.47 faster ✅ q25: prev= 304 ms, new= 221 ms, diff=1.38 faster ✅ q26: prev= 201 ms, new= 184 ms, diff=1.09 faster ✔ q27: prev= 378 ms, new= 481 ms, diff=1.27 slower ❌ q28: prev= 288 ms, new= 198 ms, diff=1.45 faster ✅ q29: prev= 347 ms, new= 314 ms, diff=1.11 faster ✔ q31: prev= 381 ms, new= 266 ms, diff=1.43 faster ✅ q32: prev= 304 ms, new= 225 ms, diff=1.35 faster ✅ q33: prev= 370 ms, new= 319 ms, diff=1.16 faster ✔ q34: prev= 433 ms, new= 291 ms, diff=1.49 faster ✅ q35: prev= 814 ms, new= 709 ms, diff=1.15 faster ✔ q36: prev=1104 ms, new= 507 ms, diff=2.18 faster ✅ q37: prev= 505 ms, new= 437 ms, diff=1.16 faster ✔ q38: prev= 342 ms, new= 386 ms, diff=1.13 slower ✖ q39: prev= 367 ms, new= 417 ms, diff=1.14 slower ✖ q40: prev= 448 ms, new= 452 ms, diff=1.01 slower ✖ q41: prev= 110 ms, new= 130 ms, diff=1.18 slower ✖ q42: prev= 148 ms, new= 183 ms, diff=1.24 slower ❌ q43: prev= 208 ms, new= 189 ms, diff=1.10 faster ✔ q44: prev= 234 ms, new= 271 ms, diff=1.16 slower ✖ q45: prev= 332 ms, new= 229 ms, diff=1.45 faster ✅ q46: prev= 656 ms, new= 486 ms, diff=1.35 faster ✅ q47: prev= 491 ms, new= 413 ms, diff=1.19 faster ✔ q48: prev= 570 ms, new= 480 ms, diff=1.19 faster ✔ q49: prev= 393 ms, new= 344 ms, diff=1.14 faster ✔ q50: prev= 538 ms, new= 405 ms, diff=1.33 faster ✅ q51: prev= 265 ms, new= 246 ms, diff=1.08 faster ✔ q52: prev= 139 ms, new= 178 ms, diff=1.28 slower ❌ q53: prev= 354 ms, new= 239 ms, diff=1.48 faster ✅ q54: prev= 497 ms, new= 408 ms, diff=1.22 faster ✅ q55: prev= 221 ms, new= 155 ms, diff=1.43 faster ✅ q56: prev= 438 ms, new= 359 ms, diff=1.22 faster ✅ q57: prev= 422 ms, new= 363 ms, diff=1.16 faster ✔ q58: prev= 340 ms, new= 389 ms, diff=1.14 slower ✖ q59: prev= 392 ms, new= 392 ms, diff=1.00 slower ✖ q60: prev= 446 ms, new= 350 ms, diff=1.27 faster ✅ q61: prev=1610 ms, new= 958 ms, diff=1.68 faster ✅ q62: prev= 713 ms, new= 722 ms, diff=1.01 slower ✖ q63: prev= 243 ms, new= 213 ms, diff=1.14 faster ✔ q64: prev=1411 ms, new=1253 ms, diff=1.13 faster ✔ q65: prev= 291 ms, new= 315 ms, diff=1.08 slower ✖ q66: prev= 796 ms, new= 716 ms, diff=1.11 faster ✔ q67: prev= 459 ms, new= 511 ms, diff=1.11 slower ✖ q68: prev= 481 ms, new= 441 ms, diff=1.09 faster ✔ q69: prev= 746 ms, new= 558 ms, diff=1.34 faster ✅ q70: prev= 471 ms, new= 497 ms, diff=1.06 slower ✖ q71: prev= 472 ms, new= 307 ms, diff=1.54 faster ✅ q72: prev=5412 ms, new=5622 ms, diff=1.04 slower ✖ q73: prev= 239 ms, new= 319 ms, diff=1.33 slower ❌ q74: prev= 660 ms, new= 687 ms, diff=1.04 slower ✖ q75: prev= 599 ms, new= 482 ms, diff=1.24 faster ✅ q76: prev=1002 ms, new= 341 ms, diff=2.94 faster ✅ q77: prev= 442 ms, new= 366 ms, diff=1.21 faster ✅ q78: prev= 310 ms, new= 361 ms, diff=1.16 slower ✖ q79: prev= 325 ms, new= 292 ms, diff=1.11 faster ✔ q80: prev= 438 ms, new= 420 ms, diff=1.04 faster ✔ q81: prev= 315 ms, new= 306 ms, diff=1.03 faster ✔ q82: prev= 346 ms, new= 340 ms, diff=1.02 faster ✔ q83: prev= 332 ms, new= 352 ms, diff=1.06 slower ✖ q84: prev= 292 ms, new= 277 ms, diff=1.05 faster ✔ q85: prev= 531 ms, new= 547 ms, diff=1.03 slower ✖ q86: prev= 214 ms, new= 156 ms, diff=1.37 faster ✅ q87: prev= 267 ms, new= 368 ms, diff=1.38 slower ❌ q88: prev= 468 ms, new= 504 ms, diff=1.08 slower ✖ q89: prev= 228 ms, new= 279 ms, diff=1.22 slower ❌ q90: prev= 212 ms, new= 268 ms, diff=1.26 slower ❌ q91: prev= 535 ms, new= 498 ms, diff=1.07 faster ✔ q92: prev= 254 ms, new= 291 ms, diff=1.15 slower ✖ q93: prev= 210 ms, new= 270 ms, diff=1.29 slower ❌ q94: prev= 407 ms, new= 355 ms, diff=1.15 faster ✔ q95: prev= 412 ms, new= 364 ms, diff=1.13 faster ✔ q96: prev= 266 ms, new= 330 ms, diff=1.24 slower ❌ q97: prev= 208 ms, new= 243 ms, diff=1.17 slower ✖ q98: prev= 213 ms, new= 213 ms, diff=1.00 slower ✖ q99: prev= 964 ms, new=1008 ms, diff=1.05 slower ✖ ``` </details>
This is one PR from the following stack of PRs:
This is a preparatory step towards:
This is an optimization that allows workers to communicate in-memory avoiding network calls and serialization in case it needs to communicate to itself.
This optimization shows good improvements if using a small number of workers, and fades away as more workers are used.
Even if this shows improvements today, it will become very meaningful in adaptative query execution: there will be times when two consecutive stages are assigned 1 task each, and as that was done dynamically, we cannot eagerly do the optimization that collapses those two stages into one, so instead, we assigned both stages to the same worker, and the optimization in this PR kicks in, executing the plan fully locally.