Rebalance files globally across the partitions produced by FileScanConfigTaskEstimator#450
Rebalance files globally across the partitions produced by FileScanConfigTaskEstimator#450shehab-ali wants to merge 1 commit into
Conversation
| let all_partitioned_files = file_scan | ||
| .file_groups | ||
| .iter() | ||
| .flat_map(|file_group| file_group.iter().cloned()) | ||
| .collect::<Vec<_>>(); | ||
| let file_groups = rebalance_round_robin(all_partitioned_files, task_count); | ||
| new_file_scan.file_groups = file_groups.into_iter().map(Into::into).collect(); |
There was a problem hiding this comment.
Nice! sounds like a good thing to try. Can you try running the benchmarks for this and report the outcome?
9e75005 to
a233246
Compare
| let all_partitioned_files = file_scan | ||
| .file_groups | ||
| .iter() | ||
| .flat_map(|file_group| file_group.iter().cloned()) | ||
| .collect::<Vec<_>>(); | ||
| let file_groups = | ||
| rebalance_round_robin(all_partitioned_files, input_group_count * task_count); | ||
| new_file_scan.file_groups = file_groups.into_iter().map(Into::into).collect(); |
There was a problem hiding this comment.
One requirement for this type of changes is that they should prove through the benchmarks that they indeed bring performance benefits, but this change does not seem to be meeting this criteria.
Maybe there are opportunities for further re-splitting PartitionedFiles?
Also, you might want to try ClickBench instead, which has a greater number of files VS TPC-H or TPC-DS, you probably would get some better numbers there.
There was a problem hiding this comment.
I think the fact that this PR shows 0 diffs in the plan snapshots makes me thing that it might be collaterally doing exactly the same thing as the previous code.
Maybe that's the reason why you see no performance impact?
There was a problem hiding this comment.
I'll investigate if we can get rid of these regressions in some of the queries.
I ran ClickBench (also added to the description)
Totals: PR = 155,530 ms; main = 162,770 ms → PR is ~4.5% faster overall.
Notable PR wins (main slower):
q2: 415 → 1592 ms (3.84×)
q1: 458 → 973 ms (2.12×)
q28: 3993 → 6089 ms (1.52×)
q9: 689 → 925 ms (1.34×)
Notable PR regressions (main faster):
q20: 3831 vs 2831 ms (1.35×)
q24: 824 vs 610 ms (1.35×)
q25: 743 vs 546 ms (1.36×)
q37: 340 vs 233 ms (1.46×)
Motivation
Issue #310 highlights that scaling FileScanConfig by splitting each existing file group independently can produce uneven and unintuitive partition layouts. In particular, per-group splitting can overfragment or underalign work depending on original group boundaries.
Summary
Rebalance files globally across the partitions produced by
FileScanConfigTaskEstimator::scale_up_leaf_node, instead of splitting each input file group independently. This fixes group-boundary skew where the per-groupsplit_files(task_count)call could leave some partitions with one fewer file than others when the file count per group is not evenly divisible bytask_count.FileScanConfigTaskEstimator::scale_up_leaf_nodeis responsible for inflating the partition count of a leafDataSourceExecso it can be sliced acrosstask_countdistributed tasks byPartitionIsolatorExec. Previously, each input file group was split independently:When a file group's size was not divisible by task_count, the remainder fell entirely on the earlier sub-groups. With many input file groups, this skew compounded — some downstream tasks ended up with consistently more files than others, leaving workers idle at the tail of each stage.
Change
Flatten all files across all input groups, then round-robin them into N × task_count output groups, where N is the original input group count. This preserves the total fan-out that PartitionIsolatorExec expects while smoothing skew across group boundaries.
rebalance_round_robincaps its target group count at items.len() so we don't emit empty partitions when there are fewer files than the requested fan-out.Benchmark Results — TPCH sf10
Run on the remote EC2 cluster (16 workers), 3 iterations + 1 warmup per query, default flags.
Overall TPCH sf10 throughput is at parity with
main. Individual queries shift within ±30%, but no systemic regression.Test Plan
cargo test -p datafusion-distributed task_estimator: new unit tests cover (a) group-boundary skew rebalancing and (b) the cap-to-file-count edge caseNotes / limitations