From 14e2ff499b10c0f082e8efa3a3d85ca8b134c449 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Sat, 23 May 2026 14:21:03 +0200 Subject: [PATCH] Report distributed tasks in remote benchmarks --- benchmarks/cdk/bin/@bench-common.ts | 10 +++++++--- benchmarks/cdk/bin/@results.ts | 4 +++- benchmarks/cdk/bin/datafusion-bench.ts | 3 ++- benchmarks/cdk/bin/worker.rs | 21 ++++++++++++++++++--- 4 files changed, 30 insertions(+), 8 deletions(-) diff --git a/benchmarks/cdk/bin/@bench-common.ts b/benchmarks/cdk/bin/@bench-common.ts index 56ef6461..e441ccb6 100644 --- a/benchmarks/cdk/bin/@bench-common.ts +++ b/benchmarks/cdk/bin/@bench-common.ts @@ -67,6 +67,7 @@ export interface ExecuteQueryResult { rowCount: number, plan: string elapsed: number + tasks: number } export interface BenchmarkRunner { @@ -160,7 +161,8 @@ export async function runBenchmark( elapsed: 0, rowCount: 0, error: e.toString(), - plan: "" + plan: "", + tasks: 0 }) console.error(`Query ${id} failed: ${e.toString()}`) continue @@ -176,7 +178,8 @@ export async function runBenchmark( elapsed: 0, rowCount: 0, error: e.toString(), - plan: "" + plan: "", + tasks: 0 }) console.error(`Query ${id} failed: ${e.toString()}`) break @@ -188,7 +191,8 @@ export async function runBenchmark( result.iterations.push({ elapsed: response.elapsed, rowCount: response.rowCount, - plan: response.plan + plan: response.plan, + tasks: response.tasks, }) console.log( diff --git a/benchmarks/cdk/bin/@results.ts b/benchmarks/cdk/bin/@results.ts index 7a62e204..6993aed8 100644 --- a/benchmarks/cdk/bin/@results.ts +++ b/benchmarks/cdk/bin/@results.ts @@ -11,6 +11,7 @@ export interface QueryIter { plan: string; rowCount: number; elapsed: number; // Duration in milliseconds + tasks: number; error?: string; } @@ -226,7 +227,8 @@ export class BenchResult { rowCount: z.number(), elapsed: z.number(), error: z.string().optional(), - plan: z.string() + plan: z.string(), + tasks: z.number().default(0), }).array(), }) const data = fs.readFileSync(filePath, 'utf-8'); diff --git a/benchmarks/cdk/bin/datafusion-bench.ts b/benchmarks/cdk/bin/datafusion-bench.ts index 77d43dd2..ba5e0b2a 100644 --- a/benchmarks/cdk/bin/datafusion-bench.ts +++ b/benchmarks/cdk/bin/datafusion-bench.ts @@ -81,6 +81,7 @@ const QueryResponse = z.object({ count: z.number(), plan: z.string(), elapsed_ms: z.number(), + tasks: z.number() }) type QueryResponse = z.infer @@ -133,7 +134,7 @@ class DataFusionRunner implements BenchmarkRunner { response = await this.query(sql) } - return { rowCount: response.count, plan: response.plan, elapsed: response.elapsed_ms }; + return { rowCount: response.count, plan: response.plan, elapsed: response.elapsed_ms, tasks: response.tasks }; } private async query(sql: string): Promise { diff --git a/benchmarks/cdk/bin/worker.rs b/benchmarks/cdk/bin/worker.rs index ea7cbaab..bffd9797 100644 --- a/benchmarks/cdk/bin/worker.rs +++ b/benchmarks/cdk/bin/worker.rs @@ -5,14 +5,16 @@ use axum::{Json, Router, extract::Query, http::StatusCode, routing::get}; use datafusion::common::DataFusionError; use datafusion::common::instant::Instant; use datafusion::common::runtime::SpawnedTask; +use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion::execution::SessionStateBuilder; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::physical_plan::execute_stream; use datafusion::prelude::SessionContext; use datafusion_distributed::{ - ChannelResolver, DistributedExt, DistributedMetricsFormat, SessionStateBuilderExt, Worker, - WorkerResolver, display_plan_ascii, get_distributed_channel_resolver, - get_distributed_worker_resolver, rewrite_distributed_plan_with_metrics, + ChannelResolver, DistributedExt, DistributedMetricsFormat, NetworkBoundaryExt, + SessionStateBuilderExt, Worker, WorkerResolver, display_plan_ascii, + get_distributed_channel_resolver, get_distributed_worker_resolver, + rewrite_distributed_plan_with_metrics, }; use futures::{StreamExt, TryFutureExt}; use log::{error, info, warn}; @@ -41,6 +43,7 @@ struct QueryResult { plan: String, count: usize, elapsed_ms: f64, + tasks: usize, } #[derive(Serialize)] @@ -188,6 +191,17 @@ async fn main() -> Result<(), Box> { let plan = display_plan_ascii(physical.as_ref(), true); drop(task); + let mut task_count = 0; + physical + .apply(|plan| { + let Some(nb) = plan.as_network_boundary() else { + return Ok(TreeNodeRecursion::Continue); + }; + task_count += nb.input_stage().task_count(); + Ok(TreeNodeRecursion::Continue) + }) + .expect(".apply failed"); + let elapsed = start.elapsed(); let ms = elapsed.as_secs_f64() * 1000.0; info!("Finished executing query:\n{sql}\n\n{plan}"); @@ -198,6 +212,7 @@ async fn main() -> Result<(), Box> { count, plan, elapsed_ms: ms, + tasks: task_count, })) } .inspect_err(|(_, msg)| {