From 811e6da35c320b644196f67b42b32890a461f9ab Mon Sep 17 00:00:00 2001 From: chakkk309 Date: Mon, 25 May 2026 16:46:32 +0300 Subject: [PATCH] Port IsNotNullExpr proto serialization hooks --- .../src/expressions/is_not_null.rs | 177 +++++++++++++++++- .../proto/src/physical_plan/from_proto.rs | 10 +- .../proto/src/physical_plan/to_proto.rs | 15 +- 3 files changed, 178 insertions(+), 24 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index 86acf0a4ea116..cc476f8768fc7 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -22,8 +22,7 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -use datafusion_common::Result; -use datafusion_common::ScalarValue; +use datafusion_common::{Result, ScalarValue}; use datafusion_expr::ColumnarValue; use std::hash::Hash; use std::sync::Arc; @@ -103,6 +102,60 @@ impl PhysicalExpr for IsNotNullExpr { self.arg.fmt_sql(f)?; write!(f, " IS NOT NULL") } + + #[cfg(feature = "proto")] + fn try_to_proto( + &self, + ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, + ) -> Result> { + use datafusion_proto_models::protobuf; + + Ok(Some(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr( + Box::new(protobuf::PhysicalIsNotNull { + expr: Some(Box::new(ctx.encode_child(&self.arg)?)), + }), + )), + })) + } +} + +#[cfg(feature = "proto")] +impl IsNotNullExpr { + /// Reconstruct an [`IsNotNullExpr`] from its protobuf representation. + /// + /// Takes the whole [`PhysicalExprNode`] — the exact inverse of what + /// [`PhysicalExpr::try_to_proto`] produces — so every expression's + /// `try_from_proto` shares one signature. + /// + /// [`PhysicalExprNode`]: datafusion_proto_models::protobuf::PhysicalExprNode + /// [`PhysicalExpr::try_to_proto`]: datafusion_physical_expr_common::physical_expr::PhysicalExpr::try_to_proto + /// [`PhysicalExprDecodeCtx::decode`]: datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx::decode + pub fn try_from_proto( + node: &datafusion_proto_models::protobuf::PhysicalExprNode, + ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, + ) -> Result> { + use datafusion_proto_models::protobuf; + + let node = match &node.expr_type { + Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(node)) => { + node.as_ref() + } + _ => { + return datafusion_common::internal_err!( + "PhysicalExprNode is not an IsNotNullExpr" + ); + } + }; + let expr = node.expr.as_deref().ok_or_else(|| { + datafusion_common::DataFusionError::Internal( + "IsNotNullExpr is missing required field 'expr'".to_string(), + ) + })?; + + Ok(Arc::new(IsNotNullExpr::new(ctx.decode(expr)?))) + } } /// Create an IS NOT NULL expression @@ -113,6 +166,8 @@ pub fn is_not_null(arg: Arc) -> Result> #[cfg(test)] mod tests { use super::*; + #[cfg(feature = "proto")] + use crate::expressions::Column; use crate::expressions::col; use arrow::array::{ Array, BooleanArray, Float64Array, Int32Array, StringArray, UnionArray, @@ -122,6 +177,54 @@ mod tests { use datafusion_common::cast::as_boolean_array; use datafusion_physical_expr_common::physical_expr::fmt_sql; + #[cfg(feature = "proto")] + use datafusion_physical_expr_common::physical_expr::{ + proto_decode::{PhysicalExprDecode, PhysicalExprDecodeCtx}, + proto_encode::{PhysicalExprEncode, PhysicalExprEncodeCtx}, + }; + #[cfg(feature = "proto")] + use datafusion_proto_models::protobuf; + + #[cfg(feature = "proto")] + struct TestProtoCodec; + + #[cfg(feature = "proto")] + impl PhysicalExprEncode for TestProtoCodec { + fn encode( + &self, + expr: &Arc, + ) -> Result { + let ctx = PhysicalExprEncodeCtx::new(self); + expr.try_to_proto(&ctx)?.ok_or_else(|| { + datafusion_common::DataFusionError::Internal( + "Expression did not serialize in test codec".to_string(), + ) + }) + } + } + + #[cfg(feature = "proto")] + impl PhysicalExprDecode for TestProtoCodec { + fn decode( + &self, + node: &protobuf::PhysicalExprNode, + schema: &Schema, + ) -> Result> { + let ctx = PhysicalExprDecodeCtx::new(schema, self); + match &node.expr_type { + Some(protobuf::physical_expr_node::ExprType::Column(_)) => { + Column::try_from_proto(node, &ctx) + } + Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(_)) => { + IsNotNullExpr::try_from_proto(node, &ctx) + } + _ => datafusion_common::internal_err!( + "Unsupported expression in test decoder" + ), + } + } + } + #[test] fn is_not_null_op() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]); @@ -212,4 +315,74 @@ mod tests { Ok(()) } + + #[cfg(feature = "proto")] + #[test] + fn is_not_null_proto_hook_roundtrip() -> Result<()> { + let arg = Arc::new(Column::new("a", 0)) as Arc; + let expr = Arc::new(IsNotNullExpr::new(arg)) as Arc; + + let codec = TestProtoCodec; + let proto = codec.encode(&expr)?; + + let is_not_null = match &proto.expr_type { + Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(is_not_null)) => { + is_not_null + } + other => panic!("Expected IsNotNullExpr proto, got {other:?}"), + }; + assert!(matches!( + is_not_null + .expr + .as_deref() + .and_then(|expr| expr.expr_type.as_ref()), + Some(protobuf::physical_expr_node::ExprType::Column(_)) + )); + + let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]); + let decode_ctx = PhysicalExprDecodeCtx::new(&schema, &codec); + let decoded = IsNotNullExpr::try_from_proto(&proto, &decode_ctx)?; + + assert_eq!(decoded.to_string(), "a@0 IS NOT NULL"); + Ok(()) + } + + #[cfg(feature = "proto")] + #[test] + fn is_not_null_try_from_proto_errors() { + let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]); + let codec = TestProtoCodec; + let decode_ctx = PhysicalExprDecodeCtx::new(&schema, &codec); + + let wrong_variant = protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::Column( + protobuf::PhysicalColumn { + name: "a".to_string(), + index: 0, + }, + )), + }; + let error = IsNotNullExpr::try_from_proto(&wrong_variant, &decode_ctx) + .expect_err("wrong variant should error") + .strip_backtrace(); + assert!( + error.contains("PhysicalExprNode is not an IsNotNullExpr"), + "{error}" + ); + + let missing_child = protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr( + Box::new(protobuf::PhysicalIsNotNull { expr: None }), + )), + }; + let error = IsNotNullExpr::try_from_proto(&missing_child, &decode_ctx) + .expect_err("missing child should error") + .strip_backtrace(); + assert!( + error.contains("IsNotNullExpr is missing required field 'expr'"), + "{error}" + ); + } } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 96144b11e9d3a..e98a8db366e43 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -303,15 +303,7 @@ pub fn parse_physical_expr_with_converter( proto_converter, )?)) } - ExprType::IsNotNullExpr(e) => { - Arc::new(IsNotNullExpr::new(parse_required_physical_expr( - e.expr.as_deref(), - ctx, - "expr", - input_schema, - proto_converter, - )?)) - } + ExprType::IsNotNullExpr(_) => IsNotNullExpr::try_from_proto(proto, &decode_ctx)?, ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr( e.expr.as_deref(), ctx, diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 5dd643c84ba21..bd3741d067ee4 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, - NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNullExpr, Literal, NegativeExpr, + NotExpr, TryCastExpr, UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ -401,17 +401,6 @@ pub fn serialize_physical_expr_with_converter( }), )), }) - } else if let Some(expr) = expr.downcast_ref::() { - Ok(protobuf::PhysicalExprNode { - expr_id, - expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr( - Box::new(protobuf::PhysicalIsNotNull { - expr: Some(Box::new( - proto_converter.physical_expr_to_proto(expr.arg(), codec)?, - )), - }), - )), - }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id,