diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java index 167ea3dc33419a..8c8a47e844408c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java @@ -47,6 +47,24 @@ public AbstractUnassignedScanJob(StatementContext statementContext, PlanFragment super(statementContext, fragment, scanNodes, exchangeToChildJob); } + /** + * Compute assigned scan jobs using a two-phase parallelization strategy: + *
<ol>
+ *   <li>Cross-machine parallelization ({@link #multipleMachinesParallelization}): + * For each tablet / scan range, select the best replica and its hosting backend worker. + * This groups scan ranges by the worker that will process them.</li>
+ *   <li>Intra-machine parallelization ({@link #insideMachineParallelization}): + * Within each worker, split the assigned scan ranges into one or more instances + * based on the degree of parallelism. Supports local shuffle mode to further + * increase parallelism without rescanning data.</li>
+ * </ol>
+ * After both phases, {@link #fillUpAssignedJobs} provides a hook for subclasses to + * supply fallback instances when no workers could be selected (e.g. all tablets pruned). + * + * @param distributeContext the distribute context for worker selection and parallelism config + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return the list of assigned scan jobs, each bound to a worker with its tablet ranges + */ @Override public List computeAssignedJobs( DistributeContext distributeContext, ListMultimap inputJobs) { @@ -59,6 +77,18 @@ public List computeAssignedJobs( return fillUpAssignedJobs(assignedJobs, distributeContext.workerManager, inputJobs); } + /** + * Hook for subclasses to supply fallback instances when the normal parallelization + * produces an empty result. For example, when all tablets of a table have been pruned + * (e.g. TABLET(1234) with a non-existent tablet id), this method can create a single + * empty instance to keep the fragment alive and return an empty result set. + * + * @param assignedJobs the list produced by {@link #insideMachineParallelization}; + * may be empty if no workers could be selected + * @param workerManager the worker manager used to select a random fallback worker + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return the (possibly augmented) list of assigned jobs; default returns unchanged + */ protected List fillUpAssignedJobs( List assignedJobs, DistributedPlanWorkerManager workerManager, @@ -66,9 +96,44 @@ protected List fillUpAssignedJobs( return assignedJobs; } + /** + * Cross-machine parallelization: for each tablet / scan range of the scan nodes + * in this fragment, select the best replica and its hosting {@link DistributedPlanWorker}. + * The result groups all scan ranges by the worker that will process them. + *
<p>
+ * This is the first phase of the two-phase parallelization. The returned map drives + * the second phase ({@link #insideMachineParallelization}) where each worker's ranges + * are further split into individual instances. + * + * @param distributeContext the distribute context for worker selection and parallelism config + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return a map from selected worker to its {@link UninstancedScanSource} containing + * the raw scan ranges assigned to that worker, not yet split into instances + */ protected abstract Map multipleMachinesParallelization( DistributeContext distributeContext, ListMultimap inputJobs); + /** + * Intra-machine parallelization: for each worker, split its assigned scan ranges + * into one or more {@link AssignedJob} instances. This is the second phase of + * the two-phase parallelization, following {@link #multipleMachinesParallelization}. + *
<p>
+ * For each worker entry, the method: + *

<ol>
+ *   <li>Computes the max parallelism from the scan source (e.g. tablet count).</li>
+ *   <li>Determines the final instance count via {@link #degreeOfParallelism}, + * capped by the fragment's {@code parallelExecNum} and tablet count.</li>
+ *   <li>Splits scan ranges evenly across instances (default mode) or creates + * local shuffle instances that share a single scan source to add + * parallelism without rescanning data ({@link #assignLocalShuffleJobs}).</li>
+ * </ol>
+ * + * @param workerToScanRanges map from worker to its un-instanced scan ranges, + * produced by {@link #multipleMachinesParallelization} + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @param distributeContext the distribute context for parallelism configuration + * @return the list of assigned jobs, each bound to a worker with its portion of scan ranges + */ protected List insideMachineParallelization( Map workerToScanRanges, ListMultimap inputJobs, @@ -104,10 +169,33 @@ protected List insideMachineParallelization( return instances; } + /** + * Whether the fragment should use a serial source operator followed by local + * shuffle to add intra-machine parallelism. When true, data is first gathered + * through one exchange, then locally shuffled to multiple instances on the same + * machine, allowing parallel computation without rescanning the source data. + * + * @param distributeContext the distribute context; for load jobs, the connect + * context is passed as null to avoid serial source + * @return true if the fragment has a serial source operator and should use + * local shuffle to increase parallelism + */ protected boolean useLocalShuffleToAddParallel(DistributeContext distributeContext) { return fragment.useSerialSource(distributeContext.isLoadJob ? null : statementContext.getConnectContext()); } + /** + * Split the given scan source evenly into {@code instanceNum} partitions and + * create one {@link StaticAssignedJob} per partition, all on the same worker. + * Each instance scans a disjoint subset of the tablet ranges, dividing the + * total scan workload among the instances. + * + * @param scanSource the full scan source (e.g. 
all tablets assigned to this worker) + * @param instanceNum the number of instances to split into + * @param instances the output list receiving newly created assigned jobs + * @param context the connect context for generating instance IDs + * @param worker the worker that will host all of the instances + */ protected void assignedDefaultJobs(ScanSource scanSource, int instanceNum, List instances, ConnectContext context, DistributedPlanWorker worker) { // split the scanRanges to some partitions, one partition for one instance @@ -127,6 +215,22 @@ protected void assignedDefaultJobs(ScanSource scanSource, int instanceNum, List< } } + /** + * Create local shuffle instances on the given worker. The first instance scans + * all data, and remaining instances receive an empty scan source — they share + * the first instance's scan result via local shuffle on the same BE. + * This avoids rescanning the same data multiple times while still adding + * parallelism for downstream operators (e.g. aggregation). + *
<p>
+ * All instances share the same {@code shareScanId}, signaling to the backend + * that they belong to the same shared-scan group. + * + * @param scanSource the full scan source (all data for this worker) + * @param instanceNum the total number of local shuffle instances to create + * @param instances the output list receiving newly created {@link LocalShuffleAssignedJob}s + * @param context the connect context for generating instance IDs + * @param worker the worker that will host all local shuffle instances + */ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List instances, ConnectContext context, DistributedPlanWorker worker) { // only generate one instance to scan all data, in this step @@ -161,6 +265,25 @@ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, Li } } + /** + * Compute the number of parallel instances for this fragment. + * The result is bounded by several constraints: + *

<ul>
+ *   <li>If the fragment has unpartitioned data distribution, returns 1.</li>
+ *   <li>If query cache is enabled, returns {@code maxParallel} (one instance per + * tablet required for cache lookup).</li>
+ *   <li>If the single OLAP scan node qualifies for single-instance optimization + * (e.g. LIMIT with no conjuncts), returns 1 to save resources.</li>
+ *   <li>If local shuffle is active, returns the fragment's {@code parallelExecNum}.</li>
+ *   <li>Otherwise, returns {@code min(maxParallel, max(parallelExecNum, 1))}, + * i.e. capped by the actual tablet count.</li>
+ * </ul>
+ * + * @param maxParallel the maximum possible parallelism (e.g. total tablet count + * or bucket count on this worker) + * @param useLocalShuffleToAddParallel whether local shuffle is active + * @return the number of instances to create for this worker + */ protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddParallel) { Preconditions.checkArgument(maxParallel > 0, "maxParallel must be positive"); if (!fragment.getDataPartition().isPartitioned()) { @@ -188,6 +311,15 @@ protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddP return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(), 1)); } + /** + * Create a single empty instance assigned to a random available worker. + * Used by subclasses in {@link #fillUpAssignedJobs} as a fallback when normal + * parallelization produces no instances (e.g. all tablets/data pruned away), + * ensuring the fragment can still execute and return an empty result. + * + * @param workerManager the worker manager to select a random worker from + * @return a singleton list containing one empty assigned job + */ protected List fillUpSingleEmptyInstance(DistributedPlanWorkerManager workerManager) { long catalogId = Env.getCurrentInternalCatalog().getId(); if (scanNodes != null && scanNodes.size() > 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java index e8b30730103cb0..fb3e1a07526af0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedAllBEJob.java @@ -56,6 +56,24 @@ public UnassignedAllBEJob(StatementContext statementContext, PlanFragment fragme } // ExchangeNode -> upstreamFragment -> AssignedJob(instances of upstreamFragment) + 
/** + * Compute assigned jobs that deploy one instance on every available backend. + * This is used for dictionary sink fragments where data must be loaded onto + * all BEs. Supports two loading modes: + *
<ul>
+ *   <li>Full load: when source data version has changed, redeploy to all BEs + * with parallelism matching the upstream fragment instance count.</li>
+ *   <li>Partial load: when only some BEs are outdated, deploy only to those + * outdated BEs to avoid redundant work.</li>
+ * </ul>
+ * Each BE gets one instance with an empty {@link DefaultScanSource} (the actual + * scan data comes from the upstream exchange). + * + * @param distributeContext the distribute context providing the worker manager + * @param inputJobs multimap from child exchange nodes to their assigned jobs, + * used to determine the expected instance count for full loads + * @return one assigned job per target backend + */ @Override public List computeAssignedJobs(DistributeContext distributeContext, ListMultimap inputJobs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java index bd1f2779acc4b9..3cb1b9ec858947 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java @@ -40,6 +40,18 @@ public UnassignedGatherJob( super(statementContext, fragment, ImmutableList.of(), exchangeToChildJob); } + /** + * Compute assigned jobs for a gather (single-node) fragment. + * All instances are placed on a single randomly selected worker. + * When {@code useSerialSource} is true, multiple local shuffle instances + * are created on the same worker to add intra-machine parallelism: + * the first instance scans all data from the upstream exchange and + * local-shuffles it to the other local instances for parallel processing. 
+ * + * @param distributeContext the distribute context for worker selection + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return one or more assigned jobs, all on the same selected worker + */ @Override public List computeAssignedJobs( DistributeContext distributeContext, ListMultimap inputJobs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java index f3d260e289d792..7045ceb9748e58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java @@ -61,6 +61,16 @@ public static boolean canApply(List scanNodes) { return true; } + /** + * Compute a single assigned job that gathers scan ranges from all + * {@link org.apache.doris.planner.DataGenScanNode} sources in this fragment. + * All scan ranges from each DataGenScanNode are collected into one + * {@link DefaultScanSource} and placed on a randomly selected worker. 
+ * + * @param distributeContext the distribute context for worker selection + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return a list containing exactly one assigned job with all scan ranges merged + */ @Override public List computeAssignedJobs( DistributeContext distributeContext, ListMultimap inputJobs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java index d4f32cce8961c4..ca29f0a76895f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGroupCommitJob.java @@ -39,6 +39,15 @@ public UnassignedGroupCommitJob(StatementContext statementContext, super(statementContext, fragment, scanNodes, exchangeToChildJob); } + /** + * Compute a single assigned job bound to the group commit merge backend. + * The target backend is determined by {@link StatementContext#getGroupCommitMergeBackend()}, + * ensuring the group commit sink executes on the specific BE designated for merging. 
+ * + * @param distributeContext the distribute context (unused — worker is fixed by group commit logic) + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return a list containing exactly one assigned job on the group commit merge backend + */ @Override public List computeAssignedJobs( DistributeContext distributeContext, ListMultimap inputJobs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java index a5d6331440acab..2830b1d1d9cff6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java @@ -43,6 +43,22 @@ public interface UnassignedJob extends TreeNode { ListMultimap getExchangeToChildJob(); + /** + * Compute and return the list of {@link AssignedJob}s for this fragment. + * This is the core method that transforms an unassigned fragment-level job into + * concrete parallel instances, each bound to a specific {@link DistributedPlanWorker} + * and carrying its assigned {@link ScanSource} (data ranges). 
+ * + * @param distributeContext + * the distribute context containing worker manager, selected workers, and other + * planner state needed for worker selection and parallelism decisions + * @param inputJobs + * multimap from child {@link ExchangeNode} to their already-assigned jobs; + * provides the child fragment instance layout used by shuffle/gather jobs + * to determine their own instance count and worker placement + * @return the list of assigned jobs, each representing one fragment instance scheduled + * on a specific worker with its data source + */ List computeAssignedJobs( DistributeContext distributeContext, ListMultimap inputJobs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java index abe804dc170bfb..a9b259de3f765e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java @@ -48,6 +48,17 @@ public UnassignedLocalTVFSinkJob( this.backendId = backendId; } + /** + * Compute a single assigned job on the designated backend for local TVF sink. + * The target backend is determined by {@code backendId}. If the specified backend + * is not alive, an {@link IllegalStateException} is thrown. This ensures + * INSERT INTO local(...) writes to the correct node's local disk. 
+ * + * @param distributeContext the distribute context (unused — worker is fixed by backendId) + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return a list containing exactly one assigned job on the designated backend + * @throws IllegalStateException if the target backend is not available + */ @Override public List computeAssignedJobs( DistributeContext distributeContext, ListMultimap inputJobs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java index 4c9fb15a2b7cef..735760553c74eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java @@ -37,6 +37,15 @@ public UnassignedQueryConstantJob(StatementContext statementContext, PlanFragmen super(statementContext, fragment, ImmutableList.of(), ArrayListMultimap.create()); } + /** + * Compute a single assigned job on a randomly selected worker for constant queries + * (e.g. SELECT 1, SELECT * FROM VALUES(...)). Such queries have no data scan, + * so a single instance with an empty {@link DefaultScanSource} suffices. 
+ * + * @param distributeContext the distribute context for random worker selection + * @param inputJobs unused — constant queries have no child fragments + * @return a list containing exactly one assigned job on a random worker + */ @Override public List computeAssignedJobs( DistributeContext distributeContext, ListMultimap inputJobs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java index b6a450c93a1b1a..c4a08a339ee23f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java @@ -86,6 +86,18 @@ public List getOlapScanNodes() { return olapScanNodes; } + /** + * Select a replica and its hosting worker for each bucket's tablets across all + * OLAP scan nodes in this fragment, grouping by bucket index. This is the key + * mechanism for bucket-shuffle join and colocate join: tablets belonging to the + * same bucket index across different tables are co-located on the same worker, + * enabling local join without data shuffling. + * + * @param distributeContext the distribute context + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return a map from worker to its {@link UninstancedScanSource} keyed by bucket index, + * e.g. 
{@code {BackendWorker("172.0.0.1") -> {bucket 0: {olapScanNode1: [...], olapScanNode2: [...]}}}} + */ @Override protected Map multipleMachinesParallelization( DistributeContext distributeContext, ListMultimap inputJobs) { @@ -112,6 +124,23 @@ protected Map multipleMachinesPara ); } + /** + * Split each worker's assigned buckets into one or more instances, then fill up + * missing bucket instances when needed for outer join or non-intersect set operations + * in bucket-shuffle mode. + *
<p>
+ * After the default even-split from {@link AbstractUnassignedScanJob#insideMachineParallelization}, + * this method checks whether the fragment contains right outer join, full outer join, + * semi/anti join, or non-intersect set operations that use bucket shuffle. If a bucket + * index has no left-side data (e.g. due to tablet pruning), a placeholder instance is + * created for that bucket so the right-side data still has a destination to be shuffled to, + * preventing the join from silently dropping rows. + * + * @param workerToScanRanges map from worker to its un-instanced bucket ranges + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @param distributeContext the distribute context for parallelism configuration + * @return the list of assigned jobs, with missing bucket placeholders filled in + */ @Override protected List insideMachineParallelization( Map workerToScanRanges, @@ -177,6 +206,29 @@ protected List insideMachineParallelization( return assignedJobs; } + /** + * Creates local shuffle instances for bucket-based join fragments. + * Handles two scenarios: + *

<ol>
+ *   <li>All serial: all scan nodes use serial source. Only the first + * instance scans, others share via local shuffle. Each instance is + * assigned a subset of bucket indexes for join processing.</li>
+ *   <li>Mixed serial/non-serial: some scan nodes are serial (e.g. + * multi-partition table) and some are not. The serial scan source is + * merged into the first instance, while non-serial sources are + * parallelized normally. All instances use + * {@link LocalShuffleBucketJoinAssignedJob} which carries the specific + * bucket indexes to join.</li>
+ * </ol>
+ * Any remaining slots (when {@code instanceNum} exceeds the number of + * bucket groups) are filled with empty instances that have no join buckets. + * + * @param scanSource the bucket scan source to distribute + * @param instanceNum the target number of instances for this worker + * @param instances the output list receiving newly created assigned jobs + * @param context the connect context for generating instance IDs + * @param worker the worker that will host all instances + */ @Override protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List instances, ConnectContext context, DistributedPlanWorker worker) { @@ -506,6 +558,21 @@ private List getWorkersByReplicas(Tablet tablet, long cat return workers; } + /** + * Compute parallelism for bucket-based scan fragments. + * In addition to the base class constraints, this override introduces a + * tablet-count-based strategy for pure colocate scan (no exchange nodes): + * parallelism is derived from the total tablet count, capped by the + * session variable {@code colocateMaxParallelNum} (default 128). + *
<p>
+ * When exchange nodes are present (e.g. bucket shuffle join), falls back + * to {@link AbstractUnassignedScanJob#degreeOfParallelism} to avoid + * over-parallelizing the join fragment. + * + * @param maxParallel the maximum possible parallelism (bucket count) + * @param useLocalShuffleToAddParallel whether local shuffle is active + * @return the number of instances to create per worker + */ @Override protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddParallel) { Preconditions.checkArgument(maxParallel > 0, "maxParallel must be positive"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java index aab0f20895d049..8ff7092300fb8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanMetadataJob.java @@ -48,6 +48,15 @@ public UnassignedScanMetadataJob( this.schemaScanNode = schemaScanNode; } + /** + * Select a worker for the schema metadata scan node (e.g. information_schema tables). + * Metadata scans are typically lightweight and produce a single scan range per node; + * this method distributes them across available workers for load balancing. + * + * @param distributeContext the distribute context + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return a map from worker to its assigned schema scan ranges + */ @Override protected Map multipleMachinesParallelization( DistributeContext distributeContext, ListMultimap inputJobs) { @@ -56,6 +65,16 @@ protected Map multipleMachinesPara ); } + /** + * If no workers could be selected for the metadata scan (e.g. 
all backends are + * unavailable), create a single empty instance on a random available worker + * as a fallback to prevent query failure. + * + * @param assignedJobs the list produced by {@link #insideMachineParallelization} + * @param workerManager the worker manager to select a fallback worker from + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return the original list if non-empty, otherwise a single empty instance + */ @Override protected List fillUpAssignedJobs(List assignedJobs, DistributedPlanWorkerManager workerManager, ListMultimap inputJobs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java index fa72f8c01050b6..f6d1694d855fd7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java @@ -62,6 +62,16 @@ public UnassignedScanSingleOlapTableJob( this.olapScanNode = olapScanNode; } + /** + * Select a replica and its hosting worker for every tablet of the OLAP scan node, + * without bucket awareness. Each tablet is assigned to the best available backend + * holding a replica, and tablets on the same backend are grouped together. + * + * @param distributeContext the distribute context + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return a map from worker to its assigned tablets (as {@link UninstancedScanSource}), + * e.g. 
{@code {BackendWorker("172.0.0.1") -> [tablet_10001..10004]}} + */ @Override protected Map multipleMachinesParallelization( DistributeContext distributeContext, ListMultimap inputJobs) { @@ -78,6 +88,20 @@ protected Map multipleMachinesPara ); } + /** + * For each worker, split its assigned tablets into one or more instances. + * When the fragment uses query cache and the tablet count exceeds the threshold, + * a partition-based grouping strategy is attempted first: tablets belonging to + * the same partition are kept within the same instance to reduce backend + * concurrency pressure during cache lookup. If partition-based grouping is not + * applicable or fails, falls back to the default even-split strategy from + * {@link AbstractUnassignedScanJob#insideMachineParallelization}. + * + * @param workerToScanRanges map from worker to its un-instanced tablet ranges + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @param distributeContext the distribute context for parallelism configuration + * @return the list of assigned jobs, each bound to a worker with its tablet portion + */ @Override protected List insideMachineParallelization( Map workerToScanRanges, @@ -225,6 +249,17 @@ private Map splitScanRangesByPartition( return partitionToScanRanges; } + /** + * If the normal parallelization produced an empty list (e.g. all tablets have been + * pruned by TABLET() hint specifying a non-existent tablet), create a single empty + * instance on a random worker so the fragment can still execute and return an empty + * result set rather than failing. 
+ * + * @param assignedJobs the list produced by {@link #insideMachineParallelization} + * @param workerManager the worker manager to select a fallback worker from + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return the original list if non-empty, otherwise a single empty instance + */ @Override protected List fillUpAssignedJobs(List assignedJobs, DistributedPlanWorkerManager workerManager, ListMultimap inputJobs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java index bc98119d939aaa..e9dfc73588076c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java @@ -48,6 +48,16 @@ public UnassignedScanSingleRemoteTableJob( this.scanWorkerSelector = Objects.requireNonNull(scanWorkerSelector, "scanWorkerSelector is not null"); } + /** + * Select a worker for each scan range of the external / remote table scan node. + * For external tables (Hive, Iceberg, etc.), scan ranges represent file splits + * rather than tablets, and workers are selected based on data locality or + * workload balancing. 
+ * + * @param distributeContext the distribute context + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return a map from worker to its assigned file scan ranges + */ @Override protected Map multipleMachinesParallelization( DistributeContext distributeContext, ListMultimap inputJobs) { @@ -56,6 +66,16 @@ protected Map multipleMachinesPara ); } + /** + * If all file scan ranges have been pruned and the assigned job list is empty, + * create a single empty instance on a random worker so the fragment can still + * execute (returning an empty result) rather than failing. + * + * @param assignedJobs the list produced by {@link #insideMachineParallelization} + * @param workerManager the worker manager to select a fallback worker from + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return the original list if non-empty, otherwise a single empty instance + */ @Override protected List fillUpAssignedJobs(List assignedJobs, DistributedPlanWorkerManager workerManager, ListMultimap inputJobs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java index 27792eb288e1ca..40c90563730331 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java @@ -50,6 +50,22 @@ public UnassignedShuffleJob( super(statementContext, fragment, ImmutableList.of(), exchangeToChildJob); } + /** + * Compute assigned jobs for a shuffle (data redistribution) fragment. + * The instance count is determined by the parallelism of the largest child + * fragment. When the expected instance count is lower than the child count + * (e.g. 
due to session variable limits or query cache constraints), workers + * are shuffled to spread instances across different backends for load balancing. + * When more instances are needed, worker assignment follows the child layout. + *
<p>
+ * If {@code useSerialSource} is true, multiple local shuffle instances are + * created per worker to add intra-machine parallelism without rescanning data. + * + * @param distributeContext the distribute context for worker selection + * @param inputJobs multimap from child exchange nodes to their assigned jobs, + * used to determine the largest child fragment's instance layout + * @return assigned shuffle jobs with workers selected from child fragment layout + */ @Override public List computeAssignedJobs( DistributeContext distributeContext, ListMultimap inputJobs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java index 6ded32e0cd98c5..ca3fe1c4aae025 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java @@ -42,6 +42,17 @@ public UnassignedSpecifyInstancesJob( this.specifyInstances = fragment.specifyInstances.get(); } + /** + * Compute assigned jobs by delegating to the fragment's + * {@link NereidsSpecifyInstances}. This is used when the fragment has + * pre-specified instance-to-worker mappings (e.g. from hints or + * statement-level instance specifications), bypassing the normal + * worker selection and parallelization logic. + * + * @param distributeContext the distribute context (forwarded to specify instances) + * @param inputJobs multimap from child exchange nodes to their assigned jobs + * @return assigned jobs built from the pre-specified instance layout + */ @Override public List computeAssignedJobs( DistributeContext distributeContext, ListMultimap inputJobs) {