Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions benchmarks/cdk/bin/@bench-common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export interface ExecuteQueryResult {
rowCount: number,
plan: string
elapsed: number
tasks: number
}

export interface BenchmarkRunner {
Expand Down Expand Up @@ -160,7 +161,8 @@ export async function runBenchmark(
elapsed: 0,
rowCount: 0,
error: e.toString(),
plan: ""
plan: "",
tasks: 0
})
console.error(`Query ${id} failed: ${e.toString()}`)
continue
Expand All @@ -176,7 +178,8 @@ export async function runBenchmark(
elapsed: 0,
rowCount: 0,
error: e.toString(),
plan: ""
plan: "",
tasks: 0
})
console.error(`Query ${id} failed: ${e.toString()}`)
break
Expand All @@ -188,7 +191,8 @@ export async function runBenchmark(
result.iterations.push({
elapsed: response.elapsed,
rowCount: response.rowCount,
plan: response.plan
plan: response.plan,
tasks: response.tasks,
})

console.log(
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/cdk/bin/@results.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export interface QueryIter {
plan: string;
rowCount: number;
elapsed: number; // Duration in milliseconds
tasks: number;
error?: string;
}

Expand Down Expand Up @@ -226,7 +227,8 @@ export class BenchResult {
rowCount: z.number(),
elapsed: z.number(),
error: z.string().optional(),
plan: z.string()
plan: z.string(),
tasks: z.number().default(0),
}).array(),
})
const data = fs.readFileSync(filePath, 'utf-8');
Expand Down
3 changes: 2 additions & 1 deletion benchmarks/cdk/bin/datafusion-bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const QueryResponse = z.object({
count: z.number(),
plan: z.string(),
elapsed_ms: z.number(),
tasks: z.number()
})
type QueryResponse = z.infer<typeof QueryResponse>

Expand Down Expand Up @@ -133,7 +134,7 @@ class DataFusionRunner implements BenchmarkRunner {
response = await this.query(sql)
}

return { rowCount: response.count, plan: response.plan, elapsed: response.elapsed_ms };
return { rowCount: response.count, plan: response.plan, elapsed: response.elapsed_ms, tasks: response.tasks };
}

private async query(sql: string): Promise<QueryResponse> {
Expand Down
21 changes: 18 additions & 3 deletions benchmarks/cdk/bin/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ use axum::{Json, Router, extract::Query, http::StatusCode, routing::get};
use datafusion::common::DataFusionError;
use datafusion::common::instant::Instant;
use datafusion::common::runtime::SpawnedTask;
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::SessionContext;
use datafusion_distributed::{
ChannelResolver, DistributedExt, DistributedMetricsFormat, SessionStateBuilderExt, Worker,
WorkerResolver, display_plan_ascii, get_distributed_channel_resolver,
get_distributed_worker_resolver, rewrite_distributed_plan_with_metrics,
ChannelResolver, DistributedExt, DistributedMetricsFormat, NetworkBoundaryExt,
SessionStateBuilderExt, Worker, WorkerResolver, display_plan_ascii,
get_distributed_channel_resolver, get_distributed_worker_resolver,
rewrite_distributed_plan_with_metrics,
};
use futures::{StreamExt, TryFutureExt};
use log::{error, info, warn};
Expand Down Expand Up @@ -41,6 +43,7 @@ struct QueryResult {
plan: String,
count: usize,
elapsed_ms: f64,
tasks: usize,
}

#[derive(Serialize)]
Expand Down Expand Up @@ -188,6 +191,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
let plan = display_plan_ascii(physical.as_ref(), true);
drop(task);

let mut task_count = 0;
physical
.apply(|plan| {
let Some(nb) = plan.as_network_boundary() else {
return Ok(TreeNodeRecursion::Continue);
};
task_count += nb.input_stage().task_count();
Ok(TreeNodeRecursion::Continue)
})
.expect(".apply failed");

let elapsed = start.elapsed();
let ms = elapsed.as_secs_f64() * 1000.0;
info!("Finished executing query:\n{sql}\n\n{plan}");
Expand All @@ -198,6 +212,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
count,
plan,
elapsed_ms: ms,
tasks: task_count,
}))
}
.inspect_err(|(_, msg)| {
Expand Down
Loading