17 changes: 10 additions & 7 deletions docs/source/user-guide/concepts.md
@@ -8,9 +8,9 @@ Key terminology:

- `Stage`: a portion of the plan separated by a network boundary from other parts of the plan. A plan contains
one or more stages, each separated by network boundaries.
- `Task`: a unit of work in a stage that executes the inner plan in parallel to other tasks within the stage. Each task
in a stage executes a structurally identical plan in different worker, passing a `task_index` as a contextual value
for making choices about what data should be returned.
- `Task`: a unit of work in a stage that executes a plan in parallel to other tasks within the stage. Each task
in a stage runs on a different worker with its own plan variant — pre-specialized at planning time for the subset
of data it is responsible for.
- `Network Boundary`: a node in the plan that streams data from a network interface rather than directly from its
children nodes.
- `Worker`: a physical machine listening to serialized execution plans over an Arrow Flight interface. A task is
@@ -60,11 +60,14 @@ previous stages.
An extension present during the `ExecutionPlan::execute()` that contains information about the current task in
which the plan is being executed.

As a user, you will need to interact with this type in your custom leaf nodes, as depending on which task index
you are in, you might want to return a different set of data.
For built-in file-based plans (`DataSourceExec`), data partitioning is handled automatically at planning time via
`DistributedLeafExec`: each task receives a pre-built plan variant with its own isolated file groups, so no
runtime dispatch is needed.

For example, if you are on the task with index 0 of a 3-task stage, you might want to return only the first 1/3 of the
data. If you are on the task with index 2, you might want to return the last 1/3 of the data, and so on.
For custom leaf nodes that need to dispatch work themselves, `DistributedTaskContext` exposes `task_index` and
`task_count` so execution logic can select the appropriate data subset. For example, task 0 of 3 might return
the first third of rows, task 2 the last third, and so on. See the `TaskEstimator` documentation for guidance on
which approach to use.
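
The "first third / last third" rule above can be sketched as a small helper. This is a minimal illustration only: `rows_for_task` and `total_rows` are hypothetical names that are not part of the `datafusion-distributed` API, and the sketch assumes the leaf node knows its total row count up front.

```rust
use std::ops::Range;

/// Returns the half-open row range that task `task_index` (out of
/// `task_count`) should produce from `total_rows` rows.
///
/// Hypothetical helper for illustration; in a real leaf node the two
/// indices would come from `DistributedTaskContext`.
fn rows_for_task(total_rows: usize, task_index: usize, task_count: usize) -> Range<usize> {
    assert!(task_count > 0, "task_count must be positive");
    assert!(task_index < task_count, "task_index must be below task_count");
    // Task i covers [i * total / count, (i + 1) * total / count), so the
    // ranges are contiguous, non-overlapping, and together cover every row.
    let start = task_index * total_rows / task_count;
    let end = (task_index + 1) * total_rows / task_count;
    start..end
}
```

With 9 rows and 3 tasks, task 0 gets `0..3` and task 2 gets `6..9`. When the row count does not divide evenly (10 rows, 3 tasks), the last task absorbs the remainder and gets `6..10`.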

## [ChannelResolver](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/networking/channel_resolver.rs)

24 changes: 14 additions & 10 deletions docs/source/user-guide/task-estimator.md
@@ -14,16 +14,20 @@ can provide custom `TaskEstimator` implementations for your own `ExecutionPlan`

Providing a `TaskEstimator` allows you to do two things:

1. Tell the distributed planner how many tasks should be used for your own `ExecutionPlan`s.
2. Tell the distributed planner how to "scale up" your `ExecutionPlan` in order to account for it running in
multiple distributed tasks.

If your custom nodes will execute in a distributed manner, you must handle this during execution. When your
`TaskEstimator` specifies N tasks for a node, your execution logic must respond to the
[DistributedTaskContext](https://github.com/datafusion-contrib/datafusion-distributed/blob/75b4e73e9052c6596b9d1744ce2bdfa6cbc010d3/src/stage.rs#L137-L137)
present in DataFusion's `TaskContext` to determine which subset of data this task should process.

There's an example of how to do that in the `examples/` folder:
1. Tell the distributed planner how many tasks should be used for your own `ExecutionPlan`s
(`task_estimation`).
2. Prepare the leaf node at planning time for distributed execution (`scale_up_leaf_node`).
The recommended approach is to return a `DistributedLeafExec` wrapping your original plan
along with `N` per-task variants (one per task). `DistributedLeafExec` is transparent to
network boundaries—it exposes the same partition count as the original—and the task spawner
automatically replaces it with the appropriate per-task variant before serialising the plan
and sending it to a worker.

If your leaf node already handles task dispatch internally (e.g., by reading
`DistributedTaskContext.task_index` in `execute()`), you can omit `DistributedLeafExec` and
simply return the prepared plan directly from `scale_up_leaf_node`.
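
As a rough sketch of the internal-dispatch approach, a leaf node could precompute one range per task at planning time and pick its own range by `task_index` at execution time. The helpers below (`split_range`, `range_for_task`) are hypothetical and are not taken from the repository's example; they only illustrate the shape of the idea using the standard library.

```rust
use std::ops::Range;

/// Planning-time step: split `range` into `task_count` contiguous sub-ranges,
/// one per task. Hypothetical helper, not part of the library API.
fn split_range(range: Range<i64>, task_count: usize) -> Vec<Range<i64>> {
    assert!(task_count > 0, "task_count must be positive");
    // Treat an inverted range as empty.
    let len = (range.end - range.start).max(0);
    let n = task_count as i64;
    (0..n)
        .map(|i| {
            let start = range.start + i * len / n;
            let end = range.start + (i + 1) * len / n;
            start..end
        })
        .collect()
}

/// Execution-time step: select the range for the current task. Returns
/// `None` when `task_index` is out of bounds instead of panicking.
fn range_for_task(ranges: &[Range<i64>], task_index: usize) -> Option<Range<i64>> {
    ranges.get(task_index).cloned()
}
```

For instance, `split_range(0..10, 3)` yields `[0..3, 3..6, 6..10]`, and `range_for_task` with index 1 then returns `Some(3..6)`. Returning an `Option` lets the caller turn an out-of-range index into a proper execution error.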

There's a complete example in the `examples/` folder:

- [custom_execution_plan.rs](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/examples/custom_execution_plan.rs) -
A complete example showing how to implement a custom execution plan (`numbers(start, end)` table function)
3 changes: 2 additions & 1 deletion examples/custom_execution_plan.rs
@@ -127,7 +127,8 @@ impl TableProvider for NumbersTableProvider {
}

/// Custom execution plan that generates numbers from start to end.
/// When distributed, each task generates a subset of numbers based on its task_index.
/// When distributed, `scale_up_leaf_node` populates `ranges_per_task` with one entry per task
/// and `execute()` uses `DistributedTaskContext.task_index` to select this task's range.
#[derive(Debug, Clone)]
struct NumbersExec {
ranges_per_task: Vec<Range<i64>>,
16 changes: 5 additions & 11 deletions src/coordinator/prepare_static_plan.rs
@@ -3,10 +3,9 @@ use crate::coordinator::distributed::PreparedPlan;
use crate::coordinator::task_spawner::{
CoordinatorToWorkerMetrics, CoordinatorToWorkerTaskSpawner,
};
use crate::distributed_planner::get_distributed_task_estimator;
use crate::stage::RemoteStage;
use crate::{
DistributedCodec, NetworkBoundaryExt, Stage, TaskRoutingContext,
DistributedConfig, NetworkBoundaryExt, Stage, TaskEstimator, TaskRoutingContext,
get_distributed_worker_resolver,
};
use datafusion::common::runtime::JoinSet;
@@ -35,7 +34,6 @@ pub(super) fn prepare_static_plan(
ctx: &Arc<TaskContext>,
) -> Result<PreparedPlan> {
let worker_resolver = get_distributed_worker_resolver(ctx.session_config())?;
let codec = DistributedCodec::new_combined_with_user(ctx.session_config());

let available_urls = worker_resolver.get_urls()?;

@@ -52,15 +50,11 @@ pub(super) fn prepare_static_plan(
return exec_err!("Input stage from network boundary was not in Local state");
};

let task_estimator = get_distributed_task_estimator(ctx.session_config())?;
let d_cfg = DistributedConfig::from_config_options(ctx.session_config().options())?;
let task_estimator = &d_cfg.__private_task_estimator;

let mut spawner = CoordinatorToWorkerTaskSpawner::new(
stage,
&metrics,
task_metrics,
&codec,
&mut join_set,
)?;
let mut spawner =
CoordinatorToWorkerTaskSpawner::new(stage, &metrics, task_metrics, ctx, &mut join_set)?;

let routed_urls = match task_estimator.route_tasks(&TaskRoutingContext {
task_ctx: Arc::clone(ctx),
56 changes: 36 additions & 20 deletions src/coordinator/task_spawner.rs
@@ -1,27 +1,29 @@
use crate::common::{TreeNodeExt, serialize_uuid, task_ctx_with_extension};
use crate::config_extension_ext::get_config_extension_propagation_headers;
use crate::coordinator::MetricsStore;
use crate::execution_plans::{ChildrenIsolatorUnionExec, DistributedLeafExec};
use crate::passthrough_headers::get_passthrough_headers;
use crate::protobuf::tonic_status_to_datafusion_error;
use crate::stage::LocalStage;
use crate::worker::generated::worker as pb;
use crate::worker::generated::worker::coordinator_to_worker_msg::Inner;
use crate::worker::generated::worker::set_plan_request::WorkUnitFeedDeclaration;
use crate::{
DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, DistributedConfig, DistributedTaskContext,
DistributedWorkUnitFeedContext, TaskKey, get_distributed_channel_resolver,
DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, DistributedCodec, DistributedConfig,
DistributedTaskContext, DistributedWorkUnitFeedContext, TaskKey,
get_distributed_channel_resolver,
};
use datafusion::common::Result;
use datafusion::common::instant::Instant;
use datafusion::common::runtime::JoinSet;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::common::tree_node::{Transformed, TreeNodeRecursion};
use datafusion::common::{DataFusionError, exec_datafusion_err};
use datafusion::execution::TaskContext;
use datafusion::physical_expr_common::metrics::{
Count, ExecutionPlanMetricsSet, Label, MetricBuilder, MetricValue, Time,
};
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf::PhysicalPlanNode;
use futures::StreamExt;
use http::Extensions;
@@ -72,10 +74,10 @@ impl CoordinatorToWorkerMetrics {
/// remote counterparts.
pub(super) struct CoordinatorToWorkerTaskSpawner<'a> {
plan: &'a Arc<dyn ExecutionPlan>,
plan_proto: Vec<u8>,
query_id: Uuid,
stage_id: usize,
task_count: usize,
task_ctx: &'a TaskContext,
metrics: &'a CoordinatorToWorkerMetrics,
task_metrics: &'a Option<Arc<MetricsStore>>,
join_set: &'a mut JoinSet<Result<()>>,
@@ -88,18 +90,15 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> {
stage: &'a LocalStage,
metrics: &'a CoordinatorToWorkerMetrics,
task_metrics: &'a Option<Arc<MetricsStore>>,
codec: &'a dyn PhysicalExtensionCodec,
task_ctx: &'a TaskContext,
join_set: &'a mut JoinSet<Result<()>>,
) -> Result<Self> {
let plan_proto = PhysicalPlanNode::try_from_physical_plan(Arc::clone(&stage.plan), codec)?
.encode_to_vec();

Ok(Self {
plan: &stage.plan,
plan_proto,
query_id: stage.query_id,
stage_id: stage.num,
task_count: stage.tasks,
task_ctx,
metrics,
task_metrics,
join_set,
@@ -126,32 +125,49 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> {
task_index: task_i,
task_count: self.task_count,
};
self.plan.apply_with_dt_ctx(d_ctx, |plan, _| {
let Some(wuf) = wuf_registry.get_work_unit_feed(plan) else {
return Ok(TreeNodeRecursion::Continue);

let plan = Arc::clone(self.plan);
let specialized = plan.transform_down_with_dt_ctx(d_ctx, |plan, d_ctx| {
if let Some(wuf) = wuf_registry.get_work_unit_feed(&plan) {
work_unit_feed_declarations.push(WorkUnitFeedDeclaration {
id: serialize_uuid(&wuf.id()),
partitions: plan.properties().partitioning.partition_count() as u64,
});
};
work_unit_feed_declarations.push(WorkUnitFeedDeclaration {
id: serialize_uuid(&wuf.id()),
partitions: plan.properties().partitioning.partition_count() as u64,
});
Ok(TreeNodeRecursion::Continue)

if let Some(ciu) = plan.as_any().downcast_ref::<ChildrenIsolatorUnionExec>() {
let ciu = ciu.to_task_specialized(d_ctx.task_index);
return Ok(Transformed::yes(Arc::new(ciu)));
};

if let Some(dle) = plan.as_any().downcast_ref::<DistributedLeafExec>() {
let specialized = dle.to_task_specialized(d_ctx.task_index);
return Ok(Transformed::yes(specialized));
}

Ok(Transformed::no(plan))
})?;

let codec = DistributedCodec::new_combined_with_user(self.task_ctx.session_config());

let plan_proto =
PhysicalPlanNode::try_from_physical_plan(specialized.data, &codec)?.encode_to_vec();
let plan_size = plan_proto.len();

let task_key = TaskKey {
query_id: serialize_uuid(&self.query_id),
stage_id: self.stage_id as u64,
task_number: task_i as u64,
};
let msg = pb::CoordinatorToWorkerMsg {
inner: Some(Inner::SetPlanRequest(pb::SetPlanRequest {
plan_proto: self.plan_proto.clone(),
plan_proto,
task_count: self.task_count as u64,
task_key: Some(task_key.clone()),
work_unit_feed_declarations,
target_worker_url: url.to_string(),
})),
};
let plan_size = self.plan_proto.len();

let (coordinator_to_worker_tx, coordinator_to_worker_rx) =
tokio::sync::mpsc::unbounded_channel();