Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
42 changes: 36 additions & 6 deletions src/distributed_planner/task_estimator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,17 +279,31 @@ impl TaskEstimator for FileScanConfigTaskEstimator {
let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref()?;

let mut new_file_scan = file_scan.clone();
new_file_scan.file_groups.clear();
for file_group in file_scan.file_groups.clone() {
new_file_scan
.file_groups
.extend(file_group.split_files(task_count));
}
let input_group_count = file_scan.file_groups.len().max(1);
let all_partitioned_files = file_scan
.file_groups
.iter()
.flat_map(|file_group| file_group.iter().cloned())
.collect::<Vec<_>>();
let file_groups =
rebalance_round_robin(all_partitioned_files, input_group_count * task_count);
new_file_scan.file_groups = file_groups.into_iter().map(Into::into).collect();
Comment on lines +283 to +290
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! sounds like a good thing to try. Can you try running the benchmarks for this and report the outcome?

Comment on lines +283 to +290
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One requirement for this type of changes is that they should prove through the benchmarks that they indeed bring performance benefits, but this change does not seem to be meeting this criteria.

Maybe there are opportunities for further re-splitting PartitionedFiles?

Also, you might want to try ClickBench instead, which has a greater number of files VS TPC-H or TPC-DS, you probably would get some better numbers there.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the fact that this PR shows 0 diffs in the plan snapshots makes me thing that it might be collaterally doing exactly the same thing as the previous code.

Maybe that's the reason why you see no performance impact?

Copy link
Copy Markdown
Author

@shehab-ali shehab-ali May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll investigate if we can get rid of these regressions in some of the queries.

I ran ClickBench (also added to the description)
Totals: PR = 155,530 ms; main = 162,770 ms → PR is ~4.5% faster overall.
Notable PR wins (main slower):
q2: 415 → 1592 ms (3.84×)
q1: 458 → 973 ms (2.12×)
q28: 3993 → 6089 ms (1.52×)
q9: 689 → 925 ms (1.34×)

Notable PR regressions (main faster):
q37: 340 vs 233 ms (1.46×)

let plan = DataSourceExec::from_data_source(new_file_scan);
Some(Arc::new(PartitionIsolatorExec::new(plan, task_count)))
}
}

fn rebalance_round_robin<T>(items: Vec<T>, target_groups: usize) -> Vec<Vec<T>> {
let target_groups = target_groups.min(items.len());
let mut groups = (0..target_groups)
.map(|_| Vec::new())
.collect::<Vec<Vec<T>>>();
for (idx, item) in items.into_iter().enumerate() {
groups[idx % target_groups].push(item);
}
groups
}

/// Tries multiple user-provided [TaskEstimator]s until one returns an estimation. If none
/// returns an estimation, a set of default [TaskEstimation] implementations is tried. Right
/// now the only default [TaskEstimation] is [FileScanConfigTaskEstimator].
Expand Down Expand Up @@ -393,6 +407,22 @@ mod tests {
Ok(())
}

#[test]
fn test_rebalance_round_robin_fixes_group_boundary_skew() {
let items = (0..8).collect::<Vec<_>>();
let groups = rebalance_round_robin(items, 5);
let sizes = groups.iter().map(Vec::len).collect::<Vec<_>>();
assert_eq!(sizes, vec![2, 2, 2, 1, 1]);
}

#[test]
fn test_rebalance_round_robin_caps_partitions_to_file_count() {
let items = vec![10, 20, 30];
let groups = rebalance_round_robin(items, 5);
let sizes = groups.iter().map(Vec::len).collect::<Vec<_>>();
assert_eq!(sizes, vec![1, 1, 1]);
}

impl CombinedTaskEstimator {
fn push(&mut self, value: impl TaskEstimator + Send + Sync + 'static) {
self.user_provided.push(Arc::new(value));
Expand Down
Loading