diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 00000000..9e26dfee
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1 @@
+{}
\ No newline at end of file
diff --git a/src/distributed_planner/task_estimator.rs b/src/distributed_planner/task_estimator.rs
index 60534e3a..a234d6f8 100644
--- a/src/distributed_planner/task_estimator.rs
+++ b/src/distributed_planner/task_estimator.rs
@@ -279,17 +279,38 @@ impl TaskEstimator for FileScanConfigTaskEstimator {
         let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref()?;
 
         let mut new_file_scan = file_scan.clone();
-        new_file_scan.file_groups.clear();
-        for file_group in file_scan.file_groups.clone() {
-            new_file_scan
-                .file_groups
-                .extend(file_group.split_files(task_count));
-        }
+        let input_group_count = file_scan.file_groups.len().max(1);
+        let all_partitioned_files = file_scan
+            .file_groups
+            .iter()
+            .flat_map(|file_group| file_group.iter().cloned())
+            .collect::<Vec<_>>();
+        let file_groups =
+            rebalance_round_robin(all_partitioned_files, input_group_count * task_count);
+        new_file_scan.file_groups = file_groups.into_iter().map(Into::into).collect();
         let plan = DataSourceExec::from_data_source(new_file_scan);
         Some(Arc::new(PartitionIsolatorExec::new(plan, task_count)))
     }
 }
 
+/// Deals `items` round-robin into at most `target_groups` groups.
+///
+/// Item `i` goes to group `i % n`, where `n` is `target_groups` clamped to
+/// `1..=items.len()`. Group sizes therefore differ by at most one and no group is
+/// empty. Empty `items` produce no groups, and a `target_groups` of zero is treated
+/// as one so the modulo can never divide by zero.
+fn rebalance_round_robin<T>(items: Vec<T>, target_groups: usize) -> Vec<Vec<T>> {
+    if items.is_empty() {
+        return Vec::new();
+    }
+    let target_groups = target_groups.clamp(1, items.len());
+    let mut groups: Vec<Vec<T>> = (0..target_groups).map(|_| Vec::new()).collect();
+    for (idx, item) in items.into_iter().enumerate() {
+        groups[idx % target_groups].push(item);
+    }
+    groups
+}
+
 /// Tries multiple user-provided [TaskEstimator]s until one returns an estimation. If none
 /// returns an estimation, a set of default [TaskEstimation] implementations is tried. Right
 /// now the only default [TaskEstimation] is [FileScanConfigTaskEstimator].
@@ -393,6 +414,105 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_rebalance_round_robin_fixes_group_boundary_skew() {
+        let items = (0..8).collect::<Vec<_>>();
+        let groups = rebalance_round_robin(items, 5);
+        let sizes = groups.iter().map(Vec::len).collect::<Vec<_>>();
+        assert_eq!(sizes, vec![2, 2, 2, 1, 1]);
+    }
+
+    #[test]
+    fn test_rebalance_round_robin_caps_partitions_to_file_count() {
+        let items = vec![10, 20, 30];
+        let groups = rebalance_round_robin(items, 5);
+        let sizes = groups.iter().map(Vec::len).collect::<Vec<_>>();
+        assert_eq!(sizes, vec![1, 1, 1]);
+    }
+
+    #[test]
+    fn test_rebalance_round_robin_handles_empty_and_zero_targets() {
+        assert!(rebalance_round_robin(Vec::<i32>::new(), 4).is_empty());
+        assert!(rebalance_round_robin(Vec::<i32>::new(), 0).is_empty());
+        // A zero target must not panic on `idx % 0`; everything lands in one group.
+        assert_eq!(rebalance_round_robin(vec![1, 2, 3], 0), vec![vec![1, 2, 3]]);
+    }
+
+    /// End-to-end check that `scale_up_leaf_node` actually rebalances files **across**
+    /// input group boundaries. With 3 input groups of 5 files each and `task_count = 3`,
+    /// the global round-robin packing interleaves files from different input groups into
+    /// the same output group — something the prior per-group `split_files` approach
+    /// could never do, since it kept each input group's files clustered together.
+    #[test]
+    fn test_scale_up_leaf_node_rebalances_across_input_groups() {
+        use datafusion::arrow::datatypes::{DataType, Field, Schema};
+        use datafusion::datasource::listing::PartitionedFile;
+        use datafusion::datasource::physical_plan::{
+            FileGroup, FileScanConfigBuilder, FileSource, ParquetSource,
+        };
+        use datafusion::execution::object_store::ObjectStoreUrl;
+
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let source: Arc<dyn FileSource> = Arc::new(ParquetSource::new(Arc::clone(&schema)));
+        let url = ObjectStoreUrl::parse("file:///").unwrap();
+        let mut builder = FileScanConfigBuilder::new(url, source);
+        for g in 0..3 {
+            let files: Vec<PartitionedFile> = (0..5)
+                .map(|i| PartitionedFile::new(format!("g{}/f{}.parquet", g, i), 1024))
+                .collect();
+            builder = builder.with_file_group(FileGroup::new(files));
+        }
+        let scan = builder.build();
+        let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(scan);
+
+        let cfg = ConfigOptions::default();
+        let scaled = FileScanConfigTaskEstimator
+            .scale_up_leaf_node(&plan, 3, &cfg)
+            .expect("scale_up should produce a plan");
+
+        let isolator = scaled
+            .as_any()
+            .downcast_ref::<PartitionIsolatorExec>()
+            .expect("expected PartitionIsolatorExec wrapper");
+        let inner = isolator
+            .children()
+            .first()
+            .map(|c| Arc::clone(*c))
+            .unwrap();
+        let dse: &DataSourceExec = inner.as_any().downcast_ref().unwrap();
+        let file_scan: &FileScanConfig = dse.data_source().as_any().downcast_ref().unwrap();
+
+        let groups: Vec<Vec<String>> = file_scan
+            .file_groups
+            .iter()
+            .map(|g| {
+                g.iter()
+                    .map(|f| f.object_meta.location.to_string())
+                    .collect()
+            })
+            .collect();
+
+        // 15 files round-robin into 3 * 3 = 9 output groups -> sizes [2,2,2,2,2,2,1,1,1].
+        assert_eq!(groups.len(), 9, "expected 9 output groups, got {:?}", groups);
+        let total: usize = groups.iter().map(Vec::len).sum();
+        assert_eq!(total, 15);
+
+        // Each of the first 5 output groups must contain files from at least two
+        // distinct input groups -- this is the cross-group rebalancing that the prior
+        // per-group `split_files` algorithm could not produce.
+        let prefix = |path: &str| path.split('/').next().unwrap_or("").to_string();
+        for (idx, files) in groups.iter().enumerate().take(5) {
+            let distinct_inputs: std::collections::BTreeSet<String> =
+                files.iter().map(|p| prefix(p)).collect();
+            assert!(
+                distinct_inputs.len() >= 2,
+                "output group {} must mix files from multiple input groups; got {:?}",
+                idx,
+                files
+            );
+        }
+    }
+
     impl CombinedTaskEstimator {
         fn push(&mut self, value: impl TaskEstimator + Send + Sync + 'static) {
             self.user_provided.push(Arc::new(value));