Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 171 additions & 41 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod connected_components;
mod pagerank;
mod pregel;
mod shortest_paths;
Expand All @@ -7,38 +8,190 @@ use datafusion::error::Result;
use datafusion::functions_aggregate::count::count;
use datafusion::prelude::*;

/// Name of the vertex id column.
pub const VERTEX_ID: &str = "id";
/// Name of the edge source column.
pub const EDGE_SRC: &str = "src";
/// Name of the edge destination column.
pub const EDGE_DST: &str = "dst";
/// Name of the edge column in the triplet representation.
pub const EDGE_COL: &str = "edge";
/// Name of the source vertex column in the triplet representation.
pub const SRC_VERTEX: &str = "src_vertex";
/// Name of the destination vertex column in the triplet representation.
pub const DST_VERTEX: &str = "dst_vertex";

/// A graph stored as two `DataFrame`s: one for vertices and one for edges.
///
/// `GraphFrame` only bundles the two tables together; the graph operations
/// are provided as methods on the struct.
///
/// # Fields
///
/// * `vertices` - A `DataFrame` with one row per vertex. The vertex identifier
///   is kept in the column named by `VERTEX_ID`; any other columns hold
///   vertex attributes (e.g., labels or properties).
///
/// * `edges` - A `DataFrame` with one row per edge. The source vertex is kept
///   in the column named by `EDGE_SRC` and the destination vertex in the
///   column named by `EDGE_DST`; any other columns hold edge attributes
///   (e.g., weights or labels).
///
/// # Example
///
/// ```
/// use datafusion::dataframe;
/// use graphframes_rs::{GraphFrame, VERTEX_ID, EDGE_SRC, EDGE_DST};
/// let vertices = dataframe!(
///     VERTEX_ID => vec![1i64, 2i64, 3i64],
///     "attr" => vec!["a", "b", "c"]
/// ).unwrap();
/// let edges = dataframe!(
///     EDGE_SRC => vec![1i64, 2i64, 3i64],
///     EDGE_DST => vec![3i64, 1i64, 2i64],
///     "attr" => vec!["d", "j", "h"]
/// ).unwrap();
///
/// let graph = GraphFrame { vertices, edges };
/// ```
#[derive(Debug, Clone)]
pub struct GraphFrame {
/// Vertex table: one row per vertex, identified by the `VERTEX_ID` column.
pub vertices: DataFrame,
/// Edge table: one row per edge, with `EDGE_SRC` and `EDGE_DST` columns.
pub edges: DataFrame,
}

