Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 133 additions & 88 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 9 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@ exclude = [
]

[dependencies]
datafusion = "48.0.1"
datafusion = "49.0.2"
tokio = {version = "1"}

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports", "async_tokio"] }
criterion = { version = "0.7", features = ["html_reports", "async_tokio"] }
tokio = { version = "1", features = ["full"] }

[[bench]]
name = "pagerank_benchmark"
harness = false # To disable Rust's default benchmarking and use the Criterion one

# Adding more benchmarks
# [[bench]]
# name = "shortestdistance_benchmark"
# harness = false
[[bench]]
name = "cc_benchmark"
harness = false

[[bench]]
name = "sp_benchmark"
harness = false
10 changes: 5 additions & 5 deletions benches/README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Running Benchmarks for Graphframe-rs
# Running Benchmarks for graphframes-rs

Benchmarking for Graphframe-rs are currently done on LDBC Graphalytics [datasets](https://ldbcouncil.org/benchmarks/graphalytics/datasets/).
Benchmarking for graphframes-rs is currently done on LDBC Graphalytics [datasets](https://ldbcouncil.org/benchmarks/graphalytics/datasets/).
Benchmarking runs and reports are executed/generated as html-reports using Rust Criterion crate.

## How to run benchmarks ?
## How to run benchmarks?

`run_benchmarks.py` file is the main source for running the benchmarks.

Expand All @@ -24,9 +24,9 @@ CLI utility:

### Parameters for `run_benchmarks.py`

- `--dataset`: [MANDATORY] LDBC dataset name on which user want to run the benchmark (for e.g. test-pr-directed, cit-Patents). Dataset name are exactly same as mentioned in LDBC website.
- `--dataset`: LDBC dataset name on which the user wants to run the benchmark (e.g., test-pr-directed, cit-Patents). Dataset names are exactly the same as those listed on the LDBC website. Default is wiki-Talk.
- `--checkpoint_interval`: Lets the user set a specific checkpoint interval for the algorithms to run with. `default: 1`
- `--name`: If a particular benchmark needs to run. Name should be same as the `[[bench]]` names present in `Cargo.toml`
- `--name`: [MANDATORY] Name of the benchmark to run. Must match one of the `[[bench]]` names present in `Cargo.toml`.

```bash
# Running all the benchmarks
Expand Down
42 changes: 42 additions & 0 deletions benches/cc_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use criterion::{Criterion, criterion_group, criterion_main};
use graphframes_rs::util::create_ldbc_test_graph;
use std::env;
use tokio::runtime::Runtime;

/// Criterion benchmark for the Connected Components algorithm on an LDBC
/// Graphalytics dataset.
///
/// Configuration is read from environment variables (set by `run_benchmarks.py`):
/// - `BENCHMARK_DATASET`: name of the LDBC dataset to load (panics if unset).
/// - `WEIGHTED`: only the exact string "true" loads the weighted variant;
///   any other value means unweighted (panics if unset).
fn benchmark_cc(c: &mut Criterion) {
    let dataset_name =
        env::var("BENCHMARK_DATASET").expect("BENCHMARK_DATASET environment variable not set");
    // Treat WEIGHTED as a boolean flag: exactly "true" enables it, anything else disables.
    let is_weighted =
        env::var("WEIGHTED").expect("WEIGHTED environment variable not set") == "true";

    let mut group = c.benchmark_group("Connected Components");
    // Graph algorithms are slow; keep the sample count low and allow a long
    // measurement window so Criterion does not warn about unmet targets.
    group.sample_size(10);
    group.measurement_time(std::time::Duration::from_secs(200));

    // Create a Tokio runtime to execute the async graph loading function.
    let rt = Runtime::new().unwrap();

    // Load the graph data once before running the benchmark.
    let graph = rt
        .block_on(create_ldbc_test_graph(&dataset_name, true, is_weighted))
        .expect("Failed to create test graph");

    // Build the algorithm once up front so builder construction is excluded
    // from the timing of each iteration (the builder is cloned per run).
    let cc_builder = graph.connected_components();

    // Define the benchmark.
    // Criterion runs the code inside the closure many times to get a reliable measurement.
    group.bench_function(format!("cc-{}", dataset_name), |b| {
        // Use the `to_async` adapter to benchmark an async function.
        b.to_async(&rt).iter(|| async {
            let _ = cc_builder.clone().run().await.unwrap().data.collect().await;
        })
    });

    group.finish();
}

// Register the benchmark function with Criterion and generate the binary's
// entry point (required because `harness = false` in Cargo.toml).
criterion_group!(benches, benchmark_cc);
criterion_main!(benches);
9 changes: 8 additions & 1 deletion benches/pagerank_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,21 @@ fn benchmark_pagerank(c: &mut Criterion) {
.parse()
.expect("CHECKPOINT_INTERVAL is not a valid int");

let is_weighted = match env::var("WEIGHTED").expect("WEIGHTED environment variable not set") {
s if s == "true" => true,
_ => false,
};

let mut group = c.benchmark_group("PageRank");
group.sample_size(10);
group.measurement_time(std::time::Duration::from_secs(200));

// Create a Tokio runtime to execute the async graph loading function.
let rt = Runtime::new().unwrap();

// Load the graph data once before running the benchmark.
let graph = rt
.block_on(create_ldbc_test_graph(&dataset_name, true, false))
.block_on(create_ldbc_test_graph(&dataset_name, true, is_weighted))
.expect("Failed to create test graph");

// Creating pagerank_builder here so to exclude the time of generation in each iteration
Expand Down
45 changes: 45 additions & 0 deletions benches/sp_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use criterion::{Criterion, criterion_group, criterion_main};
use graphframes_rs::util::create_ldbc_test_graph;
use std::env;
use tokio::runtime::Runtime;

/// Criterion benchmark for the Shortest Paths algorithm on an LDBC
/// Graphalytics dataset.
///
/// Configuration is read from environment variables (set by `run_benchmarks.py`):
/// - `BENCHMARK_DATASET`: name of the LDBC dataset to load (panics if unset).
/// - `CHECKPOINT_INTERVAL`: checkpoint interval for the Pregel loop,
///   parsed as `usize` (panics if unset or not a valid integer).
/// - `WEIGHTED`: only the exact string "true" loads the weighted variant
///   (panics if unset).
fn benchmark_sp(c: &mut Criterion) {
    let dataset_name =
        env::var("BENCHMARK_DATASET").expect("BENCHMARK_DATASET environment variable not set");
    // BUG FIX: the expect message previously said "BENCHMARK_DATASET" here,
    // which would mislead users when CHECKPOINT_INTERVAL is the missing variable.
    let checkpoint_interval: usize = env::var("CHECKPOINT_INTERVAL")
        .expect("CHECKPOINT_INTERVAL environment variable not set")
        .parse()
        .expect("CHECKPOINT_INTERVAL is not a valid int");

    // Treat WEIGHTED as a boolean flag: exactly "true" enables it, anything else disables.
    let is_weighted =
        env::var("WEIGHTED").expect("WEIGHTED environment variable not set") == "true";

    let mut group = c.benchmark_group("ShortestPath");
    // Graph algorithms are slow; keep the sample count low and allow a long
    // measurement window so Criterion does not warn about unmet targets.
    group.sample_size(10);
    group.measurement_time(std::time::Duration::from_secs(200));

    // Create a Tokio runtime and load the graph once, outside the timed loop.
    let rt = Runtime::new().unwrap();
    let graph = rt
        .block_on(create_ldbc_test_graph(&dataset_name, true, is_weighted))
        .expect("Failed to create test graph");

    // Build the algorithm once up front so builder construction is excluded
    // from the timing of each iteration (the builder is cloned per run).
    let sp_builder = graph
        .shortest_paths(vec![2i64]) // TODO: replace to read from props
        .checkpoint_interval(checkpoint_interval);

    group.bench_function(
        format!("sp-{}-cp-{}", dataset_name, checkpoint_interval),
        |b| {
            // Use the `to_async` adapter to benchmark an async function.
            b.to_async(&rt).iter(|| async {
                let _ = sp_builder.clone().run().await.unwrap().collect().await;
            })
        },
    );

    group.finish();
}

// Register the benchmark function with Criterion and generate the binary's
// entry point (required because `harness = false` in Cargo.toml).
criterion_group!(benches, benchmark_sp);
criterion_main!(benches);
28 changes: 19 additions & 9 deletions run_benchmarks.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import argparse
import os
import pathlib
import requests
import shutil
import subprocess
import sys
import shutil
import time

import requests

# The base URL for downloading Graphalytics datasets.
BASE_URL = "https://datasets.ldbcouncil.org/graphalytics"

Expand All @@ -27,7 +28,7 @@ def prepare_dataset(dataset_name: str):
print(f"Dataset '{dataset_name}' is ready.")
return

# make dataset_dir if doesn't exist
# make dataset_dir if it doesn't exist
os.mkdir(dataset_dir)

# If the archive doesn't exist, download it.
Expand Down Expand Up @@ -97,7 +98,7 @@ def prepare_dataset(dataset_name: str):
for dirpath, _, filenames in os.walk(dataset_dir):
for filename in filenames:
if (not filename.endswith(".properties")) and (
not filename.endswith(".tar.zst")
not filename.endswith(".tar.zst")
):
old_path = pathlib.Path(dirpath) / filename
new_path = old_path.with_name(f"{old_path.name}.csv")
Expand All @@ -118,7 +119,7 @@ def prepare_dataset(dataset_name: str):
sys.exit(1)


def run_benchmarks(dataset_name: str, checkpoint_interval: int, benchmark_name: str):
def run_benchmarks(dataset_name: str, checkpoint_interval: int, benchmark_name: str, is_weighted: str = "false"):
"""
Runs the Rust benchmarks using 'cargo bench', passing the dataset name
as an environment variable.
Expand All @@ -128,7 +129,8 @@ def run_benchmarks(dataset_name: str, checkpoint_interval: int, benchmark_name:
# Set the dataset name in an environment variable for the benchmark process.
env = os.environ.copy()
env["BENCHMARK_DATASET"] = dataset_name
env["CHECKPOINT_INTERVAL"] = checkpoint_interval
env["CHECKPOINT_INTERVAL"] = str(checkpoint_interval)
env["WEIGHTED"] = is_weighted

# Execute 'cargo bench' and stream its output.
try:
Expand Down Expand Up @@ -181,7 +183,8 @@ def main():
parser.add_argument(
"--dataset",
type=str,
required=True,
default="wiki-Talk",
required=False,
help="The name of the Graphalytics dataset to download and use for benchmarking (e.g., 'test-pr-directed').",
)
parser.add_argument(
Expand All @@ -194,9 +197,16 @@ def main():
parser.add_argument(
"--name",
type=str,
required=False,
required=True,
help="Name of the benchmark that needs to run.",
)
parser.add_argument(
"--weighted",
type=str,
required=False,
default="false",
help="Whether the graph is weighted or not.",
)
args = parser.parse_args()
dataset = args.dataset
checkpoint_interval = args.checkpoint_interval
Expand All @@ -206,7 +216,7 @@ def main():
BENCH_DATA_DIR.mkdir(parents=True, exist_ok=True)

prepare_dataset(dataset)
run_benchmarks(dataset, checkpoint_interval, benchmark_name)
run_benchmarks(dataset, checkpoint_interval, benchmark_name, is_weighted=args.weighted)


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions src/connected_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ async fn min_nbr_sum(min_neighbours: &DataFrame) -> Result<i128> {
.map(|a| a.value(0))
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ConnectedComponentsOutput {
pub data: DataFrame,
pub num_iterations: usize,
pub min_nbr_sum: Vec<i128>,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ConnectedComponentsBuilder<'a> {
graph_frame: &'a GraphFrame,
}
Expand Down
1 change: 1 addition & 0 deletions src/shortest_paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl Accumulator for DistancesMap {
///
/// This builder helps configure and execute a Pregel algorithm that computes the shortest paths
/// from all vertices in the graph to a specified set of landmark vertices.
#[derive(Debug, Clone)]
pub struct ShortestPathsBuilder<'a> {
/// Reference to the graph frame containing vertices and edges
graph_frame: &'a GraphFrame,
Expand Down
2 changes: 2 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub async fn create_ldbc_test_graph(
.schema(&vertices_schema),
)
.await?;
println!("read {} vertices", vertices.clone().count().await?);
println!("read {} edges", edges.clone().count().await?);
Ok(GraphFrame { vertices, edges })
}

Expand Down
Loading