Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions ballista/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,31 @@ pub const BALLISTA_COALESCE_SMALL_PARTITION_FACTOR: &str =
pub const BALLISTA_COALESCE_MERGED_PARTITION_FACTOR: &str =
"ballista.planner.coalesce.merged_partition_factor";

/// Configuration key that enables the AQE optimize-skewed-join rule (Spark's
/// `OptimizeSkewedJoin`). Boolean setting; the registered default is `false`,
/// so the rule is opt-in. Intended for workloads where per-key skew creates
/// straggler tasks on join inputs.
pub const BALLISTA_SKEW_JOIN_ENABLED: &str = "ballista.planner.skew_join.enabled";
/// Configuration key for the skew factor: a multiplier over the per-side
/// median below which a partition is not considered skewed. Float setting;
/// the registered default is `5.0`. Mirrors Spark's
/// `spark.sql.adaptive.skewJoin.skewedPartitionFactor`.
pub const BALLISTA_SKEW_JOIN_SKEWED_PARTITION_FACTOR: &str =
"ballista.planner.skew_join.skewed_partition_factor";
/// Configuration key for the absolute byte threshold below which a partition
/// is never split, even when its ratio to the median qualifies. Unsigned
/// integer setting; the registered default is 256 MiB (`256 * 1024 * 1024`).
/// Mirrors Spark's
/// `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`.
pub const BALLISTA_SKEW_JOIN_SKEWED_PARTITION_THRESHOLD_BYTES: &str =
"ballista.planner.skew_join.skewed_partition_threshold_bytes";
/// Configuration key for the advisory target sub-shard byte size used by the
/// bin-packer when splitting a skewed partition's per-mapper outputs.
/// Unsigned integer setting; the registered default is 64 MiB
/// (`64 * 1024 * 1024`). Kept separate from the coalesce target so the two
/// rules can be tuned independently.
pub const BALLISTA_SKEW_JOIN_ADVISORY_PARTITION_BYTES: &str =
"ballista.planner.skew_join.advisory_partition_bytes";
/// Configuration key for the tail-merge factor: a trailing sub-shard whose
/// size is below `small_partition_factor * advisory` folds back into its
/// predecessor (behaviour inherited from Spark's
/// `splitSizeListByTargetSize`). Float setting; the registered default is
/// `0.2`.
pub const BALLISTA_SKEW_JOIN_SMALL_PARTITION_FACTOR: &str =
"ballista.planner.skew_join.small_partition_factor";

/// Result type for configuration parsing operations.
///
/// The success value is the parsed `T`; the error variant is a plain
/// `String`.
pub type ParseResult<T> = result::Result<T, String>;
use std::sync::LazyLock;
Expand Down Expand Up @@ -220,6 +245,47 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
DataType::Float64,
Some("1.2".to_string()),
),
ConfigEntry::new(
BALLISTA_SKEW_JOIN_ENABLED.to_string(),
"Enables the AQE optimize-skewed-join rule (Spark's \
OptimizeSkewedJoin). Disabled by default — opt in for workloads \
where per-key skew creates straggler tasks on join inputs."
.to_string(),
DataType::Boolean,
Some(false.to_string()),
),
ConfigEntry::new(
BALLISTA_SKEW_JOIN_SKEWED_PARTITION_FACTOR.to_string(),
"Multiplier over per-side median below which a partition is not \
considered skewed (Spark's skewedPartitionFactor)."
.to_string(),
DataType::Float64,
Some("5.0".to_string()),
),
ConfigEntry::new(
BALLISTA_SKEW_JOIN_SKEWED_PARTITION_THRESHOLD_BYTES.to_string(),
"Absolute byte threshold below which a partition is never split \
(Spark's skewedPartitionThresholdInBytes)."
.to_string(),
DataType::UInt64,
Some((256 * 1024 * 1024_usize).to_string()),
),
ConfigEntry::new(
BALLISTA_SKEW_JOIN_ADVISORY_PARTITION_BYTES.to_string(),
"Advisory target sub-shard byte size used by the bin-packer when \
splitting a skewed partition."
.to_string(),
DataType::UInt64,
Some((64 * 1024 * 1024_usize).to_string()),
),
ConfigEntry::new(
BALLISTA_SKEW_JOIN_SMALL_PARTITION_FACTOR.to_string(),
"Tail-merge factor for the skew-join sub-shard bin-packer \
(Spark legacy)."
.to_string(),
DataType::Float64,
Some("0.2".to_string()),
),
];
entries
.into_iter()
Expand Down Expand Up @@ -450,6 +516,35 @@ impl BallistaConfig {
self.get_float_setting(BALLISTA_COALESCE_MERGED_PARTITION_FACTOR)
}

/// Reports whether the AQE optimize-skewed-join rule is switched on.
///
/// Looks up the boolean stored under [`BALLISTA_SKEW_JOIN_ENABLED`].
pub fn skew_join_enabled(&self) -> bool {
    let key = BALLISTA_SKEW_JOIN_ENABLED;
    self.get_bool_setting(key)
}

