fillUpAssignedJobs(
return assignedJobs;
}
+ /**
+ * Cross-machine parallelization: for each tablet / scan range of the scan nodes
+ * in this fragment, select the best replica and its hosting {@link DistributedPlanWorker}.
+ * The result groups all scan ranges by the worker that will process them.
+ *
+ * This is the first phase of the two-phase parallelization. The returned map drives
+ * the second phase ({@link #insideMachineParallelization}) where each worker's ranges
+ * are further split into individual instances.
+ *
+ * @param distributeContext the distribute context for worker selection and parallelism config
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return a map from selected worker to its {@link UninstancedScanSource} containing
+ * the raw scan ranges assigned to that worker, not yet split into instances
+ */
protected abstract Map multipleMachinesParallelization(
DistributeContext distributeContext, ListMultimap inputJobs);
+ /**
+ * Intra-machine parallelization: for each worker, split its assigned scan ranges
+ * into one or more {@link AssignedJob} instances. This is the second phase of
+ * the two-phase parallelization, following {@link #multipleMachinesParallelization}.
+ *
+ * For each worker entry, the method:
+ *
+ * - Computes the max parallelism from the scan source (e.g. tablet count).
+ * - Determines the final instance count via {@link #degreeOfParallelism},
+ * capped by the fragment's {@code parallelExecNum} and tablet count.
+ * - Splits scan ranges evenly across instances (default mode) or creates
+ * local shuffle instances that share a single scan source to add
+ * parallelism without rescanning data ({@link #assignLocalShuffleJobs}).
+ *
+ *
+ * @param workerToScanRanges map from worker to its un-instanced scan ranges,
+ * produced by {@link #multipleMachinesParallelization}
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @param distributeContext the distribute context for parallelism configuration
+ * @return the list of assigned jobs, each bound to a worker with its portion of scan ranges
+ */
protected List insideMachineParallelization(
Map workerToScanRanges,
ListMultimap inputJobs,
@@ -104,10 +169,33 @@ protected List insideMachineParallelization(
return instances;
}
+ /**
+ * Whether the fragment should use a serial source operator followed by local
+ * shuffle to add intra-machine parallelism. When true, data is first gathered
+ * through one exchange, then locally shuffled to multiple instances on the same
+ * machine, allowing parallel computation without rescanning the source data.
+ *
+ * @param distributeContext the distribute context; for load jobs, the connect
+ * context is passed as null to avoid serial source
+ * @return true if the fragment has a serial source operator and should use
+ * local shuffle to increase parallelism
+ */
protected boolean useLocalShuffleToAddParallel(DistributeContext distributeContext) {
return fragment.useSerialSource(distributeContext.isLoadJob ? null : statementContext.getConnectContext());
}
+ /**
+ * Split the given scan source evenly into {@code instanceNum} partitions and
+ * create one {@link StaticAssignedJob} per partition, all on the same worker.
+ * Each instance scans a disjoint subset of the tablet ranges, dividing the
+ * total scan workload among the instances.
+ *
+ * @param scanSource the full scan source (e.g. all tablets assigned to this worker)
+ * @param instanceNum the number of instances to split into
+ * @param instances the output list receiving newly created assigned jobs
+ * @param context the connect context for generating instance IDs
+ * @param worker the worker that will host all of the instances
+ */
protected void assignedDefaultJobs(ScanSource scanSource, int instanceNum, List instances,
ConnectContext context, DistributedPlanWorker worker) {
// split the scanRanges to some partitions, one partition for one instance
@@ -127,6 +215,22 @@ protected void assignedDefaultJobs(ScanSource scanSource, int instanceNum, List<
}
}
+ /**
+ * Create local shuffle instances on the given worker. The first instance scans
+ * all data, and remaining instances receive an empty scan source — they share
+ * the first instance's scan result via local shuffle on the same BE.
+ * This avoids rescanning the same data multiple times while still adding
+ * parallelism for downstream operators (e.g. aggregation).
+ *
+ * All instances share the same {@code shareScanId}, signaling to the backend
+ * that they belong to the same shared-scan group.
+ *
+ * @param scanSource the full scan source (all data for this worker)
+ * @param instanceNum the total number of local shuffle instances to create
+ * @param instances the output list receiving newly created {@link LocalShuffleAssignedJob}s
+ * @param context the connect context for generating instance IDs
+ * @param worker the worker that will host all local shuffle instances
+ */
protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List instances,
ConnectContext context, DistributedPlanWorker worker) {
// only generate one instance to scan all data, in this step
@@ -161,6 +265,25 @@ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, Li
}
}
+ /**
+ * Compute the number of parallel instances for this fragment.
+ * The result is bounded by several constraints:
+ *
+ * - If the fragment has unpartitioned data distribution, returns 1.
+ * - If query cache is enabled, returns {@code maxParallel} (one instance per
+ * tablet required for cache lookup).
+ * - If the single OLAP scan node qualifies for single-instance optimization
+ * (e.g. LIMIT with no conjuncts), returns 1 to save resources.
+ * - If local shuffle is active, returns the fragment's {@code parallelExecNum}.
+ * - Otherwise, returns {@code min(maxParallel, max(parallelExecNum, 1))},
+ * i.e. capped by the actual tablet count.
+ *
+ *
+ * @param maxParallel the maximum possible parallelism (e.g. total tablet count
+ * or bucket count on this worker)
+ * @param useLocalShuffleToAddParallel whether local shuffle is active
+ * @return the number of instances to create for this worker
+ */
protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddParallel) {
Preconditions.checkArgument(maxParallel > 0, "maxParallel must be positive");
if (!fragment.getDataPartition().isPartitioned()) {
@@ -188,6 +311,15 @@ protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddP
return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(), 1));
}
+ /**
+ * Create a single empty instance assigned to a random available worker.
+ * Used by subclasses in {@link #fillUpAssignedJobs} as a fallback when normal
+ * parallelization produces no instances (e.g. all tablets/data pruned away),
+ * ensuring the fragment can still execute and return an empty result.
+ *
+ * @param workerManager the worker manager to select a random worker from
+ * @return a singleton list containing one empty assigned job
+ */
protected List fillUpSingleEmptyInstance(DistributedPlanWorkerManager workerManager) {
long catalogId = Env.getCurrentInternalCatalog().getId();
if (scanNodes != null && scanNodes.size() > 0) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
index e8b30730103cb0..fb3e1a07526af0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java
@@ -56,6 +56,24 @@ public UnassignedAllBEJob(StatementContext statementContext, PlanFragment fragme
}
// ExchangeNode -> upstreamFragment -> AssignedJob(instances of upstreamFragment)
+ /**
+ * Compute assigned jobs that deploy one instance on every available backend.
+ * This is used for dictionary sink fragments where data must be loaded onto
+ * all BEs. Supports two loading modes:
+ *
+ * - Full load: when source data version has changed, redeploy to all BEs
+ * with parallelism matching the upstream fragment instance count.
+ * - Partial load: when only some BEs are outdated, deploy only to those
+ * outdated BEs to avoid redundant work.
+ *
+ * Each BE gets one instance with an empty {@link DefaultScanSource} (the actual
+ * scan data comes from the upstream exchange).
+ *
+ * @param distributeContext the distribute context providing the worker manager
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs,
+ * used to determine the expected instance count for full loads
+ * @return one assigned job per target backend
+ */
@Override
public List computeAssignedJobs(DistributeContext distributeContext,
ListMultimap inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
index bd1f2779acc4b9..3cb1b9ec858947 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
@@ -40,6 +40,18 @@ public UnassignedGatherJob(
super(statementContext, fragment, ImmutableList.of(), exchangeToChildJob);
}
+ /**
+ * Compute assigned jobs for a gather (single-node) fragment.
+ * All instances are placed on a single randomly selected worker.
+ * When {@code useSerialSource} is true, multiple local shuffle instances
+ * are created on the same worker to add intra-machine parallelism:
+ * the first instance scans all data from the upstream exchange and
+ * local-shuffles it to the other local instances for parallel processing.
+ *
+ * @param distributeContext the distribute context for worker selection
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return one or more assigned jobs, all on the same selected worker
+ */
@Override
public List computeAssignedJobs(
DistributeContext distributeContext, ListMultimap inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java
index f3d260e289d792..7045ceb9748e58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java
@@ -61,6 +61,16 @@ public static boolean canApply(List scanNodes) {
return true;
}
+ /**
+ * Compute a single assigned job that gathers scan ranges from all
+ * {@link org.apache.doris.planner.DataGenScanNode} sources in this fragment.
+ * All scan ranges from each DataGenScanNode are collected into one
+ * {@link DefaultScanSource} and placed on a randomly selected worker.
+ *
+ * @param distributeContext the distribute context for worker selection
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return a list containing exactly one assigned job with all scan ranges merged
+ */
@Override
public List computeAssignedJobs(
DistributeContext distributeContext, ListMultimap inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
index d4f32cce8961c4..ca29f0a76895f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java
@@ -39,6 +39,15 @@ public UnassignedGroupCommitJob(StatementContext statementContext,
super(statementContext, fragment, scanNodes, exchangeToChildJob);
}
+ /**
+ * Compute a single assigned job bound to the group commit merge backend.
+ * The target backend is determined by {@link StatementContext#getGroupCommitMergeBackend()},
+ * ensuring the group commit sink executes on the specific BE designated for merging.
+ *
+ * @param distributeContext the distribute context (unused — worker is fixed by group commit logic)
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return a list containing exactly one assigned job on the group commit merge backend
+ */
@Override
public List computeAssignedJobs(
DistributeContext distributeContext, ListMultimap inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java
index a5d6331440acab..2830b1d1d9cff6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java
@@ -43,6 +43,22 @@ public interface UnassignedJob extends TreeNode {
ListMultimap getExchangeToChildJob();
+ /**
+ * Compute and return the list of {@link AssignedJob}s for this fragment.
+ * This is the core method that transforms an unassigned fragment-level job into
+ * concrete parallel instances, each bound to a specific {@link DistributedPlanWorker}
+ * and carrying its assigned {@link ScanSource} (data ranges).
+ *
+ * @param distributeContext
+ * the distribute context containing worker manager, selected workers, and other
+ * planner state needed for worker selection and parallelism decisions
+ * @param inputJobs
+ * multimap from child {@link ExchangeNode} to their already-assigned jobs;
+ * provides the child fragment instance layout used by shuffle/gather jobs
+ * to determine their own instance count and worker placement
+ * @return the list of assigned jobs, each representing one fragment instance scheduled
+ * on a specific worker with its data source
+ */
List computeAssignedJobs(
DistributeContext distributeContext, ListMultimap inputJobs);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
index abe804dc170bfb..a9b259de3f765e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
@@ -48,6 +48,17 @@ public UnassignedLocalTVFSinkJob(
this.backendId = backendId;
}
+ /**
+ * Compute a single assigned job on the designated backend for local TVF sink.
+ * The target backend is determined by {@code backendId}. If the specified backend
+ * is not alive, an {@link IllegalStateException} is thrown. This ensures
+ * INSERT INTO local(...) writes to the correct node's local disk.
+ *
+ * @param distributeContext the distribute context (unused — worker is fixed by backendId)
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return a list containing exactly one assigned job on the designated backend
+ * @throws IllegalStateException if the target backend is not available
+ */
@Override
public List computeAssignedJobs(
DistributeContext distributeContext, ListMultimap inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java
index 4c9fb15a2b7cef..735760553c74eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java
@@ -37,6 +37,15 @@ public UnassignedQueryConstantJob(StatementContext statementContext, PlanFragmen
super(statementContext, fragment, ImmutableList.of(), ArrayListMultimap.create());
}
+ /**
+ * Compute a single assigned job on a randomly selected worker for constant queries
+ * (e.g. SELECT 1, SELECT * FROM VALUES(...)). Such queries have no data scan,
+ * so a single instance with an empty {@link DefaultScanSource} suffices.
+ *
+ * @param distributeContext the distribute context for random worker selection
+ * @param inputJobs unused — constant queries have no child fragments
+ * @return a list containing exactly one assigned job on a random worker
+ */
@Override
public List computeAssignedJobs(
DistributeContext distributeContext, ListMultimap inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index b6a450c93a1b1a..c4a08a339ee23f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -86,6 +86,18 @@ public List getOlapScanNodes() {
return olapScanNodes;
}
+ /**
+ * Select a replica and its hosting worker for each bucket's tablets across all
+ * OLAP scan nodes in this fragment, grouping by bucket index. This is the key
+ * mechanism for bucket-shuffle join and colocate join: tablets belonging to the
+ * same bucket index across different tables are co-located on the same worker,
+ * enabling local join without data shuffling.
+ *
+ * @param distributeContext the distribute context
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return a map from worker to its {@link UninstancedScanSource} keyed by bucket index,
+ * e.g. {@code {BackendWorker("172.0.0.1") -> {bucket 0: {olapScanNode1: [...], olapScanNode2: [...]}}}}
+ */
@Override
protected Map multipleMachinesParallelization(
DistributeContext distributeContext, ListMultimap inputJobs) {
@@ -112,6 +124,23 @@ protected Map multipleMachinesPara
);
}
+ /**
+ * Split each worker's assigned buckets into one or more instances, then fill up
+ * missing bucket instances when needed for outer join or non-intersect set operations
+ * in bucket-shuffle mode.
+ *
+ * After the default even-split from {@link AbstractUnassignedScanJob#insideMachineParallelization},
+ * this method checks whether the fragment contains right outer join, full outer join,
+ * semi/anti join, or non-intersect set operations that use bucket shuffle. If a bucket
+ * index has no left-side data (e.g. due to tablet pruning), a placeholder instance is
+ * created for that bucket so the right-side data still has a destination to be shuffled to,
+ * preventing the join from silently dropping rows.
+ *
+ * @param workerToScanRanges map from worker to its un-instanced bucket ranges
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @param distributeContext the distribute context for parallelism configuration
+ * @return the list of assigned jobs, with missing bucket placeholders filled in
+ */
@Override
protected List insideMachineParallelization(
Map workerToScanRanges,
@@ -177,6 +206,29 @@ protected List insideMachineParallelization(
return assignedJobs;
}
+ /**
+ * Creates local shuffle instances for bucket-based join fragments.
+ * Handles two scenarios:
+ *
+ * - All serial: all scan nodes use serial source. Only the first
+ * instance scans, others share via local shuffle. Each instance is
+ * assigned a subset of bucket indexes for join processing.
+ * - Mixed serial/non-serial: some scan nodes are serial (e.g.
+ * multi-partition table) and some are not. The serial scan source is
+ * merged into the first instance, while non-serial sources are
+ * parallelized normally. All instances use
+ * {@link LocalShuffleBucketJoinAssignedJob} which carries the specific
+ * bucket indexes to join.
+ *
+ * Any remaining slots (when {@code instanceNum} exceeds the number of
+ * bucket groups) are filled with empty instances that have no join buckets.
+ *
+ * @param scanSource the bucket scan source to distribute
+ * @param instanceNum the target number of instances for this worker
+ * @param instances the output list receiving newly created assigned jobs
+ * @param context the connect context for generating instance IDs
+ * @param worker the worker that will host all instances
+ */
@Override
protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List instances,
ConnectContext context, DistributedPlanWorker worker) {
@@ -506,6 +558,21 @@ private List getWorkersByReplicas(Tablet tablet, long cat
return workers;
}
+ /**
+ * Compute parallelism for bucket-based scan fragments.
+ * In addition to the base class constraints, this override introduces a
+ * tablet-count-based strategy for pure colocate scan (no exchange nodes):
+ * parallelism is derived from the total tablet count, capped by the
+ * session variable {@code colocateMaxParallelNum} (default 128).
+ *
+ * When exchange nodes are present (e.g. bucket shuffle join), falls back
+ * to {@link AbstractUnassignedScanJob#degreeOfParallelism} to avoid
+ * over-parallelizing the join fragment.
+ *
+ * @param maxParallel the maximum possible parallelism (bucket count)
+ * @param useLocalShuffleToAddParallel whether local shuffle is active
+ * @return the number of instances to create per worker
+ */
@Override
protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddParallel) {
Preconditions.checkArgument(maxParallel > 0, "maxParallel must be positive");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java
index aab0f20895d049..8ff7092300fb8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java
@@ -48,6 +48,15 @@ public UnassignedScanMetadataJob(
this.schemaScanNode = schemaScanNode;
}
+ /**
+ * Select a worker for the schema metadata scan node (e.g. information_schema tables).
+ * Metadata scans are typically lightweight and produce a single scan range per node;
+ * this method distributes them across available workers for load balancing.
+ *
+ * @param distributeContext the distribute context
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return a map from worker to its assigned schema scan ranges
+ */
@Override
protected Map multipleMachinesParallelization(
DistributeContext distributeContext, ListMultimap inputJobs) {
@@ -56,6 +65,16 @@ protected Map multipleMachinesPara
);
}
+ /**
+ * If no workers could be selected for the metadata scan (e.g. all backends are
+ * unavailable), create a single empty instance on a random available worker
+ * as a fallback to prevent query failure.
+ *
+ * @param assignedJobs the list produced by {@link #insideMachineParallelization}
+ * @param workerManager the worker manager to select a fallback worker from
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return the original list if non-empty, otherwise a single empty instance
+ */
@Override
protected List fillUpAssignedJobs(List assignedJobs,
DistributedPlanWorkerManager workerManager, ListMultimap inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
index fa72f8c01050b6..f6d1694d855fd7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
@@ -62,6 +62,16 @@ public UnassignedScanSingleOlapTableJob(
this.olapScanNode = olapScanNode;
}
+ /**
+ * Select a replica and its hosting worker for every tablet of the OLAP scan node,
+ * without bucket awareness. Each tablet is assigned to the best available backend
+ * holding a replica, and tablets on the same backend are grouped together.
+ *
+ * @param distributeContext the distribute context
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return a map from worker to its assigned tablets (as {@link UninstancedScanSource}),
+ * e.g. {@code {BackendWorker("172.0.0.1") -> [tablet_10001..10004]}}
+ */
@Override
protected Map multipleMachinesParallelization(
DistributeContext distributeContext, ListMultimap inputJobs) {
@@ -78,6 +88,20 @@ protected Map multipleMachinesPara
);
}
+ /**
+ * For each worker, split its assigned tablets into one or more instances.
+ * When the fragment uses query cache and the tablet count exceeds the threshold,
+ * a partition-based grouping strategy is attempted first: tablets belonging to
+ * the same partition are kept within the same instance to reduce backend
+ * concurrency pressure during cache lookup. If partition-based grouping is not
+ * applicable or fails, falls back to the default even-split strategy from
+ * {@link AbstractUnassignedScanJob#insideMachineParallelization}.
+ *
+ * @param workerToScanRanges map from worker to its un-instanced tablet ranges
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @param distributeContext the distribute context for parallelism configuration
+ * @return the list of assigned jobs, each bound to a worker with its tablet portion
+ */
@Override
protected List insideMachineParallelization(
Map workerToScanRanges,
@@ -225,6 +249,17 @@ private Map splitScanRangesByPartition(
return partitionToScanRanges;
}
+ /**
+ * If the normal parallelization produced an empty list (e.g. all tablets have been
+ * pruned by TABLET() hint specifying a non-existent tablet), create a single empty
+ * instance on a random worker so the fragment can still execute and return an empty
+ * result set rather than failing.
+ *
+ * @param assignedJobs the list produced by {@link #insideMachineParallelization}
+ * @param workerManager the worker manager to select a fallback worker from
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return the original list if non-empty, otherwise a single empty instance
+ */
@Override
protected List fillUpAssignedJobs(List assignedJobs,
DistributedPlanWorkerManager workerManager, ListMultimap inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java
index bc98119d939aaa..e9dfc73588076c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java
@@ -48,6 +48,16 @@ public UnassignedScanSingleRemoteTableJob(
this.scanWorkerSelector = Objects.requireNonNull(scanWorkerSelector, "scanWorkerSelector is not null");
}
+ /**
+ * Select a worker for each scan range of the external / remote table scan node.
+ * For external tables (Hive, Iceberg, etc.), scan ranges represent file splits
+ * rather than tablets, and workers are selected based on data locality or
+ * workload balancing.
+ *
+ * @param distributeContext the distribute context
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return a map from worker to its assigned file scan ranges
+ */
@Override
protected Map multipleMachinesParallelization(
DistributeContext distributeContext, ListMultimap inputJobs) {
@@ -56,6 +66,16 @@ protected Map multipleMachinesPara
);
}
+ /**
+ * If all file scan ranges have been pruned and the assigned job list is empty,
+ * create a single empty instance on a random worker so the fragment can still
+ * execute (returning an empty result) rather than failing.
+ *
+ * @param assignedJobs the list produced by {@link #insideMachineParallelization}
+ * @param workerManager the worker manager to select a fallback worker from
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return the original list if non-empty, otherwise a single empty instance
+ */
@Override
protected List fillUpAssignedJobs(List assignedJobs,
DistributedPlanWorkerManager workerManager, ListMultimap inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
index 27792eb288e1ca..40c90563730331 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
@@ -50,6 +50,22 @@ public UnassignedShuffleJob(
super(statementContext, fragment, ImmutableList.of(), exchangeToChildJob);
}
+ /**
+ * Compute assigned jobs for a shuffle (data redistribution) fragment.
+ * The instance count is determined by the parallelism of the largest child
+ * fragment. When the expected instance count is lower than the child count
+ * (e.g. due to session variable limits or query cache constraints), workers
+ * are shuffled to spread instances across different backends for load balancing.
+ * When more instances are needed, worker assignment follows the child layout.
+ *
+ * If {@code useSerialSource} is true, multiple local shuffle instances are
+ * created per worker to add intra-machine parallelism without rescanning data.
+ *
+ * @param distributeContext the distribute context for worker selection
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs,
+ * used to determine the largest child fragment's instance layout
+ * @return assigned shuffle jobs with workers selected from child fragment layout
+ */
@Override
public List computeAssignedJobs(
DistributeContext distributeContext, ListMultimap inputJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java
index 6ded32e0cd98c5..ca3fe1c4aae025 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java
@@ -42,6 +42,17 @@ public UnassignedSpecifyInstancesJob(
this.specifyInstances = fragment.specifyInstances.get();
}
+ /**
+ * Compute assigned jobs by delegating to the fragment's
+ * {@link NereidsSpecifyInstances}. This is used when the fragment has
+ * pre-specified instance-to-worker mappings (e.g. from hints or
+ * statement-level instance specifications), bypassing the normal
+ * worker selection and parallelization logic.
+ *
+ * @param distributeContext the distribute context (forwarded to specify instances)
+ * @param inputJobs multimap from child exchange nodes to their assigned jobs
+ * @return assigned jobs built from the pre-specified instance layout
+ */
@Override
public List computeAssignedJobs(
DistributeContext distributeContext, ListMultimap inputJobs) {