From a233246394d65fd7dd984a858f79b6f4f01bef74 Mon Sep 17 00:00:00 2001 From: Shehab Ali Date: Wed, 27 May 2026 09:35:58 -0400 Subject: [PATCH] Add global rebalancing in file splitting between tasks --- .vscode/settings.json | 1 + src/distributed_planner/task_estimator.rs | 42 +++++++++++++++++++---- 2 files changed, 37 insertions(+), 6 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/src/distributed_planner/task_estimator.rs b/src/distributed_planner/task_estimator.rs index 60534e3a..da7e6eeb 100644 --- a/src/distributed_planner/task_estimator.rs +++ b/src/distributed_planner/task_estimator.rs @@ -279,17 +279,31 @@ impl TaskEstimator for FileScanConfigTaskEstimator { let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref()?; let mut new_file_scan = file_scan.clone(); - new_file_scan.file_groups.clear(); - for file_group in file_scan.file_groups.clone() { - new_file_scan - .file_groups - .extend(file_group.split_files(task_count)); - } + let input_group_count = file_scan.file_groups.len().max(1); + let all_partitioned_files = file_scan + .file_groups + .iter() + .flat_map(|file_group| file_group.iter().cloned()) + .collect::>(); + let file_groups = + rebalance_round_robin(all_partitioned_files, input_group_count * task_count); + new_file_scan.file_groups = file_groups.into_iter().map(Into::into).collect(); let plan = DataSourceExec::from_data_source(new_file_scan); Some(Arc::new(PartitionIsolatorExec::new(plan, task_count))) } } +fn rebalance_round_robin(items: Vec, target_groups: usize) -> Vec> { + let target_groups = target_groups.min(items.len()); + let mut groups = (0..target_groups) + .map(|_| Vec::new()) + .collect::>>(); + for (idx, item) in items.into_iter().enumerate() { + groups[idx % target_groups].push(item); + } + groups +} + /// Tries multiple user-provided [TaskEstimator]s until one returns an estimation. If none /// returns an estimation, a set of default [TaskEstimation] implementations is tried. Right /// now the only default [TaskEstimation] is [FileScanConfigTaskEstimator]. @@ -393,6 +407,22 @@ mod tests { Ok(()) } + #[test] + fn test_rebalance_round_robin_fixes_group_boundary_skew() { + let items = (0..8).collect::>(); + let groups = rebalance_round_robin(items, 5); + let sizes = groups.iter().map(Vec::len).collect::>(); + assert_eq!(sizes, vec![2, 2, 2, 1, 1]); + } + + #[test] + fn test_rebalance_round_robin_caps_partitions_to_file_count() { + let items = vec![10, 20, 30]; + let groups = rebalance_round_robin(items, 5); + let sizes = groups.iter().map(Vec::len).collect::>(); + assert_eq!(sizes, vec![1, 1, 1]); + } + impl CombinedTaskEstimator { fn push(&mut self, value: impl TaskEstimator + Send + Sync + 'static) { self.user_provided.push(Arc::new(value));