diff --git a/crates/connect/src/dataframe.rs b/crates/connect/src/dataframe.rs index a69f655..e67e2f6 100644 --- a/crates/connect/src/dataframe.rs +++ b/crates/connect/src/dataframe.rs @@ -730,7 +730,27 @@ impl DataFrame { DataFrameNaFunctions::new(self) } - // !TODO observe + /// Define (named) metrics to observe on the DataFrame. + /// + /// This method attaches metric expressions to the query plan. The metrics + /// are computed during query execution and can be retrieved from the + /// execution metrics. + /// + /// # Arguments + /// * `name` - Name of the observation + /// * `exprs` - Metric expressions (e.g., `count(lit(1))`, `avg(col("value"))`) + pub fn observe(self, name: &str, exprs: I) -> DataFrame + where + I: IntoIterator, + I::Item: Into, + { + let plan = self.plan.observe(name, exprs); + + DataFrame { + spark_session: self.spark_session, + plan, + } + } /// Returns a new [DataFrame] by skiping the first n rows pub fn offset(self, num: i32) -> DataFrame { diff --git a/crates/connect/src/plan.rs b/crates/connect/src/plan.rs index 30fabb5..ea36696 100644 --- a/crates/connect/src/plan.rs +++ b/crates/connect/src/plan.rs @@ -516,6 +516,23 @@ impl LogicalPlanBuilder { LogicalPlanBuilder::from(hint_rel) } + pub fn observe(self, name: &str, exprs: I) -> LogicalPlanBuilder + where + I: IntoIterator, + I::Item: Into, + { + let metrics: Vec = + exprs.into_iter().map(|e| e.into().expression).collect(); + + let collect_metrics = RelType::CollectMetrics(Box::new(spark::CollectMetrics { + input: self.relation_input(), + name: name.to_string(), + metrics, + })); + + LogicalPlanBuilder::from(collect_metrics) + } + pub fn join<'a, T, I>( self, right: LogicalPlanBuilder,