From d704b3cec74e167738e2becdccccc57e38ba91ca Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 2 Oct 2024 21:16:33 -0400 Subject: [PATCH 1/2] should fail Signed-off-by: Keran Yang --- .github/workflows/ci.yaml | 5 +++++ rust/Cargo.lock | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e9861ca408..698f2f9ee0 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -122,6 +122,11 @@ jobs: CARGO_INCREMENTAL=0 RUSTFLAGS='-Cinstrument-coverage' LLVM_PROFILE_FILE='./target/debug/coverage/cargo-test-%p-%m.profraw' cargo test --all-features --workspace --all grcov . -s ./target/debug/coverage/ --binary-path ./target/debug/ -t lcov --branch --ignore-not-existing -o ./target/debug/coverage/lcov.info + - name: Check Rust formatting + working-directory: ./rust + run: | + cargo fmt -- --check + - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v4 with: diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 761512fdbf..9837be100c 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1591,7 +1591,6 @@ dependencies = [ "log", "numaflow 0.1.1", "numaflow-models", - "once_cell", "parking_lot", "pep440_rs", "prometheus-client", From 576797bcbbe276c0f388d69606c5b345af705704 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 2 Oct 2024 21:34:46 -0400 Subject: [PATCH 2/2] should pass Signed-off-by: Keran Yang --- rust/numaflow-core/src/config.rs | 2 +- rust/numaflow-core/src/message.rs | 2 +- rust/numaflow-core/src/shared/server_info.rs | 186 +++++++++++++----- .../src/transformer/user_defined.rs | 25 +-- 4 files changed, 150 insertions(+), 65 deletions(-) diff --git a/rust/numaflow-core/src/config.rs b/rust/numaflow-core/src/config.rs index c3263e999c..6310295f46 100644 --- a/rust/numaflow-core/src/config.rs +++ b/rust/numaflow-core/src/config.rs @@ -645,4 +645,4 @@ mod tests { let drop = OnFailureStrategy::Drop; assert_eq!(drop.to_string(), "drop"); } -} \ No newline at end of file +} diff --git a/rust/numaflow-core/src/message.rs b/rust/numaflow-core/src/message.rs index d230e994fb..64f4079764 100644 --- a/rust/numaflow-core/src/message.rs +++ b/rust/numaflow-core/src/message.rs @@ -7,9 +7,9 @@ use chrono::{DateTime, Utc}; use crate::error::Error; use crate::monovertex::sink_pb::sink_request::Request; use crate::monovertex::sink_pb::SinkRequest; -use crate::monovertex::{source_pb, sourcetransform_pb}; use crate::monovertex::source_pb::{read_response, AckRequest}; use crate::monovertex::sourcetransform_pb::SourceTransformRequest; +use crate::monovertex::{source_pb, sourcetransform_pb}; use crate::shared::utils::{prost_timestamp_from_utc, utc_from_timestamp}; /// A message that is sent from the source to the sink. diff --git a/rust/numaflow-core/src/shared/server_info.rs b/rust/numaflow-core/src/shared/server_info.rs index 0df809363f..f058b7a313 100644 --- a/rust/numaflow-core/src/shared/server_info.rs +++ b/rust/numaflow-core/src/shared/server_info.rs @@ -73,7 +73,12 @@ pub(crate) async fn check_for_server_compatibility( } else { // Get minimum supported SDK versions and check compatibility let min_supported_sdk_versions = version::get_minimum_supported_sdk_versions(); - check_sdk_compatibility(sdk_version, sdk_language, container_type, min_supported_sdk_versions)?; + check_sdk_compatibility( + sdk_version, + sdk_language, + container_type, + min_supported_sdk_versions, + )?; } Ok(()) @@ -110,19 +115,20 @@ fn check_numaflow_compatibility( fn check_sdk_compatibility( sdk_version: &str, sdk_language: &str, - container_type : &str, + container_type: &str, min_supported_sdk_versions: &SdkConstraints, ) -> error::Result<()> { // Check if the SDK language is present in the minimum supported SDK versions if !min_supported_sdk_versions.contains_key(sdk_language) { return Err(Error::ServerInfoError(format!( "SDK version constraint not found for language: {}, container type: {}", - sdk_language, - container_type + sdk_language, container_type ))); } let empty_map = HashMap::new(); - let lang_constraints = min_supported_sdk_versions.get(sdk_language).unwrap_or(&empty_map); + let lang_constraints = min_supported_sdk_versions + .get(sdk_language) + .unwrap_or(&empty_map); if let Some(sdk_required_version) = lang_constraints.get(container_type) { let sdk_constraint = format!(">={}", sdk_required_version); @@ -161,15 +167,13 @@ fn check_sdk_compatibility( // Language not found in the supported SDK versions warn!( "SDK version constraint not found for language: {}, container type: {}", - sdk_language, - container_type + sdk_language, container_type ); // Return error indicating the language return Err(Error::ServerInfoError(format!( "SDK version constraint not found for language: {}, container type: {}", - sdk_language, - container_type + sdk_language, container_type ))); } Ok(()) @@ -263,9 +267,7 @@ fn trim_after_dash(input: &str) -> &str { /// The file name is in the format of -server-info. fn get_container_type(server_info_file: &PathBuf) -> Option<&str> { let file_name = server_info_file.file_name()?; - let container_type = file_name - .to_str()? - .trim_end_matches("-server-info"); + let container_type = file_name.to_str()?.trim_end_matches("-server-info"); if container_type.is_empty() { None } else { @@ -562,8 +564,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -574,8 +580,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language,TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -589,8 +599,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -601,8 +615,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -616,8 +634,12 @@ mod tests { let sdk_language = "java"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -628,8 +650,12 @@ mod tests { let sdk_language = "java"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -643,8 +669,12 @@ mod tests { let sdk_language = "go"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -655,8 +685,12 @@ mod tests { let sdk_language = "go"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -670,8 +704,12 @@ mod tests { let sdk_language = "rust"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -682,8 +720,12 @@ mod tests { let sdk_language = "rust"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -697,8 +739,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -709,8 +755,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -724,8 +774,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -736,8 +790,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -751,8 +809,12 @@ mod tests { let sdk_language = "java"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -763,8 +825,12 @@ mod tests { let sdk_language = "java"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -778,8 +844,12 @@ mod tests { let sdk_language = "go"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -790,8 +860,12 @@ mod tests { let sdk_language = "go"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -805,8 +879,12 @@ mod tests { let sdk_language = "rust"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -817,8 +895,12 @@ mod tests { let sdk_language = "rust"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( diff --git a/rust/numaflow-core/src/transformer/user_defined.rs b/rust/numaflow-core/src/transformer/user_defined.rs index 71a9d24cd6..b2564b0e72 100644 --- a/rust/numaflow-core/src/transformer/user_defined.rs +++ b/rust/numaflow-core/src/transformer/user_defined.rs @@ -1,17 +1,20 @@ use std::collections::HashMap; -use tonic::transport::Channel; -use tonic::{Request, Streaming}; +use crate::config::config; +use crate::error::{Error, Result}; +use crate::message::{Message, Offset}; +use crate::monovertex::sourcetransform_pb::{ + self, source_transform_client::SourceTransformClient, SourceTransformRequest, + SourceTransformResponse, +}; +use crate::shared::utils::utc_from_timestamp; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; +use tonic::transport::Channel; +use tonic::{Request, Streaming}; use tracing::warn; -use crate::error::{Result, Error}; -use crate::message::{Message, Offset}; -use crate::monovertex::sourcetransform_pb::{self, SourceTransformRequest, SourceTransformResponse, source_transform_client::SourceTransformClient}; -use crate::shared::utils::utc_from_timestamp; -use crate::config::config; const DROP: &str = "U+005C__DROP__"; @@ -216,7 +219,7 @@ mod tests { let mut client = SourceTransformer::new(SourceTransformClient::new( create_rpc_channel(sock_file).await?, )) - .await?; + .await?; let message = crate::message::Message { keys: vec!["first".into()], @@ -234,7 +237,7 @@ mod tests { tokio::time::Duration::from_secs(2), client.transform_fn(vec![message]), ) - .await??; + .await??; assert_eq!(resp.len(), 1); // we need to drop the client, because if there are any in-flight requests @@ -291,7 +294,7 @@ mod tests { let mut client = SourceTransformer::new(SourceTransformClient::new( create_rpc_channel(sock_file).await?, )) - .await?; + .await?; let message = crate::message::Message { keys: vec!["second".into()], @@ -318,4 +321,4 @@ mod tests { handle.await.expect("failed to join server task"); Ok(()) } -} \ No newline at end of file +}