/// Skewed-partition factor (Spark's `skewedPartitionFactor`).
///
/// Looks up the float stored under
/// [`BALLISTA_SKEW_JOIN_SKEWED_PARTITION_FACTOR`].
pub fn skew_join_skewed_partition_factor(&self) -> f64 {
    let key = BALLISTA_SKEW_JOIN_SKEWED_PARTITION_FACTOR;
    self.get_float_setting(key)
}

/// Absolute skewed-partition byte threshold (Spark's
/// `skewedPartitionThresholdInBytes`).
///
/// The setting is fetched through `get_usize_setting` and then cast to
/// `u64` for the caller.
pub fn skew_join_skewed_partition_threshold_bytes(&self) -> u64 {
    let threshold =
        self.get_usize_setting(BALLISTA_SKEW_JOIN_SKEWED_PARTITION_THRESHOLD_BYTES);
    threshold as u64
}

/// Advisory target sub-shard byte size used by the bin-packer when
/// splitting a skewed partition.
///
/// The setting is fetched through `get_usize_setting` and then cast to
/// `u64` for the caller.
pub fn skew_join_advisory_partition_bytes(&self) -> u64 {
    let advisory = self.get_usize_setting(BALLISTA_SKEW_JOIN_ADVISORY_PARTITION_BYTES);
    advisory as u64
}

/// Tail-merge factor for the skew-join sub-shard bin-packer.
///
/// Looks up the float stored under
/// [`BALLISTA_SKEW_JOIN_SMALL_PARTITION_FACTOR`].
pub fn skew_join_small_partition_factor(&self) -> f64 {
    let key = BALLISTA_SKEW_JOIN_SMALL_PARTITION_FACTOR;
    self.get_float_setting(key)
}

/// Should client employ pull or push job tracking strategy
pub fn client_pull(&self) -> bool {
self.get_bool_setting(BALLISTA_CLIENT_PULL)
Expand Down
4 changes: 3 additions & 1 deletion ballista/core/src/execution_plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use std::path::{Path, PathBuf};
use datafusion::common::exec_err;
pub use distributed_explain_analyze::DistributedExplainAnalyzeExec;
pub use distributed_query::DistributedQueryExec;
pub use shuffle_reader::{CoalescePlan, PartitionGroup, ShuffleReaderExec};
pub use shuffle_reader::{
CoalescePlan, PartitionGroup, ShuffleReaderExec, SkewJoinPlan, SkewJoinShard,
};
pub use shuffle_reader::{stats_for_partition, stats_for_partitions};
pub use shuffle_writer::DEFAULT_SHUFFLE_CHANNEL_CAPACITY;
pub use shuffle_writer::ShuffleWriterExec;
Expand Down
114 changes: 114 additions & 0 deletions ballista/core/src/execution_plans/shuffle_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,44 @@ pub struct PartitionGroup {
pub upstream_indices: Vec<u32>,
}

/// Skew-join plan attached to the two leaf `ShuffleReaderExec`s of a join
/// stage by the AQE `OptimizeSkewedJoinRule`.
///
/// Spark analog: the rewrite that replaces both legs' `AQEShuffleReadExec`
/// with a paired list of `PartialReducerPartitionSpec`s. K' (the post-rewrite
/// partition count) equals `shards.len()` and matches across legs by
/// construction — the planner builds matched join inputs by zipping the two
/// legs' shard lists.
///
/// Note: `Default` is intentionally NOT derived. Callers must construct
/// explicitly to keep "absent skew-join rewrite" (`Option::None`) semantically
/// distinct from "empty plan".
///
/// `Eq` is derived alongside `PartialEq`: every field is an integer or a
/// vector of [`SkewJoinShard`] (itself `Eq`), so equality is total and the
/// plan can be used wherever an `Eq` bound is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkewJoinPlan {
    /// Original upstream partition count (M) before the rewrite. Same value
    /// on both legs of the join (alignment invariant; enforced by the rule).
    pub upstream_partition_count: u32,
    /// Per-output-partition shard descriptors. Length K' is the same on both
    /// legs after rewrite.
    pub shards: Vec<SkewJoinShard>,
}

/// One output partition's read window for the skew-join rewrite.
///
/// Spark analog: `PartialReducerPartitionSpec(reducerIndex, startMapIndex,
/// endMapIndex, dataSize)`. A non-split (passthrough) shard uses
/// `start_map_idx = 0` and `end_map_idx = num_maps_for_idx`, equivalent to
/// reading the full mapper-output range for that reducer.
///
/// The window is half-open: `[start_map_idx, end_map_idx)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkewJoinShard {
    /// Upstream partition (= original reducer index `i`) this shard reads from.
    pub upstream_idx: u32,
    /// Inclusive lower bound of the upstream mapper-output index range to read.
    pub start_map_idx: u32,
    /// Exclusive upper bound of the upstream mapper-output index range to read.
    pub end_map_idx: u32,
}

/// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
/// being executed by an executor
#[derive(Debug, Clone)]
Expand All @@ -117,6 +155,14 @@ pub struct ShuffleReaderExec {
/// is responsible for pre-concatenating the M-shape upstream
/// `Vec<Vec<PartitionLocation>>` into K-shape before invoking `try_new_coalesced`.
pub coalesce: Option<CoalescePlan>,
/// Optional skew-join metadata produced by `OptimizeSkewedJoinRule`.
/// `None` means the reader is not part of a skew-join rewrite. When `Some`,
/// `partition.len()` equals `skew_join.shards.len()` (= K', the post-rewrite
/// partition count); the rule/adapter is responsible for slicing each
/// upstream `Vec<PartitionLocation>` to the corresponding shard's
/// `[start_map_idx, end_map_idx)` window before invoking `try_new_skew_join`.
/// Mutually exclusive with `coalesce` by construction.
pub skew_join: Option<SkewJoinPlan>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
properties: Arc<PlanProperties>,
Expand Down Expand Up @@ -147,6 +193,7 @@ impl ShuffleReaderExec {
broadcast: false,
upstream_partition_count,
coalesce: None,
skew_join: None,
metrics: ExecutionPlanMetricsSet::new(),
properties,
work_dir: None, // to be updated at the executor side
Expand Down Expand Up @@ -177,6 +224,7 @@ impl ShuffleReaderExec {
broadcast: true,
upstream_partition_count,
coalesce: None,
skew_join: None,
metrics: ExecutionPlanMetricsSet::new(),
properties,
work_dir: None, // to be updated at the executor side
Expand Down Expand Up @@ -228,6 +276,61 @@ impl ShuffleReaderExec {
broadcast: false,
upstream_partition_count,
coalesce: Some(coalesce),
skew_join: None,
metrics: ExecutionPlanMetricsSet::new(),
properties,
work_dir: None, // to be updated at the executor side
client_pool: None, // to be updated at the executor side
})
}

/// Create a new skew-join ShuffleReaderExec.
///
/// `partition` MUST be the K'-shape `Vec<Vec<PartitionLocation>>` produced
/// by the adapter: each output index `idx` in `0..K'` holds the
/// PartitionLocations of `skew_join.shards[idx].upstream_idx` sliced to
/// the `[start_map_idx, end_map_idx)` window for that shard. Passthrough
/// (non-split) shards take the full upstream location list.
/// `partitioning.partition_count()` MUST equal `K'`.
///
/// Spark analog: `ShuffledRowRDD` reading from a `PartialReducerPartitionSpec`
/// list — each downstream task fetches blocks only from the mapper indices
/// in its shard's window.
///
/// In debug builds this constructor asserts the shape invariants to catch
/// rule/adapter-side mistakes early.
pub fn try_new_skew_join(
stage_id: usize,
partition: Vec<Vec<PartitionLocation>>,
skew_join: SkewJoinPlan,
schema: SchemaRef,
partitioning: Partitioning,
) -> Result<Self> {
debug_assert_eq!(
partition.len(),
skew_join.shards.len(),
"K'-shape partition vector length must equal skew_join.shards.len()",
);
debug_assert_eq!(
partitioning.partition_count(),
skew_join.shards.len(),
"partitioning.partition_count() must equal skew_join.shards.len() (= K')",
);
let upstream_partition_count = skew_join.upstream_partition_count as usize;
let properties = Arc::new(PlanProperties::new(
datafusion::physical_expr::EquivalenceProperties::new(schema.clone()),
partitioning,
datafusion::physical_plan::execution_plan::EmissionType::Incremental,
datafusion::physical_plan::execution_plan::Boundedness::Bounded,
));
Ok(Self {
stage_id,
schema,
partition,
broadcast: false,
upstream_partition_count,
coalesce: None,
skew_join: Some(skew_join),
metrics: ExecutionPlanMetricsSet::new(),
properties,
work_dir: None, // to be updated at the executor side
Expand All @@ -244,6 +347,7 @@ impl ShuffleReaderExec {
broadcast: self.broadcast,
upstream_partition_count: self.upstream_partition_count,
coalesce: self.coalesce.clone(),
skew_join: self.skew_join.clone(),
metrics: self.metrics.clone(),
properties: self.properties.clone(),
work_dir: Some(work_dir),
Expand All @@ -259,6 +363,7 @@ impl ShuffleReaderExec {
broadcast: self.broadcast,
upstream_partition_count: self.upstream_partition_count,
coalesce: self.coalesce.clone(),
skew_join: self.skew_join.clone(),
metrics: self.metrics.clone(),
properties: self.properties.clone(),
work_dir: self.work_dir.clone(),
Expand Down Expand Up @@ -295,6 +400,14 @@ impl DisplayAs for ShuffleReaderExec {
c.upstream_partition_count,
)?;
}
if let Some(sj) = &self.skew_join {
write!(
f,
", skew_join: {} of {}",
sj.shards.len(),
sj.upstream_partition_count,
)?;
}
Ok(())
}
}
Expand Down Expand Up @@ -337,6 +450,7 @@ impl ExecutionPlan for ShuffleReaderExec {
broadcast: self.broadcast,
upstream_partition_count: self.upstream_partition_count,
coalesce: self.coalesce.clone(),
skew_join: self.skew_join.clone(),
metrics: ExecutionPlanMetricsSet::new(),
properties: self.properties.clone(),
work_dir: self.work_dir.clone(),
Expand Down
Loading