impl GraphFrame {
/// Counts the vertices of the graph.
///
/// The count is the number of rows in the `vertices` `DataFrame`.
///
/// # Returns
///
/// * `Ok(i64)` - The number of rows in `vertices`, converted to a signed
///   64-bit integer.
///
/// # Errors
///
/// Any error raised while counting the rows of `vertices` is propagated.
///
/// # Example
///
/// ```
/// use datafusion::dataframe;
/// use graphframes_rs::{GraphFrame, VERTEX_ID, EDGE_SRC, EDGE_DST};
/// let vertices = dataframe!(
///     VERTEX_ID => vec![1i64, 2i64, 3i64],
///     "attr" => vec!["a", "b", "c"]
/// ).unwrap();
/// let edges = dataframe!(
///     EDGE_SRC => vec![1i64, 2i64, 3i64],
///     EDGE_DST => vec![3i64, 1i64, 2i64],
///     "attr" => vec!["d", "j", "h"]
/// ).unwrap();
///
/// let graph = GraphFrame { vertices, edges };
/// // `num_nodes` is async: this is a future that still has to be awaited.
/// let node_count = graph.num_nodes();
/// ```
pub async fn num_nodes(&self) -> Result<i64> {
    // `DataFrame::count` consumes the frame, hence the clone.
    let vertex_rows = self.vertices.clone();
    vertex_rows.count().await.map(|total| total as i64)
}

/// Counts the edges of the graph.
///
/// The count is the number of rows in the `edges` `DataFrame`.
///
/// # Returns
///
/// * `Ok(i64)` - The number of rows in `edges`, converted to a signed
///   64-bit integer.
///
/// # Errors
///
/// Any error raised while counting the rows of `edges` is propagated.
///
/// # Examples
///
/// ```
/// use datafusion::dataframe;
/// use graphframes_rs::{GraphFrame, VERTEX_ID, EDGE_SRC, EDGE_DST};
/// let vertices = dataframe!(
///     VERTEX_ID => vec![1i64, 2i64, 3i64],
///     "attr" => vec!["a", "b", "c"]
/// ).unwrap();
/// let edges = dataframe!(
///     EDGE_SRC => vec![1i64, 2i64, 3i64],
///     EDGE_DST => vec![3i64, 1i64, 2i64],
///     "attr" => vec!["d", "j", "h"]
/// ).unwrap();
///
/// let graph = GraphFrame { vertices, edges };
/// // `num_edges` is async: this is a future that still has to be awaited.
/// let edge_count = graph.num_edges();
/// ```
pub async fn num_edges(&self) -> Result<i64> {
    // `DataFrame::count` consumes the frame, hence the clone.
    let edge_rows = self.edges.clone();
    edge_rows.count().await.map(|total| total as i64)
}

/// Computes the in-degree of every vertex that has at least one incoming edge.
///
/// The edges are grouped by their `EDGE_DST` column and the `EDGE_SRC` values
/// in each group are counted. The resulting `DataFrame` has two columns:
/// - `VERTEX_ID`: the vertex identifier, taken from the edge destination.
/// - `in_degree`: the number of incoming edges counted for that vertex.
///
/// Only vertices that occur as an edge destination appear in the result,
/// because the computation reads the `edges` table alone.
///
/// # Returns
/// - `Ok(DataFrame)` with the vertex ids and their in-degrees.
///
/// # Errors
/// Returns `Err` if building the aggregation or the final selection fails.
///
/// # Example
/// ```rust
/// use datafusion::dataframe;
/// use graphframes_rs::{GraphFrame, VERTEX_ID, EDGE_SRC, EDGE_DST};
/// let vertices = dataframe!(
///     VERTEX_ID => vec![1i64, 2i64, 3i64],
///     "attr" => vec!["a", "b", "c"]
/// ).unwrap();
/// let edges = dataframe!(
///     EDGE_SRC => vec![1i64, 2i64, 3i64],
///     EDGE_DST => vec![3i64, 1i64, 2i64],
///     "attr" => vec!["d", "j", "h"]
/// ).unwrap();
///
/// let graph = GraphFrame { vertices, edges };
/// // `in_degrees` is async: this is a future that still has to be awaited.
/// let in_degrees = graph.in_degrees();
/// ```
pub async fn in_degrees(&self) -> Result<DataFrame> {
    // One group per destination vertex, counting the edges that point at it.
    let group_keys = vec![col(EDGE_DST)];
    let degree_counts = vec![count(col(EDGE_SRC)).alias("in_degree")];
    let per_destination = self.edges.clone().aggregate(group_keys, degree_counts)?;

    // Expose the destination column under the common vertex id name.
    per_destination.select(vec![col(EDGE_DST).alias(VERTEX_ID), col("in_degree")])
}

/// Computes the out-degrees for each vertex in the graph.
///
/// This function calculates the out-degree of each vertex by counting the number of
/// outgoing edges. It returns a `DataFrame`
/// containing two columns:
/// - `VERTEX_ID`: The unique identifier of the vertex (derived from the source of the edges).
/// - `out_degree`: The count of outgoing edges (out-degrees) for each vertex.
///
/// # Returns
/// An asynchronous function that returns:
/// - `Ok(DataFrame)` containing the vertex IDs and their corresponding out-degrees.
/// - `Err` if the aggregation or selection operation fails.
///
/// # Example
/// ```rust
/// use datafusion::dataframe;
/// use graphframes_rs::{GraphFrame, VERTEX_ID, EDGE_SRC, EDGE_DST};
/// let vertices = dataframe!(
/// VERTEX_ID => vec![1i64, 2i64, 3i64],
/// "attr" => vec!["a", "b", "c"]
/// ).unwrap();
/// let edges = dataframe!(
/// EDGE_SRC => vec![1i64, 2i64, 3i64],
/// EDGE_DST => vec![3i64, 1i64, 2i64],
/// "attr" => vec!["d", "j", "h"]
/// ).unwrap();
///
/// let graph = GraphFrame { vertices, edges };
/// let out_degrees = graph.out_degrees();
/// ```
pub async fn out_degrees(&self) -> Result<DataFrame> {
let df = self.edges.clone().aggregate(
vec![col(EDGE_SRC)],
Expand Down Expand Up @@ -133,7 +286,6 @@ impl GraphFrame {
/// let graph = GraphFrame { vertices, edges };
/// let triplets = graph.triplets();
/// ```
/// // Assuming `edges_df` and `vertices_df` are initialized DataFrames for
pub async fn triplets(&self) -> Result<DataFrame> {
let edges_struct = self.edges.clone().select(vec![
col(EDGE_SRC),
Expand Down Expand Up @@ -189,46 +341,24 @@ impl GraphFrame {
#[cfg(test)]
mod tests {
use super::*;
use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Fields};
use std::collections::HashMap;
use std::sync::Arc;

fn create_test_graph() -> Result<GraphFrame> {
let ctx = SessionContext::new();

let vertices_data = RecordBatch::try_new(
SchemaRef::from(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, false),
])),
vec![
Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
Arc::new(StringArray::from(vec![
"Hub", "Alice", "Bob", "Carol", "David", "Eve", "Frank", "Grace", "Henry",
"Ivy",
])),
],
);
let vertices = ctx.read_batch(vertices_data?)?;

let edges_data = RecordBatch::try_new(
SchemaRef::from(Schema::new(vec![
Field::new("src", DataType::Int64, false),
Field::new("dst", DataType::Int64, false),
])),
vec![
Arc::new(Int64Array::from(vec![
1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 7, 7,
8, 8, 9, 10,
])),
Arc::new(Int64Array::from(vec![
2, 3, 4, 5, 6, 7, 8, 9, 10, 3, 4, 5, 6, 4, 5, 6, 5, 6, 7, 6, 7, 8, 7, 8, 8, 9,
9, 10, 10, 1,
])),
],
);
let edges = ctx.read_batch(edges_data?)?;

/// Builds the shared test fixture: ten named vertices (ids 1 through 10) and
/// thirty directed edges, most of them leaving vertex 1 ("Hub").
pub(crate) fn create_test_graph() -> Result<GraphFrame> {
    let ids: Vec<i64> = (1..=10).collect();
    let names = vec![
        "Hub", "Alice", "Bob", "Carol", "David", "Eve", "Frank", "Grace", "Henry", "Ivy",
    ];
    let vertices = dataframe!(VERTEX_ID => ids, "name" => names)?;

    // Each pair is (source, destination); the order matches the edge table rows.
    let edge_pairs: [(i64, i64); 30] = [
        (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (1, 9), (1, 10),
        (2, 3), (2, 4), (2, 5), (2, 6),
        (3, 4), (3, 5), (3, 6),
        (4, 5), (4, 6), (4, 7),
        (5, 6), (5, 7), (5, 8),
        (6, 7), (6, 8),
        (7, 8), (7, 9),
        (8, 9), (8, 10),
        (9, 10),
        (10, 1),
    ];
    let (sources, destinations): (Vec<i64>, Vec<i64>) = edge_pairs.iter().copied().unzip();
    let edges = dataframe!(EDGE_SRC => sources, EDGE_DST => destinations)?;

    Ok(GraphFrame { vertices, edges })
}
Expand Down
33 changes: 8 additions & 25 deletions src/pregel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,36 +413,19 @@ impl GraphFrame {
#[cfg(test)]
mod tests {
use super::*;
use datafusion::arrow::array::{Array, Int32Array, Int64Array, RecordBatch};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::array::{Array, Int32Array, Int64Array};
use datafusion::functions_aggregate::min_max::max;
use datafusion::functions_aggregate::sum::sum;
use std::sync::Arc;

fn create_graph(vertices: Vec<i64>, edges: Vec<Vec<i64>>) -> Result<GraphFrame> {
let ctx = SessionContext::new();

let vertices_data = RecordBatch::try_new(
SchemaRef::from(Schema::new(vec![Field::new("id", DataType::Int64, false)])),
vec![Arc::new(Int64Array::from(vertices))],
)?;
let vertices_df = ctx.read_batch(vertices_data)?;

let edges_data = RecordBatch::try_new(
SchemaRef::from(Schema::new(vec![
Field::new("src", DataType::Int64, false),
Field::new("dst", DataType::Int64, false),
])),
vec![
Arc::new(Int64Array::from(
edges.iter().map(|e| e[0]).collect::<Vec<i64>>(),
)),
Arc::new(Int64Array::from(
edges.iter().map(|e| e[1]).collect::<Vec<i64>>(),
)),
],
let vertices_df = dataframe!(
VERTEX_ID => Vec::<i64>::from(vertices),
)?;
let edges_df = ctx.read_batch(edges_data)?;
let edges_df = dataframe!(EDGE_SRC => Vec::<i64>::from(
edges.iter().map(|e| e[0]).collect::<Vec<i64>>()
), EDGE_DST => Vec::<i64>::from(
edges.iter().map(|e| e[1]).collect::<Vec<i64>>()
))?;

Ok(GraphFrame {
vertices: vertices_df,
Expand Down
Loading