Rebalance files globally across the partitions produced by FileScanConfigTaskEstimator #450

Open
shehab-ali wants to merge 1 commit into datafusion-contrib:main from shehab-ali:sa/partition-task

@shehab-ali shehab-ali commented May 11, 2026

Motivation

Issue #310 highlights that scaling FileScanConfig by splitting each existing file group independently can produce uneven and unintuitive partition layouts. In particular, per-group splitting can overfragment or underalign work depending on original group boundaries.

Summary

Rebalance files globally across the partitions produced by FileScanConfigTaskEstimator::scale_up_leaf_node, instead of splitting each input file group independently. This fixes group-boundary skew where the per-group split_files(task_count) call could leave some partitions with one fewer file than others when the file count per group is not evenly divisible by task_count.

FileScanConfigTaskEstimator::scale_up_leaf_node is responsible for inflating the partition count of a leaf DataSourceExec so it can be sliced across task_count distributed tasks by PartitionIsolatorExec. Previously, each input file group was split independently:

for file_group in file_scan.file_groups.clone() {
    new_file_scan.file_groups.extend(file_group.split_files(task_count));
}

When a file group's size was not divisible by task_count, the remainder fell entirely on the earlier sub-groups. With many input file groups, this skew compounded: some downstream tasks ended up with consistently more files than others, leaving workers idle at the tail of each stage.
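To make the skew concrete, here is a toy model of the per-group strategy. It assumes a ceil-sized contiguous chunking helper (`split_chunks`, a hypothetical stand-in that is not DataFusion's `split_files` and may differ from it in details such as sorting) and uses plain strings in place of `PartitionedFile`.

```rust
/// Hypothetical stand-in for a per-group split: cut `files` into at most `n`
/// contiguous chunks of size ceil(len / n). Not DataFusion's implementation.
fn split_chunks(files: &[&str], n: usize) -> Vec<Vec<String>> {
    if files.is_empty() || n == 0 {
        return Vec::new();
    }
    let chunk_size = files.len().div_ceil(n);
    files
        .chunks(chunk_size)
        .map(|chunk| chunk.iter().map(|f| f.to_string()).collect())
        .collect()
}

/// Split every input group independently and return the file count of each
/// resulting partition, in output order.
fn per_group_partition_sizes(groups: &[Vec<&str>], task_count: usize) -> Vec<usize> {
    groups
        .iter()
        .flat_map(|group| split_chunks(group, task_count))
        .map(|partition| partition.len())
        .collect()
}
```

Under these assumptions, two groups of three files scaled for `task_count = 2` yield partition sizes `[2, 1, 2, 1]`: the first chunk of every group carries the extra file, and the pattern repeats at each group boundary.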

Change

Flatten all files across all input groups, then round-robin them into N × task_count output groups, where N is the original input group count. This preserves the total fan-out that PartitionIsolatorExec expects while smoothing skew across group boundaries.

let input_group_count = file_scan.file_groups.len().max(1);
let all_partitioned_files = file_scan
    .file_groups
    .iter()
    .flat_map(|file_group| file_group.iter().cloned())
    .collect::<Vec<_>>();
let file_groups =
    rebalance_round_robin(all_partitioned_files, input_group_count * task_count);
new_file_scan.file_groups = file_groups.into_iter().map(Into::into).collect();

rebalance_round_robin caps its target group count at items.len() so we don't emit empty partitions when there are fewer files than the requested fan-out.
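The helper itself is not shown in this description. As a rough sketch of what such a function could look like, inferred only from the call site and the capping behavior described above (the generic signature and the empty-input handling are assumptions, and the PR's actual helper may differ):

```rust
/// Illustrative sketch only: distribute `items` round-robin into at most
/// `target_groups` groups. The group count is capped at `items.len()` so
/// that no output group is empty.
fn rebalance_round_robin<T>(items: Vec<T>, target_groups: usize) -> Vec<Vec<T>> {
    let group_count = target_groups.min(items.len());
    if group_count == 0 {
        // No items (or zero requested groups): nothing to emit.
        return Vec::new();
    }
    let mut groups: Vec<Vec<T>> = (0..group_count).map(|_| Vec::new()).collect();
    for (i, item) in items.into_iter().enumerate() {
        // Item i goes to group i mod group_count, so group sizes differ by at most 1.
        groups[i % group_count].push(item);
    }
    groups
}
```

With this shape, five files spread over two groups land as three and two files, and two files requested across eight groups produce only two single-file groups rather than six empty ones.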

Benchmark Results — TPCH sf10

Run on the remote EC2 cluster (16 workers), 3 iterations + 1 warmup per query, default flags.

| Query | main (ms) | This PR (ms) | Δ |
|---|---|---|---|
| q1 | 1571 | 1380 | 1.14× faster |
| q2 | 569 | 514 | 1.11× faster |
| q3 | 958 | 1038 | 1.08× slower |
| q4 | 662 | 643 | 1.03× faster |
| q5 | 1231 | 1205 | 1.02× faster |
| q6 | 749 | 958 | 1.28× slower |
| q7 | 1047 | 1272 | 1.21× slower |
| q8 | 2668 | 1833 | 1.46× faster |
| q9 | 1787 | 1893 | 1.06× slower |
| q10 | 958 | 1047 | 1.09× slower |
| q11 | 355 | 273 | 1.30× faster |
| q12 | 715 | 908 | 1.27× slower |
| q13 | 804 | 840 | 1.04× slower |
| q14 | 931 | 886 | 1.05× faster |
| q15 | 896 | 1119 | 1.25× slower |
| q16 | 256 | 334 | 1.30× slower |
| q17 | 1517 | 1773 | 1.17× slower |
| q18 | 1336 | 1544 | 1.16× slower |
| q19 | 1006 | 908 | 1.11× faster |
| q20 | 1089 | 787 | 1.38× faster |
| q21 | 1438 | 1443 | parity |
| q22 | 242 | 323 | 1.33× slower |
| TOTAL | 68,385 | 68,796 | 0.6% slower (within noise) |

Overall TPCH sf10 throughput is at parity with main. Individual queries range from 1.46× faster (q8) to 1.33× slower (q22), with no systemic regression.
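For reference, the Δ column can be reproduced from the two timing columns with a small helper like the one below. This is a sketch; the function name and the 1% band used to report "parity" are assumptions chosen for illustration, not something defined by the benchmark tooling.

```rust
/// Format the Δ column for one query from two wall-clock timings in ms.
/// Assumes both timings are positive. The 1% parity band is an assumption
/// made for this sketch.
fn describe_delta(main_ms: f64, pr_ms: f64) -> String {
    // ratio > 1 means main took longer, i.e. the PR is faster.
    let ratio = main_ms / pr_ms;
    if (ratio - 1.0).abs() < 0.01 {
        "parity".to_string()
    } else if ratio > 1.0 {
        format!("{:.2}× faster", ratio)
    } else {
        format!("{:.2}× slower", 1.0 / ratio)
    }
}
```

Called with the q1 timings (1571 ms on main, 1380 ms on the PR) it yields "1.14× faster", and with the q6 timings (749 ms, 958 ms) it yields "1.28× slower".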

Test Plan

  • cargo test -p datafusion-distributed task_estimator: new unit tests cover (a) group-boundary skew rebalancing and (b) the cap-to-file-count edge case
  • TPCH sf10 on remote EC2 cluster: parity with main (table above)
  • Visual inspection of generated plans on a scan-heavy query (q1) confirms partition count is preserved

Notes / limitations

  • This PR focuses on balancing and file-count capping behavior.
  • It does not yet introduce additional safeguards for preserving scan partition semantics or cross-side co-partitioning guarantees for partitioned joins; those may require follow-up work.

@shehab-ali shehab-ali marked this pull request as ready for review May 12, 2026 19:53
Comment on lines +282 to +288
let all_partitioned_files = file_scan
    .file_groups
    .iter()
    .flat_map(|file_group| file_group.iter().cloned())
    .collect::<Vec<_>>();
let file_groups = rebalance_round_robin(all_partitioned_files, task_count);
new_file_scan.file_groups = file_groups.into_iter().map(Into::into).collect();
Collaborator

Nice! Sounds like a good thing to try. Can you run the benchmarks for this and report the outcome?

@shehab-ali shehab-ali changed the title from "Add global rebalancing in file splitting between tasks" to "Rebalance files globally across the partitions produced by FileScanConfigTaskEstimator" May 27, 2026
Comment on lines +283 to +290
let all_partitioned_files = file_scan
    .file_groups
    .iter()
    .flat_map(|file_group| file_group.iter().cloned())
    .collect::<Vec<_>>();
let file_groups =
    rebalance_round_robin(all_partitioned_files, input_group_count * task_count);
new_file_scan.file_groups = file_groups.into_iter().map(Into::into).collect();
Collaborator

One requirement for this type of change is that the benchmarks should show it actually brings a performance benefit, and this change does not seem to meet that criterion.

Maybe there are opportunities for further re-splitting PartitionedFiles?

Also, you might want to try ClickBench instead, which has a greater number of files than TPC-H or TPC-DS; you would probably get better numbers there.

Collaborator

The fact that this PR shows 0 diffs in the plan snapshots makes me think it might incidentally be doing exactly the same thing as the previous code.

Maybe that's the reason why you see no performance impact?

Author

I'll investigate whether we can get rid of the regressions in some of the queries.

I ran ClickBench (also added to the description).
Totals: PR = 155,530 ms; main = 162,770 ms, so the PR is ~4.5% faster overall.
Notable PR wins (PR vs main):
q2: 415 vs 1592 ms (3.84× faster)
q1: 458 vs 973 ms (2.12× faster)
q28: 3993 vs 6089 ms (1.52× faster)
q9: 689 vs 925 ms (1.34× faster)

Notable PR regressions (PR vs main):
q20: 3831 vs 2831 ms (1.35× slower)
q24: 824 vs 610 ms (1.35× slower)
q25: 743 vs 546 ms (1.36× slower)
q37: 340 vs 233 ms (1.46× slower)
