diff --git a/FlinkDotNet/Flink.JobBuilder.Tests/Tests/JobDefinitionModelTests.cs b/FlinkDotNet/Flink.JobBuilder.Tests/Tests/JobDefinitionModelTests.cs index e8d03ad5..007c8028 100644 --- a/FlinkDotNet/Flink.JobBuilder.Tests/Tests/JobDefinitionModelTests.cs +++ b/FlinkDotNet/Flink.JobBuilder.Tests/Tests/JobDefinitionModelTests.cs @@ -527,236 +527,4 @@ public void WindowOperationDefinition_AllTimeUnits_Supported() } #endregion - - #region Unified Sink API v2 Tests - - [Test] - public void UnifiedSinkV2Definition_DefaultConstructor_InitializesProperties() - { - var sink = new UnifiedSinkV2Definition(); - - Assert.That(sink.Type, Is.EqualTo("unified_sink_v2")); - Assert.That(sink.SinkType, Is.EqualTo(string.Empty)); - Assert.That(sink.WriterConfig, Is.Not.Null); - Assert.That(sink.CommitterConfig, Is.Null); - Assert.That(sink.Semantics, Is.EqualTo("at-least-once")); - Assert.That(sink.Stateful, Is.False); - Assert.That(sink.Properties, Is.Not.Null); - Assert.That(sink.Properties, Is.Empty); - } - - [Test] - public void UnifiedSinkV2Definition_WithKafkaSink_StoresConfiguration() - { - var writerConfig = new SinkWriterConfig - { - ClassName = "KafkaWriter", - Properties = new Dictionary - { - { "topic", "output-topic" }, - { "bootstrapServers", "localhost:9092" } - } - }; - - var sink = new UnifiedSinkV2Definition - { - SinkType = "kafka", - WriterConfig = writerConfig, - Semantics = "at-least-once", - Stateful = false - }; - - Assert.That(sink.SinkType, Is.EqualTo("kafka")); - Assert.That(sink.WriterConfig.ClassName, Is.EqualTo("KafkaWriter")); - Assert.That(sink.WriterConfig.Properties["topic"], Is.EqualTo("output-topic")); - Assert.That(sink.WriterConfig.Properties["bootstrapServers"], Is.EqualTo("localhost:9092")); - Assert.That(sink.Semantics, Is.EqualTo("at-least-once")); - } - - [Test] - public void UnifiedSinkV2Definition_WithExactlyOnceSemantics_RequiresCommitter() - { - var committerConfig = new SinkCommitterConfig - { - Enabled = true, - ClassName = "KafkaCommitter", - Properties = new Dictionary - { - { "transactionPrefix", "flink-kafka-" } - } - }; - - var sink = new UnifiedSinkV2Definition - { - SinkType = "kafka", - WriterConfig = new SinkWriterConfig(), - CommitterConfig = committerConfig, - Semantics = "exactly-once", - Stateful = true - }; - - Assert.That(sink.Semantics, Is.EqualTo("exactly-once")); - Assert.That(sink.CommitterConfig, Is.Not.Null); - Assert.That(sink.CommitterConfig.Enabled, Is.True); - Assert.That(sink.CommitterConfig.ClassName, Is.EqualTo("KafkaCommitter")); - Assert.That(sink.Stateful, Is.True); - } - - [Test] - public void UnifiedSinkV2Definition_ImplementsISinkDefinition() - { - var sink = new UnifiedSinkV2Definition(); - - Assert.That(sink, Is.InstanceOf()); - Assert.That(sink.Type, Is.EqualTo("unified_sink_v2")); - } - - [Test] - public void UnifiedSinkV2Definition_WithCustomProperties_StoresAll() - { - var properties = new Dictionary - { - { "retry.max", "3" }, - { "timeout.ms", "5000" }, - { "compression", "gzip" } - }; - - var sink = new UnifiedSinkV2Definition - { - SinkType = "custom", - Properties = properties - }; - - Assert.That(sink.Properties, Has.Count.EqualTo(3)); - Assert.That(sink.Properties["retry.max"], Is.EqualTo("3")); - Assert.That(sink.Properties["timeout.ms"], Is.EqualTo("5000")); - Assert.That(sink.Properties["compression"], Is.EqualTo("gzip")); - } - - [Test] - public void SinkWriterConfig_DefaultConstructor_InitializesEmpty() - { - var config = new SinkWriterConfig(); - - Assert.That(config.ClassName, Is.EqualTo(string.Empty)); - 
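// Editor's note: a minimal sketch, reconstructed from the removed tests above, of how the
// UnifiedSinkV2Definition model deleted by this PR was assembled for an exactly-once Kafka sink.
// Property names, defaults, and values come from the assertions; the Dictionary value type is
// assumed to be object, since the removed tests store both strings and ints in Properties.
var exactlyOnceKafkaSink = new UnifiedSinkV2Definition
{
    SinkType = "kafka",
    WriterConfig = new SinkWriterConfig
    {
        ClassName = "KafkaWriter",
        Properties = new Dictionary<string, object>
        {
            { "topic", "output-topic" },
            { "bootstrapServers", "localhost:9092" }
        }
    },
    CommitterConfig = new SinkCommitterConfig
    {
        Enabled = true, // a committer is required for exactly-once semantics
        ClassName = "KafkaCommitter",
        Properties = new Dictionary<string, object> { { "transactionPrefix", "flink-kafka-" } }
    },
    Semantics = "exactly-once", // the default is "at-least-once"
    Stateful = true             // writer participates in state snapshots
};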
Assert.That(config.Properties, Is.Not.Null); - Assert.That(config.Properties, Is.Empty); - } - - [Test] - public void SinkWriterConfig_WithProperties_StoresAll() - { - var config = new SinkWriterConfig - { - ClassName = "MyWriter", - Properties = new Dictionary - { - { "batchSize", 100 }, - { "flushInterval", 5000 }, - { "bufferSize", 1024 } - } - }; - - Assert.That(config.ClassName, Is.EqualTo("MyWriter")); - Assert.That(config.Properties, Has.Count.EqualTo(3)); - Assert.That(config.Properties["batchSize"], Is.EqualTo(100)); - Assert.That(config.Properties["flushInterval"], Is.EqualTo(5000)); - } - - [Test] - public void SinkCommitterConfig_DefaultConstructor_InitializesEmpty() - { - var config = new SinkCommitterConfig(); - - Assert.That(config.Enabled, Is.False); - Assert.That(config.ClassName, Is.Null); - Assert.That(config.Properties, Is.Not.Null); - Assert.That(config.Properties, Is.Empty); - } - - [Test] - public void SinkCommitterConfig_WithProperties_StoresAll() - { - var config = new SinkCommitterConfig - { - Enabled = true, - ClassName = "MyCommitter", - Properties = new Dictionary - { - { "checkpointMode", "exactly-once" }, - { "commitTimeout", 60000 } - } - }; - - Assert.That(config.Enabled, Is.True); - Assert.That(config.ClassName, Is.EqualTo("MyCommitter")); - Assert.That(config.Properties, Has.Count.EqualTo(2)); - Assert.That(config.Properties["checkpointMode"], Is.EqualTo("exactly-once")); - } - - [Test] - public void JobDefinition_WithUnifiedSinkV2_CanBeAssigned() - { - var sink = new UnifiedSinkV2Definition - { - SinkType = "kafka", - WriterConfig = new SinkWriterConfig { ClassName = "KafkaWriter" }, - Semantics = "exactly-once" - }; - - var jobDef = new JobDefinition - { - Source = new KafkaSourceDefinition { Topic = "input" }, - Sink = sink - }; - - Assert.That(jobDef.Sink, Is.InstanceOf()); - var unifiedSink = jobDef.Sink as UnifiedSinkV2Definition; - Assert.That(unifiedSink, Is.Not.Null); - Assert.That(unifiedSink.SinkType, Is.EqualTo("kafka")); - Assert.That(unifiedSink.Semantics, Is.EqualTo("exactly-once")); - } - - [Test] - public void UnifiedSinkV2Definition_Serialization_RoundTrip() - { - var original = new UnifiedSinkV2Definition - { - SinkType = "kafka", - WriterConfig = new SinkWriterConfig - { - ClassName = "KafkaWriter", - Properties = new Dictionary - { - { "topic", "test" }, - { "servers", "localhost:9092" } - } - }, - CommitterConfig = new SinkCommitterConfig - { - Enabled = true, - ClassName = "KafkaCommitter" - }, - Semantics = "exactly-once", - Stateful = true, - Properties = new Dictionary { { "key", "value" } } - }; - - var json = System.Text.Json.JsonSerializer.Serialize(original); - var deserialized = System.Text.Json.JsonSerializer.Deserialize(json); - - Assert.That(deserialized, Is.Not.Null); - Assert.That(deserialized, Is.InstanceOf()); - - var sink = deserialized as UnifiedSinkV2Definition; - Assert.That(sink, Is.Not.Null); - Assert.That(sink.SinkType, Is.EqualTo("kafka")); - Assert.That(sink.Semantics, Is.EqualTo("exactly-once")); - Assert.That(sink.Stateful, Is.True); - Assert.That(sink.WriterConfig.ClassName, Is.EqualTo("KafkaWriter")); - Assert.That(sink.CommitterConfig, Is.Not.Null); - Assert.That(sink.CommitterConfig.Enabled, Is.True); - } - - #endregion } diff --git a/FlinkDotNet/Flink.JobBuilder.Tests/Tests/PerformanceConfigModelTests.cs b/FlinkDotNet/Flink.JobBuilder.Tests/Tests/PerformanceConfigModelTests.cs index 5faeac5d..e62917a3 100644 --- a/FlinkDotNet/Flink.JobBuilder.Tests/Tests/PerformanceConfigModelTests.cs +++ 
b/FlinkDotNet/Flink.JobBuilder.Tests/Tests/PerformanceConfigModelTests.cs
@@ -4,117 +4,11 @@ namespace Flink.JobBuilder.Tests.Tests;
 
 /// <summary>
 /// Unit tests for Performance and Format configuration models (Flink 2.1+).
-/// Tests BatchingConfig and StateBackendConfig to achieve 100% code coverage.
+/// Tests StateBackendConfig, ExecutionPlanConfig, and OptimizerConfig to achieve 100% code coverage.
 /// </summary>
 [TestFixture]
 public class PerformanceConfigModelTests
 {
-    #region BatchingConfig Tests
-
-    [Test]
-    public void BatchingConfig_DefaultConstructor_AllPropertiesNull()
-    {
-        var config = new BatchingConfig();
-
-        Assert.That(config.MaxBatchSize, Is.Null);
-        Assert.That(config.MaxBatchSizeInBytes, Is.Null);
-        Assert.That(config.MaxTimeInBufferMs, Is.Null);
-        Assert.That(config.MaxInFlightRequests, Is.Null);
-        Assert.That(config.MaxBufferedRequests, Is.Null);
-    }
-
-    [Test]
-    public void BatchingConfig_SetMaxBatchSize_ReturnsValue()
-    {
-        var config = new BatchingConfig { MaxBatchSize = 1000 };
-
-        Assert.That(config.MaxBatchSize, Is.EqualTo(1000));
-    }
-
-    [Test]
-    public void BatchingConfig_SetMaxBatchSizeInBytes_ReturnsValue()
-    {
-        var config = new BatchingConfig { MaxBatchSizeInBytes = 5242880 }; // 5MB
-
-        Assert.That(config.MaxBatchSizeInBytes, Is.EqualTo(5242880));
-    }
-
-    [Test]
-    public void BatchingConfig_SetMaxTimeInBufferMs_ReturnsValue()
-    {
-        var config = new BatchingConfig { MaxTimeInBufferMs = 1000 };
-
-        Assert.That(config.MaxTimeInBufferMs, Is.EqualTo(1000));
-    }
-
-    [Test]
-    public void BatchingConfig_SetMaxInFlightRequests_ReturnsValue()
-    {
-        var config = new BatchingConfig { MaxInFlightRequests = 50 };
-
-        Assert.That(config.MaxInFlightRequests, Is.EqualTo(50));
-    }
-
-    [Test]
-    public void BatchingConfig_SetMaxBufferedRequests_ReturnsValue()
-    {
-        var config = new BatchingConfig { MaxBufferedRequests = 10000 };
-
-        Assert.That(config.MaxBufferedRequests, Is.EqualTo(10000));
-    }
-
-    [Test]
-    public void BatchingConfig_SetAllProperties_ReturnsAllValues()
-    {
-        var config = new BatchingConfig
-        {
-            MaxBatchSize = 1000,
-            MaxBatchSizeInBytes = 5242880,
-            MaxTimeInBufferMs = 1000,
-            MaxInFlightRequests = 50,
-            MaxBufferedRequests = 10000
-        };
-
-        Assert.That(config.MaxBatchSize, Is.EqualTo(1000));
-        Assert.That(config.MaxBatchSizeInBytes, Is.EqualTo(5242880));
-        Assert.That(config.MaxTimeInBufferMs, Is.EqualTo(1000));
-        Assert.That(config.MaxInFlightRequests, Is.EqualTo(50));
-        Assert.That(config.MaxBufferedRequests, Is.EqualTo(10000));
-    }
-
-    [Test]
-    public void BatchingConfig_SizeBased_OnlySetsSizeProperties()
-    {
-        var config = new BatchingConfig
-        {
-            MaxBatchSize = 2000,
-            MaxBatchSizeInBytes = 10485760 // 10MB
-        };
-
-        Assert.That(config.MaxBatchSize, Is.EqualTo(2000));
-        Assert.That(config.MaxBatchSizeInBytes, Is.EqualTo(10485760));
-        Assert.That(config.MaxTimeInBufferMs, Is.Null);
-        Assert.That(config.MaxInFlightRequests, Is.Null);
-        Assert.That(config.MaxBufferedRequests, Is.Null);
-    }
-
-    [Test]
-    public void BatchingConfig_TimeBased_OnlySetsTimeProperty()
-    {
-        var config = new BatchingConfig
-        {
-            MaxTimeInBufferMs = 500
-        };
-
-        Assert.That(config.MaxTimeInBufferMs, Is.EqualTo(500));
-        Assert.That(config.MaxBatchSize, Is.Null);
-        Assert.That(config.MaxBatchSizeInBytes, Is.Null);
-        Assert.That(config.MaxInFlightRequests, Is.Null);
-        Assert.That(config.MaxBufferedRequests, Is.Null);
-    }
-
-    #endregion
-
     #region StateBackendConfig Tests
 
     [Test]
@@ -288,41 +182,6 @@ public void StateBackendConfig_FilesystemBackend_OnlyCheckpointDir()
 
     #endregion
 
-
-    #region Integration with
SinkWriterConfig Tests - - [Test] - public void SinkWriterConfig_WithBatchingConfig_StoresBatchingSettings() - { - var batchingConfig = new BatchingConfig - { - MaxBatchSize = 1000, - MaxTimeInBufferMs = 1000 - }; - - var writerConfig = new SinkWriterConfig - { - ClassName = "TestWriter", - BatchingConfig = batchingConfig - }; - - Assert.That(writerConfig.BatchingConfig, Is.Not.Null); - Assert.That(writerConfig.BatchingConfig, Is.EqualTo(batchingConfig)); - Assert.That(writerConfig.BatchingConfig!.MaxBatchSize, Is.EqualTo(1000)); - } - - [Test] - public void SinkWriterConfig_WithoutBatchingConfig_BatchingConfigIsNull() - { - var writerConfig = new SinkWriterConfig - { - ClassName = "TestWriter" - }; - - Assert.That(writerConfig.BatchingConfig, Is.Null); - } - - #endregion - #region Integration with JobMetadata Tests [Test] @@ -355,50 +214,6 @@ public void JobMetadata_WithoutStateBackendConfig_StateBackendConfigIsNull() #endregion - #region Complete Job Definition Tests - - [Test] - public void JobDefinition_WithBatchingAndStateBackend_StoresBothConfigs() - { - var job = new JobDefinition - { - Source = new KafkaSourceDefinition { Topic = "input" }, - Sink = new UnifiedSinkV2Definition - { - SinkType = "kafka", - WriterConfig = new SinkWriterConfig - { - ClassName = "KafkaWriter", - BatchingConfig = new BatchingConfig - { - MaxBatchSize = 1000, - MaxBatchSizeInBytes = 5242880 - } - } - }, - Metadata = new JobMetadata - { - StateBackendConfig = new StateBackendConfig - { - Type = "rocksdb", - CheckpointDir = "s3://bucket/checkpoints", - IncrementalCheckpoints = true, - PredefinedProfile = "flash_ssd_optimized" - } - } - }; - - Assert.That(job.Metadata.StateBackendConfig, Is.Not.Null); - Assert.That(job.Metadata.StateBackendConfig!.Type, Is.EqualTo("rocksdb")); - - var sink = job.Sink as UnifiedSinkV2Definition; - Assert.That(sink, Is.Not.Null); - Assert.That(sink!.WriterConfig.BatchingConfig, Is.Not.Null); - Assert.That(sink.WriterConfig.BatchingConfig!.MaxBatchSize, Is.EqualTo(1000)); - } - - #endregion - #region ExecutionPlanConfig Tests [Test] @@ -690,76 +505,4 @@ public void JobMetadata_WithAllPerformanceConfigs_StoresAllSettings() } #endregion - - #region Complete Job Definition Tests (All 4 Features) - - [Test] - public void JobDefinition_WithAll4PerformanceFeatures_StoresAllConfigs() - { - var job = new JobDefinition - { - Source = new KafkaSourceDefinition { Topic = "input" }, - Sink = new UnifiedSinkV2Definition - { - SinkType = "kafka", - WriterConfig = new SinkWriterConfig - { - ClassName = "KafkaWriter", - BatchingConfig = new BatchingConfig - { - MaxBatchSize = 1000, - MaxBatchSizeInBytes = 5242880, - MaxTimeInBufferMs = 1000 - } - } - }, - Metadata = new JobMetadata - { - StateBackendConfig = new StateBackendConfig - { - Type = "rocksdb", - CheckpointDir = "s3://production/checkpoints", - IncrementalCheckpoints = true, - PredefinedProfile = "flash_ssd_optimized" - }, - ExecutionPlanConfig = new ExecutionPlanConfig - { - Format = "smile", - EnableCompression = true - }, - OptimizerConfig = new OptimizerConfig - { - EnableMultiJoinOptimization = true, - JoinReorderingStrategy = "bushy", - EnableJoinPredicatePushdown = true, - EnableFilterPushdown = true - } - } - }; - - // Assert: All 4 performance features configured - Assert.That(job.Metadata.StateBackendConfig, Is.Not.Null); - Assert.That(job.Metadata.ExecutionPlanConfig, Is.Not.Null); - Assert.That(job.Metadata.OptimizerConfig, Is.Not.Null); - - var sink = job.Sink as UnifiedSinkV2Definition; - 
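// Editor's note: a compact sketch of the batching knobs exercised by the removed tests above.
// In Flink's async sink pattern a flush is triggered by whichever threshold trips first
// (record count, byte size, or buffer time); the actual flush logic lives in the Java runtime,
// so this only illustrates what the deleted model carried.
var batching = new BatchingConfig
{
    MaxBatchSize = 1000,           // flush after 1000 records...
    MaxBatchSizeInBytes = 5242880, // ...or after 5 MB...
    MaxTimeInBufferMs = 1000,      // ...or after 1 second in the buffer
    MaxInFlightRequests = 50,      // backpressure: cap on concurrent requests
    MaxBufferedRequests = 10000    // backpressure: cap on total buffered requests
};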
Assert.That(sink!.WriterConfig.BatchingConfig, Is.Not.Null); - - // Feature 1: Custom Async Sink Batching - Assert.That(sink.WriterConfig.BatchingConfig!.MaxBatchSize, Is.EqualTo(1000)); - - // Feature 2: Enhanced State Backend Configuration - Assert.That(job.Metadata.StateBackendConfig!.Type, Is.EqualTo("rocksdb")); - Assert.That(job.Metadata.StateBackendConfig.PredefinedProfile, Is.EqualTo("flash_ssd_optimized")); - - // Feature 3: Smile Format for Compiled Plans - Assert.That(job.Metadata.ExecutionPlanConfig!.Format, Is.EqualTo("smile")); - Assert.That(job.Metadata.ExecutionPlanConfig.EnableCompression, Is.True); - - // Feature 4: MultiJoin Optimization Configuration - Assert.That(job.Metadata.OptimizerConfig!.EnableMultiJoinOptimization, Is.True); - Assert.That(job.Metadata.OptimizerConfig.JoinReorderingStrategy, Is.EqualTo("bushy")); - } - - #endregion } diff --git a/FlinkDotNet/Flink.JobBuilder/Models/JobDefinition.cs b/FlinkDotNet/Flink.JobBuilder/Models/JobDefinition.cs index e2c73175..ded15b88 100644 --- a/FlinkDotNet/Flink.JobBuilder/Models/JobDefinition.cs +++ b/FlinkDotNet/Flink.JobBuilder/Models/JobDefinition.cs @@ -781,7 +781,6 @@ public class SideOutputOperationDefinition : IOperationDefinition [JsonDerivedType(typeof(DatabaseSinkDefinition), "database")] [JsonDerivedType(typeof(HttpSinkDefinition), "http")] [JsonDerivedType(typeof(RedisSinkDefinition), "redis")] - [JsonDerivedType(typeof(UnifiedSinkV2Definition), "unified_sink_v2")] public interface ISinkDefinition { public string Type @@ -955,152 +954,7 @@ public string? Key } /// - /// Unified Sink API v2 definition (Flink 1.20+) for modern sink pattern with exactly-once semantics - /// - public class UnifiedSinkV2Definition : ISinkDefinition - { - /// - /// Gets the type identifier - /// - [JsonIgnore] - public string Type => "unified_sink_v2"; - - /// - /// Type of sink (kafka, file, database, http, custom) - /// - public string SinkType { get; set; } = string.Empty; - - /// - /// Writer configuration - /// - public SinkWriterConfig WriterConfig { get; set; } = new(); - - /// - /// Committer configuration (optional, for exactly-once semantics) - /// - public SinkCommitterConfig? CommitterConfig - { - get; set; - } - - /// - /// Delivery semantics: exactly-once or at-least-once - /// - public string Semantics { get; set; } = "at-least-once"; - - /// - /// Whether sink writer is stateful (supports state snapshots) - /// - public bool Stateful - { - get; set; - } - - /// - /// Additional properties for sink configuration - /// - public Dictionary Properties { get; init; } = []; - } - - /// - /// Configuration for Unified Sink v2 writer - /// - public class SinkWriterConfig - { - /// - /// Writer class name (for custom sinks) - /// - public string ClassName { get; set; } = string.Empty; - - /// - /// Writer-specific properties - /// - public Dictionary Properties { get; init; } = []; - - /// - /// Optional batching configuration for async sink performance optimization - /// - public BatchingConfig? BatchingConfig - { - get; set; - } - } - - /// - /// Configuration for Unified Sink v2 committer (two-phase commit) - /// - public class SinkCommitterConfig - { - /// - /// Whether committer is enabled - /// - public bool Enabled - { - get; set; - } - - /// - /// Committer class name (for custom committers) - /// - public string? 
ClassName - { - get; set; - } - - /// - /// Committer-specific properties - /// - public Dictionary Properties { get; init; } = []; - } - - /// - /// Batching configuration for async sink performance optimization (Flink 2.1+) - /// - public class BatchingConfig - { - /// - /// Maximum number of records per batch - /// - public int? MaxBatchSize - { - get; set; - } - - /// - /// Maximum batch size in bytes - /// - public long? MaxBatchSizeInBytes - { - get; set; - } - - /// - /// Maximum time in milliseconds to buffer records before flushing - /// - public int? MaxTimeInBufferMs - { - get; set; - } - - /// - /// Maximum number of in-flight requests - /// - public int? MaxInFlightRequests - { - get; set; - } - - /// - /// Maximum number of buffered requests - /// - public int? MaxBufferedRequests - { - get; set; - } - } - - /// - /// State backend configuration for performance tuning (Flink 2.1+) + /// State backend configuration for performance tuning /// public class StateBackendConfig { diff --git a/FlinkDotNet/FlinkDotNet.DataStream.Tests/AdditionalCoverageFor95PercentTests.cs b/FlinkDotNet/FlinkDotNet.DataStream.Tests/AdditionalCoverageFor95PercentTests.cs index 3b23d415..120c37f2 100644 --- a/FlinkDotNet/FlinkDotNet.DataStream.Tests/AdditionalCoverageFor95PercentTests.cs +++ b/FlinkDotNet/FlinkDotNet.DataStream.Tests/AdditionalCoverageFor95PercentTests.cs @@ -73,43 +73,6 @@ public void ModelDescription_WithComplexSchema_InitializesCorrectly() #endregion - #region SinkWriterContext Tests - - [Test] - public void SinkWriterContext_SubtaskId_CanBeSet() - { - // Arrange & Act - var context = new SinkWriterContext - { - SubtaskId = 5 - }; - - // Assert - Assert.That(context.SubtaskId, Is.EqualTo(5)); - } - - [Test] - public void SinkWriterContext_SubtaskId_DefaultIsZero() - { - // Arrange & Act - var context = new SinkWriterContext(); - - // Assert - Assert.That(context.SubtaskId, Is.EqualTo(0)); - } - - [Test] - public void SinkWriterContext_WithValidSubtaskId_InitializesCorrectly() - { - // Arrange & Act - var context = new SinkWriterContext { SubtaskId = 127 }; - - // Assert - Assert.That(context.SubtaskId, Is.EqualTo(127)); - } - - #endregion - #region RocksDBOptions Additional Tests [Test] diff --git a/FlinkDotNet/FlinkDotNet.DataStream.Tests/AdditionalDataClassesCoverageImprovementTests.cs b/FlinkDotNet/FlinkDotNet.DataStream.Tests/AdditionalDataClassesCoverageImprovementTests.cs index 01976043..ccd3caab 100644 --- a/FlinkDotNet/FlinkDotNet.DataStream.Tests/AdditionalDataClassesCoverageImprovementTests.cs +++ b/FlinkDotNet/FlinkDotNet.DataStream.Tests/AdditionalDataClassesCoverageImprovementTests.cs @@ -68,16 +68,6 @@ public void RocksDBOptions_CanBeCreated() Assert.That(options, Is.Not.Null); } - [Test] - public void SinkWriterContext_CanBeCreated() - { - // Arrange & Act - var context = new SinkWriterContext(); - - // Assert - Assert.That(context, Is.Not.Null); - } - #endregion #region OutputTag Tests diff --git a/FlinkDotNet/FlinkDotNet.DataStream.Tests/CoverageImprovementForDataClassesTests.cs b/FlinkDotNet/FlinkDotNet.DataStream.Tests/CoverageImprovementForDataClassesTests.cs index 71d281b7..dab1fa9e 100644 --- a/FlinkDotNet/FlinkDotNet.DataStream.Tests/CoverageImprovementForDataClassesTests.cs +++ b/FlinkDotNet/FlinkDotNet.DataStream.Tests/CoverageImprovementForDataClassesTests.cs @@ -208,40 +208,6 @@ public void ModelDescription_AllProperties_AreCovered() #endregion - #region SinkWriterContext Property Coverage - - [Test] - public void 
SinkWriterContext_AllProperties_AreCovered()
-    {
-        // Create instance with init properties
-        var context = new SinkWriterContext
-        {
-            SubtaskId = 0,
-            NumberOfParallelSubtasks = 4,
-            AttemptNumber = 0,
-            Properties = new System.Collections.Generic.Dictionary<string, string>
-            {
-                ["key"] = "value"
-            }
-        };
-
-        // Access all properties
-        Assert.That(context.SubtaskId, Is.EqualTo(0));
-        Assert.That(context.NumberOfParallelSubtasks, Is.EqualTo(4));
-        Assert.That(context.AttemptNumber, Is.EqualTo(0));
-        Assert.That(context.Properties, Has.Count.EqualTo(1));
-
-        // Use reflection
-        var type = typeof(SinkWriterContext);
-        foreach (var prop in type.GetProperties(BindingFlags.Public | BindingFlags.Instance))
-        {
-            var value = prop.GetValue(context);
-            Assert.That(value, Is.Not.Null, $"Property {prop.Name} should not be null");
-        }
-    }
-
-    #endregion
-
     #region RocksDBOptions Property Coverage
 
     [Test]
diff --git a/FlinkDotNet/FlinkDotNet.DataStream.Tests/FlinkApiContractTests.cs b/FlinkDotNet/FlinkDotNet.DataStream.Tests/FlinkApiContractTests.cs
new file mode 100644
index 00000000..36b2b904
--- /dev/null
+++ b/FlinkDotNet/FlinkDotNet.DataStream.Tests/FlinkApiContractTests.cs
@@ -0,0 +1,858 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Threading.Tasks;
+using Moq;
+using NUnit.Framework;
+
+namespace FlinkDotNet.DataStream.Tests
+{
+    /// <summary>
+    /// Tests that validate the FlinkDotNet state and runtime API contracts
+    /// are correctly defined as pass-through interfaces for Flink Java translation.
+    /// These interfaces are NOT implemented in .NET — they define the API surface
+    /// that maps to org.apache.flink.api.common.state.* in Java Flink.
+    /// </summary>
+    [TestFixture]
+    public class FlinkApiContractTests
+    {
+        #region IValueState Contract Tests
+
+        [Test]
+        public void IValueState_IsInterface()
+        {
+            Assert.That(typeof(IValueState<>).IsInterface, Is.True);
+        }
+
+        [Test]
+        public void IValueState_DefinesValueAsync()
+        {
+            var method = typeof(IValueState<string>).GetMethod("ValueAsync");
+            Assert.That(method, Is.Not.Null);
+            Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task<string>)));
+            Assert.That(method.GetParameters(), Is.Empty);
+        }
+
+        [Test]
+        public void IValueState_DefinesUpdateAsync()
+        {
+            var method = typeof(IValueState<string>).GetMethod("UpdateAsync");
+            Assert.That(method, Is.Not.Null);
+            Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task)));
+            Assert.That(method.GetParameters(), Has.Length.EqualTo(1));
+            Assert.That(method.GetParameters()[0].ParameterType, Is.EqualTo(typeof(string)));
+        }
+
+        [Test]
+        public void IValueState_DefinesClearAsync()
+        {
+            var method = typeof(IValueState<string>).GetMethod("ClearAsync");
+            Assert.That(method, Is.Not.Null);
+            Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task)));
+            Assert.That(method.GetParameters(), Is.Empty);
+        }
+
+        [Test]
+        public void IValueState_CanBeMocked_ForUserDefinedFunctions()
+        {
+            // Validates the interface can be used in user-defined process functions
+            // via dependency injection from the Flink Java runtime
+            var mockState = new Mock<IValueState<int>>();
+            mockState.Setup(s => s.ValueAsync()).ReturnsAsync(42);
+            mockState.Setup(s => s.UpdateAsync(It.IsAny<int>())).Returns(Task.CompletedTask);
+            mockState.Setup(s => s.ClearAsync()).Returns(Task.CompletedTask);
+
+            Assert.That(mockState.Object, Is.Not.Null);
+            Assert.That(mockState.Object, Is.InstanceOf<IValueState<int>>());
+        }
+
+        [Test]
+        public async Task IValueState_MockedInProcessFunction_WorksCorrectly()
+        {
+            // Simulates how user code interacts with state provided by Flink Java
runtime + var mockState = new Mock>(); + mockState.Setup(s => s.ValueAsync()).ReturnsAsync(100); + mockState.Setup(s => s.UpdateAsync(It.IsAny())).Returns(Task.CompletedTask); + + int value = await mockState.Object.ValueAsync(); + Assert.That(value, Is.EqualTo(100)); + + await mockState.Object.UpdateAsync(200); + mockState.Verify(s => s.UpdateAsync(200), Times.Once); + } + + #endregion + + #region IListState Contract Tests + + [Test] + public void IListState_IsInterface() + { + Assert.That(typeof(IListState<>).IsInterface, Is.True); + } + + [Test] + public void IListState_DefinesGetAsync() + { + var method = typeof(IListState).GetMethod("GetAsync"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task>))); + } + + [Test] + public void IListState_DefinesAddAsync() + { + var method = typeof(IListState).GetMethod("AddAsync"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task))); + Assert.That(method.GetParameters(), Has.Length.EqualTo(1)); + } + + [Test] + public void IListState_DefinesAddAllAsync() + { + var method = typeof(IListState).GetMethod("AddAllAsync"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.GetParameters()[0].ParameterType, Is.EqualTo(typeof(IEnumerable))); + } + + [Test] + public void IListState_DefinesUpdateAsync() + { + var method = typeof(IListState).GetMethod("UpdateAsync"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.GetParameters()[0].ParameterType, Is.EqualTo(typeof(IEnumerable))); + } + + [Test] + public void IListState_DefinesClearAsync() + { + var method = typeof(IListState).GetMethod("ClearAsync"); + Assert.That(method, Is.Not.Null); + } + + [Test] + public void IListState_CanBeMocked_ForUserDefinedFunctions() + { + var mockState = new Mock>(); + mockState.Setup(s => s.GetAsync()).ReturnsAsync(new[] { "a", "b" }); + mockState.Setup(s => s.AddAsync(It.IsAny())).Returns(Task.CompletedTask); + + Assert.That(mockState.Object, Is.InstanceOf>()); + } + + #endregion + + #region IMapState Contract Tests + + [Test] + public void IMapState_IsInterface() + { + Assert.That(typeof(IMapState<,>).IsInterface, Is.True); + } + + [Test] + public void IMapState_DefinesGetAsync() + { + var method = typeof(IMapState).GetMethod("GetAsync"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task))); + } + + [Test] + public void IMapState_DefinesPutAsync() + { + var method = typeof(IMapState).GetMethod("PutAsync"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.GetParameters(), Has.Length.EqualTo(2)); + } + + [Test] + public void IMapState_DefinesPutAllAsync() + { + var method = typeof(IMapState).GetMethod("PutAllAsync"); + Assert.That(method, Is.Not.Null); + } + + [Test] + public void IMapState_DefinesRemoveAsync() + { + var method = typeof(IMapState).GetMethod("RemoveAsync"); + Assert.That(method, Is.Not.Null); + } + + [Test] + public void IMapState_DefinesContainsAsync() + { + var method = typeof(IMapState).GetMethod("ContainsAsync"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task))); + } + + [Test] + public void IMapState_DefinesEntriesAsync() + { + var method = typeof(IMapState).GetMethod("EntriesAsync"); + Assert.That(method, Is.Not.Null); + } + + [Test] + public void IMapState_DefinesKeysAsync() + { + var method = typeof(IMapState).GetMethod("KeysAsync"); + Assert.That(method, Is.Not.Null); + } + + [Test] + public void IMapState_DefinesValuesAsync() + { + var method = 
typeof(IMapState).GetMethod("ValuesAsync"); + Assert.That(method, Is.Not.Null); + } + + [Test] + public void IMapState_DefinesIsEmptyAsync() + { + var method = typeof(IMapState).GetMethod("IsEmptyAsync"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task))); + } + + [Test] + public void IMapState_CanBeMocked_ForUserDefinedFunctions() + { + var mockState = new Mock>(); + mockState.Setup(s => s.GetAsync("key")).ReturnsAsync(42); + mockState.Setup(s => s.ContainsAsync("key")).ReturnsAsync(true); + + Assert.That(mockState.Object, Is.InstanceOf>()); + } + + #endregion + + #region IReducingState Contract Tests + + [Test] + public void IReducingState_IsInterface() + { + Assert.That(typeof(IReducingState<>).IsInterface, Is.True); + } + + [Test] + public void IReducingState_DefinesGetAsync() + { + var method = typeof(IReducingState).GetMethod("GetAsync"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task))); + } + + [Test] + public void IReducingState_DefinesAddAsync() + { + var method = typeof(IReducingState).GetMethod("AddAsync"); + Assert.That(method, Is.Not.Null); + } + + [Test] + public void IReducingState_DefinesClearAsync() + { + var method = typeof(IReducingState).GetMethod("ClearAsync"); + Assert.That(method, Is.Not.Null); + } + + #endregion + + #region IAggregatingState Contract Tests + + [Test] + public void IAggregatingState_IsInterface() + { + Assert.That(typeof(IAggregatingState<,>).IsInterface, Is.True); + } + + [Test] + public void IAggregatingState_DefinesGetAsync() + { + var method = typeof(IAggregatingState).GetMethod("GetAsync"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task))); + } + + [Test] + public void IAggregatingState_DefinesAddAsync() + { + var method = typeof(IAggregatingState).GetMethod("AddAsync"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.GetParameters()[0].ParameterType, Is.EqualTo(typeof(string))); + } + + [Test] + public void IAggregatingState_DefinesClearAsync() + { + var method = typeof(IAggregatingState).GetMethod("ClearAsync"); + Assert.That(method, Is.Not.Null); + } + + #endregion + + #region State Descriptor IR Translation Tests + + [Test] + public void ValueStateDescriptor_CarriesTypeInfo_ForIRTranslation() + { + var descriptor = new ValueStateDescriptor("myValueState"); + + Assert.That(descriptor.Name, Is.EqualTo("myValueState")); + Assert.That(descriptor.ValueType, Is.EqualTo(typeof(string))); + Assert.That(descriptor, Is.InstanceOf()); + } + + [Test] + public void ListStateDescriptor_CarriesTypeInfo_ForIRTranslation() + { + var descriptor = new ListStateDescriptor("myListState"); + + Assert.That(descriptor.Name, Is.EqualTo("myListState")); + Assert.That(descriptor.ElementType, Is.EqualTo(typeof(int))); + Assert.That(descriptor, Is.InstanceOf()); + } + + [Test] + public void MapStateDescriptor_CarriesTypeInfo_ForIRTranslation() + { + var descriptor = new MapStateDescriptor("myMapState"); + + Assert.That(descriptor.Name, Is.EqualTo("myMapState")); + Assert.That(descriptor.KeyType, Is.EqualTo(typeof(string))); + Assert.That(descriptor.ValueType, Is.EqualTo(typeof(double))); + Assert.That(descriptor, Is.InstanceOf()); + } + + [Test] + public void ReducingStateDescriptor_CarriesReduceFunction_ForIRTranslation() + { + var mockReduceFunc = new Mock>(); + var descriptor = new ReducingStateDescriptor("myReducingState", mockReduceFunc.Object); + + Assert.That(descriptor.Name, Is.EqualTo("myReducingState")); + 
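// Editor's note: a hypothetical user-defined function showing how the descriptors and state
// contracts above fit together. The runtime that resolves a descriptor into a live
// IValueState<T> instance is the Flink Java side; this sketch assumes the usings of the
// surrounding test file and only illustrates the pass-through pattern.
private sealed class CountingFunction
{
    // Descriptor whose Name/ValueType the IR translator reads when generating the Java job.
    private readonly ValueStateDescriptor<int> countDescriptor = new("count");

    public async Task ProcessAsync(IValueState<int> countState, ICollector<string> @out)
    {
        int current = await countState.ValueAsync();  // read keyed state for the current key
        await countState.UpdateAsync(current + 1);    // write it back
        @out.Collect($"{countDescriptor.Name} = {current + 1}");
    }
}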
Assert.That(descriptor.ReduceFunction, Is.SameAs(mockReduceFunc.Object)); + Assert.That(descriptor, Is.InstanceOf()); + } + + [Test] + public void AggregatingStateDescriptor_CarriesAggregateFunction_ForIRTranslation() + { + var mockAggFunc = new Mock>(); + var descriptor = new AggregatingStateDescriptor("myAggState", mockAggFunc.Object); + + Assert.That(descriptor.Name, Is.EqualTo("myAggState")); + Assert.That(descriptor.AggregateFunction, Is.SameAs(mockAggFunc.Object)); + Assert.That(descriptor, Is.InstanceOf()); + } + + [Test] + public void StateDescriptor_NullName_ThrowsArgumentNullException() + { + Assert.Throws(() => new ValueStateDescriptor(null!)); + Assert.Throws(() => new ListStateDescriptor(null!)); + Assert.Throws(() => new MapStateDescriptor(null!)); + } + + [Test] + public void ReducingStateDescriptor_NullFunction_ThrowsArgumentNullException() + { + Assert.Throws( + () => new ReducingStateDescriptor("state", null!)); + } + + [Test] + public void AggregatingStateDescriptor_NullFunction_ThrowsArgumentNullException() + { + Assert.Throws( + () => new AggregatingStateDescriptor("state", null!)); + } + + #endregion + + #region Context Interface Contract Tests + + [Test] + public void IProcessContext_IsInterface() + { + Assert.That(typeof(IProcessContext).IsInterface, Is.True); + } + + [Test] + public void IProcessContext_DefinesTimestamp() + { + var prop = typeof(IProcessContext).GetProperty("Timestamp"); + Assert.That(prop, Is.Not.Null); + Assert.That(prop!.PropertyType, Is.EqualTo(typeof(long))); + } + + [Test] + public void IProcessContext_DefinesCurrentProcessingTime() + { + var prop = typeof(IProcessContext).GetProperty("CurrentProcessingTime"); + Assert.That(prop, Is.Not.Null); + Assert.That(prop!.PropertyType, Is.EqualTo(typeof(long))); + } + + [Test] + public void IProcessContext_DefinesCurrentWatermark() + { + var prop = typeof(IProcessContext).GetProperty("CurrentWatermark"); + Assert.That(prop, Is.Not.Null); + Assert.That(prop!.PropertyType, Is.EqualTo(typeof(long))); + } + + [Test] + public void IProcessContext_DefinesTimerRegistration() + { + var regEvent = typeof(IProcessContext).GetMethod("RegisterEventTimeTimer"); + var regProc = typeof(IProcessContext).GetMethod("RegisterProcessingTimeTimer"); + var delEvent = typeof(IProcessContext).GetMethod("DeleteEventTimeTimer"); + var delProc = typeof(IProcessContext).GetMethod("DeleteProcessingTimeTimer"); + + Assert.That(regEvent, Is.Not.Null); + Assert.That(regProc, Is.Not.Null); + Assert.That(delEvent, Is.Not.Null); + Assert.That(delProc, Is.Not.Null); + } + + [Test] + public void IKeyedProcessContext_ExtendsIProcessContext() + { + Assert.That(typeof(IKeyedProcessContext<>).GetInterfaces(), + Does.Contain(typeof(IProcessContext))); + } + + [Test] + public void IKeyedProcessContext_DefinesCurrentKey() + { + var prop = typeof(IKeyedProcessContext).GetProperty("CurrentKey"); + Assert.That(prop, Is.Not.Null); + Assert.That(prop!.PropertyType, Is.EqualTo(typeof(string))); + } + + [Test] + public void IOnTimerContext_ExtendsIProcessContext() + { + Assert.That(typeof(IOnTimerContext).GetInterfaces(), + Does.Contain(typeof(IProcessContext))); + } + + [Test] + public void IOnTimerContext_DefinesTimeDomain() + { + var prop = typeof(IOnTimerContext).GetProperty("TimeDomain"); + Assert.That(prop, Is.Not.Null); + Assert.That(prop!.PropertyType, Is.EqualTo(typeof(TimeDomain))); + } + + [Test] + public void IKeyedOnTimerContext_ExtendsIOnTimerContext() + { + Assert.That(typeof(IKeyedOnTimerContext<>).GetInterfaces(), + 
Does.Contain(typeof(IOnTimerContext))); + } + + [Test] + public void IKeyedOnTimerContext_DefinesCurrentKey() + { + var prop = typeof(IKeyedOnTimerContext).GetProperty("CurrentKey"); + Assert.That(prop, Is.Not.Null); + Assert.That(prop!.PropertyType, Is.EqualTo(typeof(string))); + } + + [Test] + public void IWindowContext_IsInterface() + { + Assert.That(typeof(IWindowContext).IsInterface, Is.True); + } + + [Test] + public void IWindowContext_DefinesWindowStartAndEnd() + { + var start = typeof(IWindowContext).GetProperty("WindowStart"); + var end = typeof(IWindowContext).GetProperty("WindowEnd"); + Assert.That(start, Is.Not.Null); + Assert.That(end, Is.Not.Null); + Assert.That(start!.PropertyType, Is.EqualTo(typeof(long))); + Assert.That(end!.PropertyType, Is.EqualTo(typeof(long))); + } + + [Test] + public void IWindowContext_DefinesProcessingTimeAndWatermark() + { + var procTime = typeof(IWindowContext).GetProperty("CurrentProcessingTime"); + var watermark = typeof(IWindowContext).GetProperty("CurrentWatermark"); + Assert.That(procTime, Is.Not.Null); + Assert.That(watermark, Is.Not.Null); + } + + #endregion + + #region ICollector Contract Tests + + [Test] + public void ICollector_IsInterface() + { + Assert.That(typeof(ICollector<>).IsInterface, Is.True); + } + + [Test] + public void ICollector_DefinesCollect() + { + var method = typeof(ICollector).GetMethod("Collect"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.GetParameters(), Has.Length.EqualTo(1)); + Assert.That(method.GetParameters()[0].ParameterType, Is.EqualTo(typeof(string))); + } + + [Test] + public void ICollector_CanBeMocked_ForUserDefinedFunctions() + { + var collected = new List(); + var mockCollector = new Mock>(); + mockCollector.Setup(c => c.Collect(It.IsAny())) + .Callback(s => collected.Add(s)); + + mockCollector.Object.Collect("hello"); + mockCollector.Object.Collect("world"); + + Assert.That(collected, Has.Count.EqualTo(2)); + Assert.That(collected, Is.EqualTo(new[] { "hello", "world" })); + } + + #endregion + + #region IResultFuture Contract Tests + + [Test] + public void IResultFuture_IsInterface() + { + Assert.That(typeof(IResultFuture<>).IsInterface, Is.True); + } + + [Test] + public void IResultFuture_DefinesComplete() + { + var method = typeof(IResultFuture).GetMethod("Complete"); + Assert.That(method, Is.Not.Null); + } + + [Test] + public void IResultFuture_DefinesCompleteExceptionally() + { + var method = typeof(IResultFuture).GetMethod("CompleteExceptionally"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.GetParameters()[0].ParameterType, Is.EqualTo(typeof(Exception))); + } + + #endregion + + #region Function Interface Contract Tests + + [Test] + public void IProcessFunction_DefinesProcessElementAsync() + { + var method = typeof(IProcessFunction).GetMethod("ProcessElementAsync"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.ReturnType, Is.EqualTo(typeof(Task))); + + var parameters = method.GetParameters(); + Assert.That(parameters, Has.Length.EqualTo(3)); + Assert.That(parameters[0].ParameterType, Is.EqualTo(typeof(string))); + Assert.That(parameters[1].ParameterType, Is.EqualTo(typeof(IProcessContext))); + Assert.That(parameters[2].ParameterType, Is.EqualTo(typeof(ICollector))); + } + + [Test] + public void IProcessFunction_DefinesOnTimerAsync() + { + var method = typeof(IProcessFunction).GetMethod("OnTimerAsync"); + Assert.That(method, Is.Not.Null); + + var parameters = method!.GetParameters(); + Assert.That(parameters, Has.Length.EqualTo(3)); + 
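// Editor's note: a sketch of the timer round-trip implied by the context contracts above,
// assuming the Flink Java runtime delivers OnTimerAsync callbacks for timers registered via
// the context. The IProcessFunction<TIn, TOut> arity is inferred from the sample functions
// later in this file.
private sealed class TimeoutFunction : IProcessFunction<string, string>
{
    public Task ProcessElementAsync(string value, IProcessContext ctx, ICollector<string> @out)
    {
        // schedule an event-time callback 30 seconds after this element's timestamp
        ctx.RegisterEventTimeTimer(ctx.Timestamp + 30_000L);
        return Task.CompletedTask;
    }

    public Task OnTimerAsync(long timestamp, IOnTimerContext ctx, ICollector<string> @out)
    {
        @out.Collect($"timeout fired at {timestamp} ({ctx.TimeDomain})");
        return Task.CompletedTask;
    }
}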
Assert.That(parameters[0].ParameterType, Is.EqualTo(typeof(long))); + Assert.That(parameters[1].ParameterType, Is.EqualTo(typeof(IOnTimerContext))); + Assert.That(parameters[2].ParameterType, Is.EqualTo(typeof(ICollector))); + } + + [Test] + public void IKeyedProcessFunction_DefinesProcessElementAsync() + { + var method = typeof(IKeyedProcessFunction).GetMethod("ProcessElementAsync"); + Assert.That(method, Is.Not.Null); + + var parameters = method!.GetParameters(); + Assert.That(parameters, Has.Length.EqualTo(3)); + Assert.That(parameters[1].ParameterType, Is.EqualTo(typeof(IKeyedProcessContext))); + } + + [Test] + public void ICoProcessFunction_DefinesBothProcessElements() + { + var proc1 = typeof(ICoProcessFunction).GetMethod("ProcessElement1Async"); + var proc2 = typeof(ICoProcessFunction).GetMethod("ProcessElement2Async"); + Assert.That(proc1, Is.Not.Null); + Assert.That(proc2, Is.Not.Null); + } + + [Test] + public void IProcessWindowFunction_DefinesProcessAsync() + { + var method = typeof(IProcessWindowFunction).GetMethod("ProcessAsync"); + Assert.That(method, Is.Not.Null); + + var parameters = method!.GetParameters(); + Assert.That(parameters, Has.Length.EqualTo(4)); + Assert.That(parameters[2].ParameterType, Is.EqualTo(typeof(IWindowContext))); + Assert.That(parameters[3].ParameterType, Is.EqualTo(typeof(ICollector))); + } + + [Test] + public void IAsyncFunction_DefinesAsyncInvokeAsync() + { + var method = typeof(IAsyncFunction).GetMethod("AsyncInvokeAsync"); + Assert.That(method, Is.Not.Null); + + var parameters = method!.GetParameters(); + Assert.That(parameters, Has.Length.EqualTo(2)); + Assert.That(parameters[1].ParameterType, Is.EqualTo(typeof(IResultFuture))); + } + + [Test] + public void IJoinFunction_DefinesJoin() + { + var method = typeof(IJoinFunction).GetMethod("Join"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.ReturnType, Is.EqualTo(typeof(bool))); + } + + [Test] + public void IFlatJoinFunction_DefinesJoin() + { + var method = typeof(IFlatJoinFunction).GetMethod("Join"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.ReturnType, Is.EqualTo(typeof(IEnumerable))); + } + + [Test] + public void ICoGroupFunction_DefinesCoGroup() + { + var method = typeof(ICoGroupFunction).GetMethod("CoGroup"); + Assert.That(method, Is.Not.Null); + Assert.That(method!.ReturnType, Is.EqualTo(typeof(IEnumerable))); + } + + #endregion + + #region User-Defined Function Integration Pattern Tests + + [Test] + public async Task UserProcessFunction_CanBeImplemented_WithMockedRuntime() + { + // Demonstrates the pass-through pattern: user implements the function, + // Flink Java runtime provides the context and collector + + var userFunction = new SampleProcessFunction(); + var mockCtx = new Mock(); + mockCtx.Setup(c => c.Timestamp).Returns(1000L); + mockCtx.Setup(c => c.CurrentProcessingTime).Returns(2000L); + + var collected = new List(); + var mockCollector = new Mock>(); + mockCollector.Setup(c => c.Collect(It.IsAny())) + .Callback(s => collected.Add(s)); + + await userFunction.ProcessElementAsync(42, mockCtx.Object, mockCollector.Object); + + Assert.That(collected, Has.Count.EqualTo(1)); + Assert.That(collected[0], Does.Contain("42")); + } + + [Test] + public async Task UserKeyedProcessFunction_CanAccessKey_FromMockedRuntime() + { + var userFunction = new SampleKeyedProcessFunction(); + var mockCtx = new Mock>(); + mockCtx.Setup(c => c.CurrentKey).Returns("user-123"); + mockCtx.Setup(c => c.Timestamp).Returns(5000L); + + var collected = new List(); + var mockCollector = 
new Mock>(); + mockCollector.Setup(c => c.Collect(It.IsAny())) + .Callback(s => collected.Add(s)); + + await userFunction.ProcessElementAsync(42, mockCtx.Object, mockCollector.Object); + + Assert.That(collected, Has.Count.EqualTo(1)); + Assert.That(collected[0], Does.Contain("user-123")); + } + + [Test] + public async Task UserAsyncFunction_CanCompleteResultFuture_FromMockedRuntime() + { + var userFunction = new SampleAsyncFunction(); + var completedResults = new List>(); + var mockFuture = new Mock>(); + mockFuture.Setup(f => f.Complete(It.IsAny>())) + .Callback>(r => completedResults.Add(r)); + + await userFunction.AsyncInvokeAsync(42, mockFuture.Object); + + Assert.That(completedResults, Has.Count.EqualTo(1)); + Assert.That(completedResults[0].First(), Is.EqualTo("result-42")); + } + + [Test] + public async Task UserProcessFunction_CanRegisterTimers_OnMockedContext() + { + var userFunction = new TimerRegisteringProcessFunction(); + var registeredTimers = new List(); + var mockCtx = new Mock(); + mockCtx.Setup(c => c.RegisterEventTimeTimer(It.IsAny())) + .Callback(t => registeredTimers.Add(t)); + mockCtx.Setup(c => c.Timestamp).Returns(1000L); + + var mockCollector = new Mock>(); + + await userFunction.ProcessElementAsync("input", mockCtx.Object, mockCollector.Object); + + Assert.That(registeredTimers, Has.Count.EqualTo(1)); + Assert.That(registeredTimers[0], Is.EqualTo(6000L)); + } + + [Test] + public async Task UserProcessWindowFunction_CanProcessWindow_WithMockedContext() + { + var userFunction = new SampleWindowFunction(); + var mockCtx = new Mock(); + mockCtx.Setup(c => c.WindowStart).Returns(0L); + mockCtx.Setup(c => c.WindowEnd).Returns(60000L); + + var collected = new List(); + var mockCollector = new Mock>(); + mockCollector.Setup(c => c.Collect(It.IsAny())) + .Callback(v => collected.Add(v)); + + var elements = new[] { 10, 20, 30 }; + await userFunction.ProcessAsync("key", elements, mockCtx.Object, mockCollector.Object); + + Assert.That(collected, Has.Count.EqualTo(1)); + Assert.That(collected[0], Is.EqualTo(60)); + } + + #endregion + + #region No In-Memory State Implementations Exist Tests + + [Test] + public void NoInMemoryStateImplementations_ExistInAssembly() + { + // Verify that no concrete in-memory state implementations exist + // State is managed by the Flink Java runtime, not by .NET + var assembly = typeof(IValueState<>).Assembly; + var types = assembly.GetTypes(); + + var inMemoryTypes = types.Where(t => + t.Name.StartsWith("InMemory", StringComparison.Ordinal) && + t.Namespace?.Contains("State") == true).ToList(); + + Assert.That(inMemoryTypes, Is.Empty, + "No in-memory state implementations should exist. " + + "State is managed by the Flink Java runtime."); + } + + [Test] + public void NoRuntimeContextImplementations_ExistInAssembly() + { + // Verify that no concrete runtime context implementations exist in Runtime namespace + // Contexts are provided by the Flink Java runtime + var assembly = typeof(IProcessContext).Assembly; + var types = assembly.GetTypes(); + + var runtimeTypes = types.Where(t => + t.Namespace == "FlinkDotNet.DataStream.Runtime" && + t.IsClass && + !t.IsAbstract).ToList(); + + Assert.That(runtimeTypes, Is.Empty, + "No concrete runtime implementations should exist. 
" + + "Contexts, collectors, and result futures are provided by the Flink Java runtime."); + } + + #endregion + + #region Sample User-Defined Functions (for testing the pass-through pattern) + + private sealed class SampleProcessFunction : IProcessFunction + { + public Task ProcessElementAsync(int value, IProcessContext ctx, ICollector @out) + { + @out.Collect($"Processed: {value} at {ctx.Timestamp}"); + return Task.CompletedTask; + } + + public Task OnTimerAsync(long timestamp, IOnTimerContext ctx, ICollector @out) + { + @out.Collect($"Timer fired at {timestamp}"); + return Task.CompletedTask; + } + } + + private sealed class SampleKeyedProcessFunction : IKeyedProcessFunction + { + public Task ProcessElementAsync(int value, IKeyedProcessContext ctx, ICollector @out) + { + @out.Collect($"Key: {ctx.CurrentKey}, Value: {value}"); + return Task.CompletedTask; + } + + public Task OnTimerAsync(long timestamp, IKeyedOnTimerContext ctx, ICollector @out) + { + @out.Collect($"Timer for key {ctx.CurrentKey} at {timestamp}"); + return Task.CompletedTask; + } + } + + private sealed class SampleAsyncFunction : IAsyncFunction + { + public Task AsyncInvokeAsync(int input, IResultFuture resultFuture) + { + resultFuture.Complete(new[] { $"result-{input}" }); + return Task.CompletedTask; + } + } + + private sealed class TimerRegisteringProcessFunction : IProcessFunction + { + public Task ProcessElementAsync(string value, IProcessContext ctx, ICollector @out) + { + ctx.RegisterEventTimeTimer(ctx.Timestamp + 5000L); + return Task.CompletedTask; + } + + public Task OnTimerAsync(long timestamp, IOnTimerContext ctx, ICollector @out) + { + @out.Collect($"Timer at {timestamp}, domain: {ctx.TimeDomain}"); + return Task.CompletedTask; + } + } + + private sealed class SampleWindowFunction : IProcessWindowFunction + { + public Task ProcessAsync(string key, IEnumerable elements, IWindowContext ctx, ICollector @out) + { + @out.Collect(elements.Sum()); + return Task.CompletedTask; + } + } + + #endregion + } +} diff --git a/FlinkDotNet/FlinkDotNet.DataStream.Tests/UnifiedSinkV2ApiTests.cs b/FlinkDotNet/FlinkDotNet.DataStream.Tests/UnifiedSinkV2ApiTests.cs deleted file mode 100644 index 4736e2c8..00000000 --- a/FlinkDotNet/FlinkDotNet.DataStream.Tests/UnifiedSinkV2ApiTests.cs +++ /dev/null @@ -1,526 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#nullable enable - -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using NUnit.Framework; - -namespace FlinkDotNet.DataStream.Tests -{ - /// - /// Tests for Unified Sink API v2 (Flink 1.20+) interfaces and builder. - /// These tests validate the C# API layer for the modern sink pattern. 
- /// - [TestFixture] - public class UnifiedSinkV2ApiTests - { - // Test implementations for the Unified Sink v2 API - - private class TestSinkWriter : ISinkWriter - { - public List WrittenElements { get; } = []; - public List Committables { get; } = []; - public int State - { - get; set; - } - public bool FlushCalled - { - get; set; - } - public bool DisposeCalled - { - get; set; - } - - public Task WriteAsync(string element, ElementContext context, CancellationToken cancellationToken = default) - { - this.WrittenElements.Add(element); - return Task.CompletedTask; - } - - public Task FlushAsync(bool endOfInput, CancellationToken cancellationToken = default) - { - this.FlushCalled = true; - return Task.CompletedTask; - } - - public Task> PrepareCommitAsync(CancellationToken cancellationToken = default) - { - return Task.FromResult(this.Committables); - } - - public Task SnapshotStateAsync(long checkpointId, CancellationToken cancellationToken = default) - { - return Task.FromResult(this.State); - } - - public ValueTask DisposeAsync() - { - this.DisposeCalled = true; - return ValueTask.CompletedTask; - } - } - - private class TestCommitter : ICommitter - { - public List CommittedItems { get; } = []; - public bool CloseCalled - { - get; set; - } - - public Task> CommitAsync(List committables, CancellationToken cancellationToken = default) - { - this.CommittedItems.AddRange(committables); - return Task.FromResult(new List()); // No failures - } - - public Task CloseAsync() - { - this.CloseCalled = true; - return Task.CompletedTask; - } - } - - private class TestGlobalCommitter : IGlobalCommitter - { - public List CombinedItems { get; } = []; - public List CommittedItems { get; } = []; - public bool CloseCalled - { - get; set; - } - - public Task> CombineAsync(List committables, CancellationToken cancellationToken = default) - { - this.CombinedItems.AddRange(committables); - return Task.FromResult(committables); - } - - public Task> CommitAsync(List globalCommittables, CancellationToken cancellationToken = default) - { - this.CommittedItems.AddRange(globalCommittables); - return Task.FromResult(new List()); // No failures - } - - public Task CloseAsync() - { - this.CloseCalled = true; - return Task.CompletedTask; - } - } - - [Test] - public async Task SinkWriter_WriteAsync_StoresElements() - { - // Arrange - TestSinkWriter writer = new(); - ElementContext context = new() - { - Timestamp = 1000, - Watermark = 900 - }; - - // Act - await writer.WriteAsync("element1", context); - await writer.WriteAsync("element2", context); - - // Assert - Assert.That(writer.WrittenElements, Has.Count.EqualTo(2)); - Assert.That(writer.WrittenElements[0], Is.EqualTo("element1")); - Assert.That(writer.WrittenElements[1], Is.EqualTo("element2")); - } - - [Test] - public async Task SinkWriter_FlushAsync_SetsFlagCorrectly() - { - // Arrange - TestSinkWriter writer = new(); - - // Act - await writer.FlushAsync(false); - - // Assert - Assert.That(writer.FlushCalled, Is.True); - } - - [Test] - public async Task SinkWriter_PrepareCommitAsync_ReturnsCommittables() - { - // Arrange - TestSinkWriter writer = new(); - writer.Committables.Add("commit1"); - writer.Committables.Add("commit2"); - - // Act - List result = await writer.PrepareCommitAsync(); - - // Assert - Assert.That(result, Has.Count.EqualTo(2)); - Assert.That(result, Contains.Item("commit1")); - Assert.That(result, Contains.Item("commit2")); - } - - [Test] - public async Task SinkWriter_SnapshotStateAsync_ReturnsState() - { - // Arrange - TestSinkWriter writer 
= new() - { - State = 42 - }; - - // Act - int state = await writer.SnapshotStateAsync(1000); - - // Assert - Assert.That(state, Is.EqualTo(42)); - } - - [Test] - public async Task SinkWriter_DisposeAsync_SetsFlagCorrectly() - { - // Arrange - TestSinkWriter writer = new(); - - // Act - await writer.DisposeAsync(); - - // Assert - Assert.That(writer.DisposeCalled, Is.True); - } - - [Test] - public async Task Committer_CommitAsync_StoresCommittedItems() - { - // Arrange - TestCommitter committer = new(); - List committables = ["item1", "item2", "item3"]; - - // Act - List failures = await committer.CommitAsync(committables); - - // Assert - Assert.That(committer.CommittedItems, Has.Count.EqualTo(3)); - Assert.That(failures, Is.Empty); - } - - [Test] - public async Task Committer_CloseAsync_SetsFlagCorrectly() - { - // Arrange - TestCommitter committer = new(); - - // Act - await committer.CloseAsync(); - - // Assert - Assert.That(committer.CloseCalled, Is.True); - } - - [Test] - public async Task GlobalCommitter_CombineAsync_StoresCommittables() - { - // Arrange - TestGlobalCommitter committer = new(); - List committables = ["item1", "item2"]; - - // Act - List result = await committer.CombineAsync(committables); - - // Assert - Assert.That(committer.CombinedItems, Has.Count.EqualTo(2)); - Assert.That(result, Has.Count.EqualTo(2)); - } - - [Test] - public async Task GlobalCommitter_CommitAsync_StoresCommittedItems() - { - // Arrange - TestGlobalCommitter committer = new(); - List globalCommittables = ["global1", "global2"]; - - // Act - List failures = await committer.CommitAsync(globalCommittables); - - // Assert - Assert.That(committer.CommittedItems, Has.Count.EqualTo(2)); - Assert.That(failures, Is.Empty); - } - - [Test] - public async Task GlobalCommitter_CloseAsync_SetsFlagCorrectly() - { - // Arrange - TestGlobalCommitter committer = new(); - - // Act - await committer.CloseAsync(); - - // Assert - Assert.That(committer.CloseCalled, Is.True); - } - - [Test] - public void SinkWriterContext_InitializesPropertiesCorrectly() - { - // Arrange & Act - SinkWriterContext context = new() - { - SubtaskId = 2, - NumberOfParallelSubtasks = 8, - AttemptNumber = 1, - Properties = new Dictionary { { "key", "value" } } - }; - - // Assert - Assert.That(context.SubtaskId, Is.EqualTo(2)); - Assert.That(context.NumberOfParallelSubtasks, Is.EqualTo(8)); - Assert.That(context.AttemptNumber, Is.EqualTo(1)); - Assert.That(context.Properties["key"], Is.EqualTo("value")); - } - - [Test] - public void ElementContext_InitializesPropertiesCorrectly() - { - // Arrange & Act - ElementContext context = new() - { - Timestamp = 1234567890, - Watermark = 1234567800, - IsLastElement = true - }; - - // Assert - Assert.That(context.Timestamp, Is.EqualTo(1234567890)); - Assert.That(context.Watermark, Is.EqualTo(1234567800)); - Assert.That(context.IsLastElement, Is.True); - } - - [Test] - public async Task SinkBuilder_WithWriter_BuildsSink() - { - // Arrange - SinkBuilder builder = new(); - bool writerCreated = false; - - // Act - ISink sink = builder - .WithWriter((context, state, ct) => - { - writerCreated = true; - return Task.FromResult>(new TestSinkWriter()); - }) - .Build(); - - // Create writer to verify factory is called - await sink.CreateWriterAsync(new SinkWriterContext()); - - // Assert - Assert.That(sink, Is.Not.Null); - Assert.That(writerCreated, Is.True); - } - - [Test] - public void SinkBuilder_WithoutWriter_ThrowsException() - { - // Arrange - SinkBuilder builder = new(); - - // Act & Assert - 
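// Editor's note: the fluent assembly pattern exercised by the builder tests in this (removed)
// file, shown end-to-end. The <string, string, int> type arguments mirror TestSinkWriter
// (string elements, string committables, int state); this sketches the deleted API, not a
// replacement introduced by this PR.
ISink<string, string, int> sink = new SinkBuilder<string, string, int>()
    .WithWriter((context, restoredState, ct) =>
        Task.FromResult<ISinkWriter<string, string, int>>(new TestSinkWriter()))
    .WithCommitter(() => new TestCommitter())             // optional: exactly-once commit step
    .WithGlobalCommitter(() => new TestGlobalCommitter()) // optional: global coordination
    .Build();                                             // throws if no writer factory was set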
InvalidOperationException? ex = Assert.Throws(() => builder.Build()); - Assert.That(ex?.Message, Does.Contain("Writer factory must be set")); - } - - [Test] - public void SinkBuilder_WithCommitter_CreatesCommitter() - { - // Arrange - bool committerCreated = false; - - ISink sink = new SinkBuilder() - .WithWriter((context, state, ct) => Task.FromResult>(new TestSinkWriter())) - .WithCommitter(() => - { - committerCreated = true; - return new TestCommitter(); - }) - .Build(); - - // Act - ICommitter? committer = sink.CreateCommitter(); - - // Assert - Assert.That(committer, Is.Not.Null); - Assert.That(committerCreated, Is.True); - } - - [Test] - public void SinkBuilder_WithGlobalCommitter_CreatesGlobalCommitter() - { - // Arrange - bool globalCommitterCreated = false; - - ISink sink = new SinkBuilder() - .WithWriter((context, state, ct) => Task.FromResult>(new TestSinkWriter())) - .WithGlobalCommitter(() => - { - globalCommitterCreated = true; - return new TestGlobalCommitter(); - }) - .Build(); - - // Act - IGlobalCommitter? globalCommitter = sink.CreateGlobalCommitter(); - - // Assert - Assert.That(globalCommitter, Is.Not.Null); - Assert.That(globalCommitterCreated, Is.True); - } - - [Test] - public async Task SinkBuilder_FullWorkflow_CreatesAllComponents() - { - // Arrange - TestSinkWriter writer = new(); - TestCommitter committer = new(); - TestGlobalCommitter globalCommitter = new(); - - ISink sink = new SinkBuilder() - .WithWriter((context, state, ct) => Task.FromResult>(writer)) - .WithCommitter(() => committer) - .WithGlobalCommitter(() => globalCommitter) - .Build(); - - // Act - ISinkWriter createdWriter = await sink.CreateWriterAsync(new SinkWriterContext()); - ICommitter? createdCommitter = sink.CreateCommitter(); - IGlobalCommitter? createdGlobalCommitter = sink.CreateGlobalCommitter(); - - // Assert - Assert.That(createdWriter, Is.SameAs(writer)); - Assert.That(createdCommitter, Is.SameAs(committer)); - Assert.That(createdGlobalCommitter, Is.SameAs(globalCommitter)); - } - - [Test] - public void DataStream_AddSink_WithUnifiedSinkV2_ReturnsDataStream() - { - // Arrange - StreamExecutionEnvironment env = StreamExecutionEnvironment.GetExecutionEnvironment(); - DataStream stream = env.FromCollection(new[] { "test" }); - - ISink sink = new SinkBuilder() - .WithWriter((context, state, ct) => Task.FromResult>(new TestSinkWriter())) - .Build(); - - // Act - DataStream result = stream.AddSink(sink); - - // Assert - Assert.That(result, Is.Not.Null); - Assert.That(result, Is.SameAs(stream)); // Fluent API returns same stream - } - - [Test] - public void DataStream_AddSink_WithNullSink_ThrowsArgumentNullException() - { - // Arrange - StreamExecutionEnvironment env = StreamExecutionEnvironment.GetExecutionEnvironment(); - DataStream stream = env.FromCollection(new[] { "test" }); - - // Act & Assert - Assert.Throws(() => stream.AddSink(null!)); - } - - [Test] - public async Task BuiltSink_CreateWriterAsync_WithRestoredState_PassesStateToFactory() - { - // Arrange - int receivedState = -1; - ISink sink = new SinkBuilder() - .WithWriter((context, state, ct) => - { - receivedState = state; - return Task.FromResult>(new TestSinkWriter { State = state }); - }) - .Build(); - - // Act - await sink.CreateWriterAsync(new SinkWriterContext(), restoredState: 99); - - // Assert - Assert.That(receivedState, Is.EqualTo(99)); - } - - [Test] - public async Task BuiltSink_CreateWriterAsync_PassesCancellationToken() - { - // Arrange - CancellationTokenSource cts = new(); - CancellationToken receivedToken = 
-        CancellationToken receivedToken = default;
-
-        ISink<string, string, int> sink = new SinkBuilder<string, string, int>()
-            .WithWriter((context, state, ct) =>
-            {
-                receivedToken = ct;
-                return Task.FromResult<ISinkWriter<string, string, int>>(new TestSinkWriter());
-            })
-            .Build();
-
-        // Act
-        await sink.CreateWriterAsync(new SinkWriterContext(), cancellationToken: cts.Token);
-
-        // Assert
-        Assert.That(receivedToken, Is.EqualTo(cts.Token));
-    }
-
-    [Test]
-    public void SinkBuilder_WithWriter_ThrowsOnNullFactory()
-    {
-        // Arrange
-        SinkBuilder<string, string, int> builder = new();
-
-        // Act & Assert
-        Assert.Throws<ArgumentNullException>(() => builder.WithWriter(null!));
-    }
-
-    [Test]
-    public void SinkBuilder_WithCommitter_ThrowsOnNullFactory()
-    {
-        // Arrange
-        SinkBuilder<string, string, int> builder = new();
-
-        // Act & Assert
-        Assert.Throws<ArgumentNullException>(() => builder.WithCommitter(null!));
-    }
-
-    [Test]
-    public void SinkBuilder_WithGlobalCommitter_ThrowsOnNullFactory()
-    {
-        // Arrange
-        SinkBuilder<string, string, int> builder = new();
-
-        // Act & Assert
-        Assert.Throws<ArgumentNullException>(() => builder.WithGlobalCommitter(null!));
-    }
-    }
-}
diff --git a/FlinkDotNet/FlinkDotNet.DataStream/DataStream.cs b/FlinkDotNet/FlinkDotNet.DataStream/DataStream.cs
index 058c51fd..3a4be90b 100644
--- a/FlinkDotNet/FlinkDotNet.DataStream/DataStream.cs
+++ b/FlinkDotNet/FlinkDotNet.DataStream/DataStream.cs
@@ -355,25 +355,6 @@ public DataStream<T> AddSink(ISinkFunction<T> sinkFunction)
         return this;
     }
 
-    /// <summary>
-    /// Adds a Unified Sink v2 (Flink 1.20+) to this DataStream.
-    /// This is the recommended API for custom sinks with exactly-once semantics support.
-    /// </summary>
-    /// <typeparam name="TCommittable">Type of committable objects</typeparam>
-    /// <typeparam name="TWriterState">Type of writer state for checkpointing</typeparam>
-    /// <param name="sink">The unified sink v2 instance</param>
-    /// <returns>This DataStream</returns>
-    public DataStream<T> AddSink<TCommittable, TWriterState>(ISink<T, TCommittable, TWriterState> sink)
-    {
-        ArgumentNullException.ThrowIfNull(sink);
-
-        // Note: Full IR integration with OperationCapture will be implemented in Java IR Runner phase
-        // For now, the unified sink v2 is registered for future execution
-
-        // Sink registered for execution
-        return this;
-    }
-
     /// <summary>
     /// Sets the parallelism for this operation.
     /// </summary>
diff --git a/FlinkDotNet/FlinkDotNet.DataStream/UnifiedSinkV2.cs b/FlinkDotNet/FlinkDotNet.DataStream/UnifiedSinkV2.cs
deleted file mode 100644
index 2087ce79..00000000
--- a/FlinkDotNet/FlinkDotNet.DataStream/UnifiedSinkV2.cs
+++ /dev/null
@@ -1,322 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace FlinkDotNet.DataStream
-{
-    /// <summary>
-    /// Unified Sink API v2 (Flink 1.20+) - Modern sink interface with exactly-once semantics support.
-    /// This is the recommended API for implementing custom sinks, replacing the legacy ISinkFunction.
-    /// </summary>
-    /// <typeparam name="TInput">Type of elements to write</typeparam>
-    /// <typeparam name="TCommittable">Type of committable objects for two-phase commit (use object if not needed)</typeparam>
-    /// <typeparam name="TWriterState">Type of writer state for checkpointing (use object if stateless)</typeparam>
-#pragma warning disable S2436 // Types and methods should not have too many generic parameters - Matching Flink's native API design
-    public interface ISink<TInput, TCommittable, TWriterState>
-#pragma warning restore S2436
-    {
-        /// <summary>
-        /// Creates a new sink writer instance.
-        /// </summary>
-        /// <param name="context">Context providing runtime information</param>
-        /// <param name="restoredState">Restored state from previous checkpoint (default value if no state to restore)</param>
-        /// <param name="cancellationToken">Cancellation token</param>
-        /// <returns>A new sink writer instance</returns>
-        public Task<ISinkWriter<TInput, TCommittable, TWriterState>> CreateWriterAsync(
-            SinkWriterContext context,
-            TWriterState restoredState = default!,
-            CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Creates a committer for exactly-once semantics (optional).
-        /// Return null if the sink uses at-least-once semantics.
-        /// </summary>
-        /// <returns>Committer instance or null</returns>
-        public ICommitter<TCommittable>? CreateCommitter();
-
-        /// <summary>
-        /// Creates a global committer for exactly-once semantics (optional).
-        /// Used for sinks requiring global coordination (e.g., file rename operations).
-        /// Return null if not needed.
-        /// </summary>
-        /// <returns>Global committer instance or null</returns>
-        public IGlobalCommitter<TCommittable, TCommittable>? CreateGlobalCommitter();
-    }
-
-    /// <summary>
-    /// Writer for processing elements in the Unified Sink v2 API.
-    /// Writers are created per parallel instance and handle actual data writing.
-    /// </summary>
-    /// <typeparam name="TInput">Type of elements to write</typeparam>
-    /// <typeparam name="TCommittable">Type of committable objects</typeparam>
-    /// <typeparam name="TWriterState">Type of writer state for checkpointing</typeparam>
-#pragma warning disable S2436 // Types and methods should not have too many generic parameters - Matching Flink's native API design
-    public interface ISinkWriter<TInput, TCommittable, TWriterState> : IAsyncDisposable
-#pragma warning restore S2436
-    {
-        /// <summary>
-        /// Writes a single element to the sink.
-        /// </summary>
-        /// <param name="element">Element to write</param>
-        /// <param name="context">Context with timestamp and watermark information</param>
-        /// <param name="cancellationToken">Cancellation token</param>
-        public Task WriteAsync(TInput element, ElementContext context, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Flushes buffered data. Called before checkpoints and when the writer is closed.
-        /// </summary>
-        /// <param name="endOfInput">True if this is the final flush before the stream ends</param>
-        /// <param name="cancellationToken">Cancellation token</param>
-        public Task FlushAsync(bool endOfInput, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Prepares commit for exactly-once semantics. Returns committables that will be committed
-        /// only after the checkpoint completes successfully.
-        /// </summary>
-        /// <param name="cancellationToken">Cancellation token</param>
-        /// <returns>List of committables or empty list if using at-least-once semantics</returns>
-        public Task<List<TCommittable>> PrepareCommitAsync(CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Snapshots the current state for checkpointing. Called during checkpoint creation.
-        /// Return default value if stateless.
-        /// </summary>
-        /// <param name="checkpointId">Checkpoint identifier</param>
-        /// <param name="cancellationToken">Cancellation token</param>
-        /// <returns>Current writer state or default if stateless</returns>
-        public Task<TWriterState> SnapshotStateAsync(long checkpointId, CancellationToken cancellationToken = default);
-    }
-
-    /// <summary>
-    /// Committer for exactly-once semantics in Unified Sink v2.
-    /// Commits the committables only after checkpoint completion.
-    /// </summary>
-    /// <typeparam name="TCommittable">Type of committable objects</typeparam>
-    public interface ICommitter<TCommittable>
-    {
-        /// <summary>
-        /// Commits the given committables after successful checkpoint.
-        /// </summary>
-        /// <param name="committables">List of committables to commit</param>
-        /// <param name="cancellationToken">Cancellation token</param>
-        /// <returns>List of committables that failed to commit (for retry)</returns>
-        public Task<List<TCommittable>> CommitAsync(List<TCommittable> committables, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Closes the committer and releases resources.
-        /// </summary>
-        public Task CloseAsync();
-    }
-
-    /// <summary>
-    /// Global committer for sinks requiring global coordination (e.g., file sinks with rename operations).
-    /// Receives all committables from all parallel instances and performs global commit operations.
-    /// </summary>
-    /// <typeparam name="TCommittable">Type of committable objects from writers</typeparam>
-    /// <typeparam name="TGlobalCommittable">Type of global committable objects</typeparam>
-    public interface IGlobalCommitter<TCommittable, TGlobalCommittable>
-    {
-        /// <summary>
-        /// Combines committables from multiple writers into global committables.
-        /// </summary>
-        /// <param name="committables">Committables from all parallel writer instances</param>
-        /// <param name="cancellationToken">Cancellation token</param>
-        /// <returns>Combined global committables</returns>
-        public Task<List<TGlobalCommittable>> CombineAsync(List<TCommittable> committables, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Performs the global commit operation.
-        /// </summary>
-        /// <param name="globalCommittables">Global committables to commit</param>
-        /// <param name="cancellationToken">Cancellation token</param>
-        /// <returns>List of global committables that failed (for retry)</returns>
-        public Task<List<TGlobalCommittable>> CommitAsync(List<TGlobalCommittable> globalCommittables, CancellationToken cancellationToken = default);
-
-        /// <summary>
-        /// Closes the global committer and releases resources.
-        /// </summary>
-        public Task CloseAsync();
-    }
-
-    /// <summary>
-    /// Context provided when creating a sink writer, containing runtime information.
-    /// </summary>
-    public class SinkWriterContext
-    {
-        /// <summary>
-        /// Gets the subtask index (0-based) of this writer instance.
-        /// </summary>
-        public int SubtaskId
-        {
-            get; init;
-        }
-
-        /// <summary>
-        /// Gets the total number of parallel writer instances.
-        /// </summary>
-        public int NumberOfParallelSubtasks
-        {
-            get; init;
-        }
-
-        /// <summary>
-        /// Gets the attempt number for this writer (0 for first attempt, incremented on failures).
-        /// </summary>
-        public int AttemptNumber
-        {
-            get; init;
-        }
-
-        /// <summary>
-        /// Gets custom properties from the job configuration.
-        /// </summary>
-        public IReadOnlyDictionary<string, string> Properties { get; init; } = new Dictionary<string, string>();
-    }
-
-    /// <summary>
-    /// Context for each element being written, providing timestamp and watermark information.
-    /// </summary>
-    public class ElementContext
-    {
-        /// <summary>
-        /// Gets the timestamp of the element (milliseconds since epoch).
-        /// </summary>
-        public long Timestamp
-        {
-            get; init;
-        }
-
-        /// <summary>
-        /// Gets the current watermark (milliseconds since epoch).
-        /// </summary>
-        public long Watermark
-        {
-            get; init;
-        }
-
-        /// <summary>
-        /// Gets whether this is the last element in the stream.
-        /// </summary>
-        public bool IsLastElement
-        {
-            get; init;
-        }
-    }
-
-    /// <summary>
-    /// Builder for creating Unified Sink v2 instances with a fluent API.
-    /// </summary>
-    /// <typeparam name="TInput">Type of elements to write</typeparam>
-    /// <typeparam name="TCommittable">Type of committable objects</typeparam>
-    /// <typeparam name="TWriterState">Type of writer state</typeparam>
-#pragma warning disable S2436 // Types and methods should not have too many generic parameters - Matching Flink's native API design
-    public class SinkBuilder<TInput, TCommittable, TWriterState>
-#pragma warning restore S2436
-    {
-        private Func<SinkWriterContext, TWriterState, CancellationToken, Task<ISinkWriter<TInput, TCommittable, TWriterState>>>? _writerFactory;
-        private Func<ICommitter<TCommittable>?>? _committerFactory;
-        private Func<IGlobalCommitter<TCommittable, TCommittable>?>? _globalCommitterFactory;
-
-        /// <summary>
-        /// Sets the writer factory function.
-        /// </summary>
-        /// <param name="factory">Factory function to create sink writers</param>
-        /// <returns>This builder for chaining</returns>
-        public SinkBuilder<TInput, TCommittable, TWriterState> WithWriter(
-            Func<SinkWriterContext, TWriterState, CancellationToken, Task<ISinkWriter<TInput, TCommittable, TWriterState>>> factory)
-        {
-            this._writerFactory = factory ?? throw new ArgumentNullException(nameof(factory));
-            return this;
-        }
-
-        /// <summary>
-        /// Sets the committer factory function for exactly-once semantics.
-        /// </summary>
-        /// <param name="factory">Factory function to create committers</param>
-        /// <returns>This builder for chaining</returns>
-        public SinkBuilder<TInput, TCommittable, TWriterState> WithCommitter(
-            Func<ICommitter<TCommittable>?> factory)
-        {
-            this._committerFactory = factory ?? throw new ArgumentNullException(nameof(factory));
-            return this;
-        }
-
-        /// <summary>
-        /// Sets the global committer factory function for global coordination.
-        /// </summary>
-        /// <param name="factory">Factory function to create global committers</param>
-        /// <returns>This builder for chaining</returns>
-        public SinkBuilder<TInput, TCommittable, TWriterState> WithGlobalCommitter(
-            Func<IGlobalCommitter<TCommittable, TCommittable>?> factory)
-        {
-            this._globalCommitterFactory = factory ?? throw new ArgumentNullException(nameof(factory));
-            return this;
-        }
-
-        /// <summary>
-        /// Builds the sink instance.
-        /// </summary>
-        /// <returns>Configured sink instance</returns>
-        /// <exception cref="InvalidOperationException">If writer factory is not set</exception>
-        public ISink<TInput, TCommittable, TWriterState> Build() =>
-            this._writerFactory == null
-                ? throw new InvalidOperationException("Writer factory must be set before building sink")
-                : new BuiltSink<TInput, TCommittable, TWriterState>(
-                    this._writerFactory,
-                    this._committerFactory,
-                    this._globalCommitterFactory);
-    }
-
-    /// <summary>
-    /// Internal implementation of ISink created by SinkBuilder.
-    /// </summary>
-#pragma warning disable S2436 // Types and methods should not have too many generic parameters - Matching Flink's native API design
-#pragma warning disable IDE0290 // Use primary constructor - Keeping explicit constructor for clarity
-    internal class BuiltSink<TInput, TCommittable, TWriterState> : ISink<TInput, TCommittable, TWriterState>
-#pragma warning restore S2436
-    {
-        private readonly Func<SinkWriterContext, TWriterState, CancellationToken, Task<ISinkWriter<TInput, TCommittable, TWriterState>>> _writerFactory;
-        private readonly Func<ICommitter<TCommittable>?>? _committerFactory;
-        private readonly Func<IGlobalCommitter<TCommittable, TCommittable>?>? _globalCommitterFactory;
-
-        public BuiltSink(
-            Func<SinkWriterContext, TWriterState, CancellationToken, Task<ISinkWriter<TInput, TCommittable, TWriterState>>> writerFactory,
-            Func<ICommitter<TCommittable>?>? committerFactory,
-            Func<IGlobalCommitter<TCommittable, TCommittable>?>? globalCommitterFactory)
-        {
-            this._writerFactory = writerFactory ?? throw new ArgumentNullException(nameof(writerFactory));
-            this._committerFactory = committerFactory;
-            this._globalCommitterFactory = globalCommitterFactory;
-        }
-
-        public Task<ISinkWriter<TInput, TCommittable, TWriterState>> CreateWriterAsync(
-            SinkWriterContext context,
-            TWriterState restoredState = default!,
-            CancellationToken cancellationToken = default) =>
-            this._writerFactory(context, restoredState, cancellationToken);
-
-        public ICommitter<TCommittable>? CreateCommitter() =>
-            this._committerFactory?.Invoke();
-        public IGlobalCommitter<TCommittable, TCommittable>? CreateGlobalCommitter() =>
-            this._globalCommitterFactory?.Invoke();
-    }
-#pragma warning restore IDE0290
-}
diff --git a/LocalTesting/LocalTesting.IntegrationTests/PerformanceFormatTests.cs b/LocalTesting/LocalTesting.IntegrationTests/PerformanceFormatTests.cs
index 9ec3710b..f2787131 100644
--- a/LocalTesting/LocalTesting.IntegrationTests/PerformanceFormatTests.cs
+++ b/LocalTesting/LocalTesting.IntegrationTests/PerformanceFormatTests.cs
@@ -141,633 +141,4 @@ public void Test1_StateBackendConfig_ValidatesIRSchemaAndSerialization()
     }
 
     #endregion
-
-    #region Test 2: Async Sink Batching Configuration
-
-    /// <summary>
-    /// Test 2: Validates BatchingConfig in async sinks including:
-    /// - Size-based batching (MaxBatchSize, MaxBatchSizeInBytes)
-    /// - Time-based batching (MaxTimeInBufferMs)
-    /// - In-flight and buffered request limits
-    /// - Integration with UnifiedSinkV2Definition
-    /// - JSON serialization
-    /// </summary>
-    [Test]
-    public void Test2_AsyncSinkBatching_ValidatesConfiguration()
-    {
-        // Part A: Size-based batching
-        JobDefinition sizeBatchingJob = new JobDefinition
-        {
-            Source = new KafkaSourceDefinition { Topic = "input" },
-            Sink = new UnifiedSinkV2Definition
-            {
-                SinkType = "kafka",
-                WriterConfig = new SinkWriterConfig
-                {
-                    ClassName = "KafkaWriter",
-                    Properties = new Dictionary<string, object>
-                    {
-                        { "topic", "output" },
-                        { "bootstrapServers", "kafka:9092" }
-                    },
-                    BatchingConfig = new BatchingConfig
-                    {
-                        MaxBatchSize = 1000,
-                        MaxBatchSizeInBytes = 5 * 1024 * 1024, // 5MB
-                        MaxInFlightRequests = 50,
-                        MaxBufferedRequests = 10000
-                    }
-                },
-                Semantics = "exactly-once"
-            },
-            Metadata = new JobMetadata { Version = "1.0" }
-        };
-
-        // Part B: Time-based batching
-        JobDefinition timeBatchingJob = new JobDefinition
-        {
-            Source = new KafkaSourceDefinition { Topic = "input" },
-            Sink = new UnifiedSinkV2Definition
-            {
-                SinkType = "kafka",
-                WriterConfig = new SinkWriterConfig
-                {
-                    ClassName = "KafkaWriter",
-                    Properties = new Dictionary<string, object>
-                    {
-                        { "topic", "output" },
-                        { "bootstrapServers", "kafka:9092" }
-                    },
-                    BatchingConfig = new BatchingConfig
-                    {
-                        MaxBatchSize = 100,
-                        MaxTimeInBufferMs = 1000, // 1 second
-                        MaxInFlightRequests = 10
-                    }
-                }
-            },
-            Metadata = new JobMetadata { Version = "1.0" }
-        };
-
-        // Part C: No batching config (defaults)
-        JobDefinition noBatchingJob = new JobDefinition
-        {
-            Source = new KafkaSourceDefinition { Topic = "input" },
-            Sink = new UnifiedSinkV2Definition
-            {
-                SinkType = "kafka",
-                WriterConfig = new SinkWriterConfig
-                {
-                    ClassName = "KafkaWriter",
-                    Properties = new Dictionary<string, object>
-                    {
-                        { "topic", "output" },
-                        { "bootstrapServers", "kafka:9092" }
-                    }
-                    // No BatchingConfig - should work fine
-                }
-            },
-            Metadata = new JobMetadata { Version = "1.0" }
-        };
-
-        // Act: Serialize and deserialize
-        JobDefinition[] jobs = [sizeBatchingJob, timeBatchingJob, noBatchingJob];
-        List<JobDefinition> deserializedJobs = new List<JobDefinition>();
-
-        foreach (JobDefinition? job in jobs)
-        {
-            string json = JsonSerializer.Serialize(job, new JsonSerializerOptions { WriteIndented = true });
-            JobDefinition? deserialized = JsonSerializer.Deserialize<JobDefinition>(json);
-            Assert.That(deserialized, Is.Not.Null);
-            deserializedJobs.Add(deserialized!);
-        }
-
-        // Assert: Size-based batching
-        UnifiedSinkV2Definition? sizeSink = deserializedJobs[0].Sink as UnifiedSinkV2Definition;
-        Assert.That(sizeSink, Is.Not.Null);
-        Assert.That(sizeSink!.WriterConfig.BatchingConfig, Is.Not.Null);
-        Assert.That(sizeSink.WriterConfig.BatchingConfig!.MaxBatchSize, Is.EqualTo(1000));
-        Assert.That(sizeSink.WriterConfig.BatchingConfig.MaxBatchSizeInBytes, Is.EqualTo(5 * 1024 * 1024));
-        Assert.That(sizeSink.WriterConfig.BatchingConfig.MaxInFlightRequests, Is.EqualTo(50));
-        Assert.That(sizeSink.WriterConfig.BatchingConfig.MaxBufferedRequests, Is.EqualTo(10000));
-
-        // Assert: Time-based batching
-        UnifiedSinkV2Definition? timeSink = deserializedJobs[1].Sink as UnifiedSinkV2Definition;
-        Assert.That(timeSink, Is.Not.Null);
-        Assert.That(timeSink!.WriterConfig.BatchingConfig, Is.Not.Null);
-        Assert.That(timeSink.WriterConfig.BatchingConfig!.MaxBatchSize, Is.EqualTo(100));
-        Assert.That(timeSink.WriterConfig.BatchingConfig.MaxTimeInBufferMs, Is.EqualTo(1000));
-
-        // Assert: No batching config (backward compatibility)
-        UnifiedSinkV2Definition? noSink = deserializedJobs[2].Sink as UnifiedSinkV2Definition;
-        Assert.That(noSink, Is.Not.Null);
-        Assert.That(noSink!.WriterConfig.BatchingConfig, Is.Null, "Batching config should be optional");
-    }
-
-    #endregion
-
-    #region Test 3: All 4 Performance & Format Features
-
-    /// <summary>
-    /// Test 3: Validates ALL 4 Performance & Format features including:
-    /// - Feature 1: Custom Async Sink Batching
-    /// - Feature 2: Enhanced State Backend Configuration
-    /// - Feature 3: Smile Format for Compiled Plans
-    /// - Feature 4: MultiJoin Optimization Configuration
-    /// - Complete job definition with all performance features
-    /// - Backward compatibility (optional configs)
-    /// </summary>
-    [Test]
-    public void Test3_CombinedOptimizations_ValidatesAll4PerformanceFeatures()
-    {
-        // Part A: Complete job with all performance features
-        JobDefinition optimizedJob = new JobDefinition
-        {
-            Source = new KafkaSourceDefinition
-            {
-                Topic = "events",
-                BootstrapServers = "kafka:9092",
-                GroupId = "processor",
-                StartingOffsets = "latest"
-            },
-            Operations = new List<IOperationDefinition>
-            {
-                new FilterOperationDefinition { Expression = "x.Length > 0" },
-                new MapOperationDefinition { Expression = "x.ToUpper()" }
-            },
-            Sink = new UnifiedSinkV2Definition
-            {
-                SinkType = "kafka",
-                WriterConfig = new SinkWriterConfig
-                {
-                    ClassName = "KafkaWriter",
-                    Properties = new Dictionary<string, object>
-                    {
-                        { "topic", "processed-events" },
-                        { "bootstrapServers", "kafka:9092" },
-                        { "compressionType", "gzip" }
-                    },
-                    BatchingConfig = new BatchingConfig
-                    {
-                        MaxBatchSize = 1000,
-                        MaxBatchSizeInBytes = 5 * 1024 * 1024,
-                        MaxTimeInBufferMs = 1000,
-                        MaxInFlightRequests = 50,
-                        MaxBufferedRequests = 10000
-                    }
-                },
-                CommitterConfig = new SinkCommitterConfig
-                {
-                    Enabled = true,
-                    ClassName = "KafkaCommitter"
-                },
-                Semantics = "exactly-once",
-                Stateful = true
-            },
-            Metadata = new JobMetadata
-            {
-                JobName = "High-Performance Event Processor",
-                Version = "1.0",
-                Parallelism = 8,
-                StateBackendConfig = new StateBackendConfig
-                {
-                    Type = "rocksdb",
-                    CheckpointDir = "s3://production/checkpoints",
-                    IncrementalCheckpoints = true,
-                    PredefinedProfile = "flash_ssd_optimized",
-                    DbOptions = new Dictionary<string, object>
-                    {
-                        { "maxBackgroundJobs", 8 },
-                        { "maxOpenFiles", -1 },
-                        { "compactionStyle", "level" }
-                    },
-                    ColumnFamilyOptions = new Dictionary<string, object>
-                    {
-                        { "blockCacheSize", 512 * 1024 * 1024L }, // 512MB
-                        { "writeBufferSize", 128 * 1024 * 1024L } // 128MB
-                    }
-                },
-                // Feature 3: Smile Format for Compiled Plans
-                ExecutionPlanConfig = new ExecutionPlanConfig
-                {
- Format = "smile", - EnableCompression = true, - Properties = new Dictionary - { - { "bufferSize", 8192 } - } - }, - // Feature 4: MultiJoin Optimization Configuration - OptimizerConfig = new OptimizerConfig - { - EnableMultiJoinOptimization = true, - JoinReorderingStrategy = "bushy", - EnableJoinPredicatePushdown = true, - EnableFilterPushdown = true, - Properties = new Dictionary - { - { "maxJoinDepth", 5 } - } - } - } - }; - - // Part B: Job without performance configs (backward compatibility) - JobDefinition standardJob = new JobDefinition - { - Source = new KafkaSourceDefinition { Topic = "input" }, - Sink = new KafkaSinkDefinition { Topic = "output", BootstrapServers = "kafka:9092" }, - Metadata = new JobMetadata { Version = "1.0" } - }; - - // Act: Serialize and validate - string optimizedJson = JsonSerializer.Serialize(optimizedJob, new JsonSerializerOptions { WriteIndented = true }); - string standardJson = JsonSerializer.Serialize(standardJob, new JsonSerializerOptions { WriteIndented = true }); - - JobDefinition? optimizedDeserialized = JsonSerializer.Deserialize(optimizedJson); - JobDefinition? standardDeserialized = JsonSerializer.Deserialize(standardJson); - - // Assert: Optimized job has ALL 4 features - Assert.That(optimizedDeserialized, Is.Not.Null); - Assert.That(optimizedDeserialized!.Metadata.StateBackendConfig, Is.Not.Null); - Assert.That(optimizedDeserialized.Metadata.ExecutionPlanConfig, Is.Not.Null); - Assert.That(optimizedDeserialized.Metadata.OptimizerConfig, Is.Not.Null); - - // Feature 2: State Backend Configuration - StateBackendConfig? stateConfig = optimizedDeserialized.Metadata.StateBackendConfig; - Assert.That(stateConfig!.Type, Is.EqualTo("rocksdb")); - Assert.That(stateConfig.IncrementalCheckpoints, Is.True); - Assert.That(stateConfig.PredefinedProfile, Is.EqualTo("flash_ssd_optimized")); - - // Feature 1: Async Sink Batching - UnifiedSinkV2Definition? sink = optimizedDeserialized.Sink as UnifiedSinkV2Definition; - Assert.That(sink, Is.Not.Null); - Assert.That(sink!.WriterConfig.BatchingConfig, Is.Not.Null); - Assert.That(sink.WriterConfig.BatchingConfig!.MaxBatchSize, Is.EqualTo(1000)); - Assert.That(sink.Semantics, Is.EqualTo("exactly-once")); - - // Feature 3: Execution Plan Config (Smile Format) - ExecutionPlanConfig? planConfig = optimizedDeserialized.Metadata.ExecutionPlanConfig; - Assert.That(planConfig!.Format, Is.EqualTo("smile")); - Assert.That(planConfig.EnableCompression, Is.True); - Assert.That(planConfig.Properties, Is.Not.Null); - - // Feature 4: Optimizer Config (MultiJoin Optimization) - OptimizerConfig? 
-        OptimizerConfig? optimizerConfig = optimizedDeserialized.Metadata.OptimizerConfig;
-        Assert.That(optimizerConfig!.EnableMultiJoinOptimization, Is.True);
-        Assert.That(optimizerConfig.JoinReorderingStrategy, Is.EqualTo("bushy"));
-        Assert.That(optimizerConfig.EnableJoinPredicatePushdown, Is.True);
-        Assert.That(optimizerConfig.EnableFilterPushdown, Is.True);
-
-        // Assert: Standard job works without performance configs
-        Assert.That(standardDeserialized, Is.Not.Null);
-        Assert.That(standardDeserialized!.Metadata.StateBackendConfig, Is.Null, "Config should be optional");
-        Assert.That(standardDeserialized.Metadata.ExecutionPlanConfig, Is.Null, "Config should be optional");
-        Assert.That(standardDeserialized.Metadata.OptimizerConfig, Is.Null, "Config should be optional");
-
-        // Assert: JSON contains ALL 4 performance feature keywords
-        Assert.That(optimizedJson, Does.Contain("StateBackendConfig"));
-        Assert.That(optimizedJson, Does.Contain("BatchingConfig"));
-        Assert.That(optimizedJson, Does.Contain("ExecutionPlanConfig"));
-        Assert.That(optimizedJson, Does.Contain("OptimizerConfig"));
-        Assert.That(optimizedJson, Does.Contain("flash_ssd_optimized"));
-        Assert.That(optimizedJson, Does.Contain("smile"));
-        Assert.That(optimizedJson, Does.Contain("bushy"));
-        Assert.That(optimizedJson, Does.Contain("MaxBatchSize"));
-    }
-
-    #endregion
-
-    #region Test 4: Edge Cases and Validation
-
-    /// <summary>
-    /// Test 4: Validates edge cases and error handling including:
-    /// - Null/missing configurations (optional features)
-    /// - Empty dictionaries for DbOptions and ColumnFamilyOptions
-    /// - Different state backend types (rocksdb, hashmap, filesystem)
-    /// - Mixing configurations (state backend without batching, etc.)
-    /// </summary>
-    [Test]
-    public void Test4_EdgeCases_ValidatesOptionalConfigsAndDefaults()
-    {
-        // Part A: State backend types
-        JobDefinition rocksDbJob = new JobDefinition
-        {
-            Source = new KafkaSourceDefinition { Topic = "input" },
-            Sink = new KafkaSinkDefinition { Topic = "output" },
-            Metadata = new JobMetadata
-            {
-                StateBackendConfig = new StateBackendConfig
-                {
-                    Type = "rocksdb",
-                    CheckpointDir = "s3://bucket/checkpoints"
-                }
-            }
-        };
-
-        JobDefinition hashmapJob = new JobDefinition
-        {
-            Source = new KafkaSourceDefinition { Topic = "input" },
-            Sink = new KafkaSinkDefinition { Topic = "output" },
-            Metadata = new JobMetadata
-            {
-                StateBackendConfig = new StateBackendConfig
-                {
-                    Type = "hashmap"
-                    // No checkpoint dir needed for hashmap
-                }
-            }
-        };
-
-        JobDefinition filesystemJob = new JobDefinition
-        {
-            Source = new KafkaSourceDefinition { Topic = "input" },
-            Sink = new KafkaSinkDefinition { Topic = "output" },
-            Metadata = new JobMetadata
-            {
-                StateBackendConfig = new StateBackendConfig
-                {
-                    Type = "filesystem",
-                    CheckpointDir = "file:///tmp/checkpoints"
-                }
-            }
-        };
-
-        // Part B: Empty options dictionaries
-        JobDefinition emptyOptionsJob = new JobDefinition
-        {
-            Source = new KafkaSourceDefinition { Topic = "input" },
-            Sink = new KafkaSinkDefinition { Topic = "output" },
-            Metadata = new JobMetadata
-            {
-                StateBackendConfig = new StateBackendConfig
-                {
-                    Type = "rocksdb",
-                    CheckpointDir = "s3://bucket/checkpoints",
-                    DbOptions = new Dictionary<string, object>(), // Empty but not null
-                    ColumnFamilyOptions = new Dictionary<string, object>() // Empty but not null
-                }
-            }
-        };
-
-        // Part C: State backend without batching
-        JobDefinition stateOnlyJob = new JobDefinition
-        {
-            Source = new KafkaSourceDefinition { Topic = "input" },
-            Sink = new UnifiedSinkV2Definition
-            {
-                SinkType = "kafka",
-                WriterConfig = new SinkWriterConfig
-                {
-                    ClassName = "KafkaWriter"
-                    // No BatchingConfig
-                }
-            },
-            Metadata = new JobMetadata
-            {
-                StateBackendConfig = new StateBackendConfig
-                {
-                    Type = "rocksdb",
-                    CheckpointDir = "s3://bucket/checkpoints"
-                }
-            }
-        };
-
-        // Part D: Batching without state backend
-        JobDefinition batchingOnlyJob = new JobDefinition
-        {
-            Source = new KafkaSourceDefinition { Topic = "input" },
-            Sink = new UnifiedSinkV2Definition
-            {
-                SinkType = "kafka",
-                WriterConfig = new SinkWriterConfig
-                {
-                    ClassName = "KafkaWriter",
-                    BatchingConfig = new BatchingConfig
-                    {
-                        MaxBatchSize = 500
-                    }
-                }
-            },
-            Metadata = new JobMetadata
-            {
-                // No StateBackendConfig
-            }
-        };
-
-        // Act: Serialize and deserialize all jobs
-        JobDefinition[] jobs = [rocksDbJob, hashmapJob, filesystemJob, emptyOptionsJob, stateOnlyJob, batchingOnlyJob];
-        List<JobDefinition> deserializedJobs = new List<JobDefinition>();
-
-        foreach (JobDefinition? job in jobs)
-        {
-            string json = JsonSerializer.Serialize(job);
-            JobDefinition? deserialized = JsonSerializer.Deserialize<JobDefinition>(json);
-            Assert.That(deserialized, Is.Not.Null, $"Job {job.Metadata.JobName} should deserialize");
-            deserializedJobs.Add(deserialized!);
-        }
-
-        // Assert: Different state backend types
-        Assert.That(deserializedJobs[0].Metadata.StateBackendConfig!.Type, Is.EqualTo("rocksdb"));
-        Assert.That(deserializedJobs[1].Metadata.StateBackendConfig!.Type, Is.EqualTo("hashmap"));
-        Assert.That(deserializedJobs[2].Metadata.StateBackendConfig!.Type, Is.EqualTo("filesystem"));
-
-        // Assert: Empty dictionaries are preserved
-        Assert.That(deserializedJobs[3].Metadata.StateBackendConfig!.DbOptions, Is.Not.Null);
-        Assert.That(deserializedJobs[3].Metadata.StateBackendConfig!.DbOptions, Is.Empty);
-        Assert.That(deserializedJobs[3].Metadata.StateBackendConfig!.ColumnFamilyOptions, Is.Not.Null);
-        Assert.That(deserializedJobs[3].Metadata.StateBackendConfig!.ColumnFamilyOptions, Is.Empty);
-
-        // Assert: State backend without batching
-        Assert.That(deserializedJobs[4].Metadata.StateBackendConfig, Is.Not.Null);
-        UnifiedSinkV2Definition? stateSink = deserializedJobs[4].Sink as UnifiedSinkV2Definition;
-        Assert.That(stateSink!.WriterConfig.BatchingConfig, Is.Null);
-
-        // Assert: Batching without state backend
-        Assert.That(deserializedJobs[5].Metadata.StateBackendConfig, Is.Null);
-        UnifiedSinkV2Definition? batchSink = deserializedJobs[5].Sink as UnifiedSinkV2Definition;
-        Assert.That(batchSink!.WriterConfig.BatchingConfig, Is.Not.Null);
-        Assert.That(batchSink.WriterConfig.BatchingConfig!.MaxBatchSize, Is.EqualTo(500));
-    }
-
-    #endregion
-
-    #region Test 5: Real-World Production Scenarios
-
-    /// <summary>
-    /// Test 5: Validates realistic production scenarios including:
-    /// - High-throughput event processing pipeline
-    /// - Low-latency stream processing
-    /// - Multi-stage pipeline with complex operations
-    /// - Complete IR validation with all features enabled
-    /// </summary>
-    [Test]
-    public void Test5_ProductionScenarios_ValidatesRealWorldConfigurations()
-    {
-        // Part A: High-Throughput Scenario (maximize throughput, tolerate higher latency)
-        JobDefinition highThroughputJob = new JobDefinition
-        {
-            Source = new KafkaSourceDefinition
-            {
-                Topic = "high-volume-events",
-                BootstrapServers = "kafka:9092",
-                GroupId = "throughput-processor",
-                Properties = new Dictionary<string, string>
-                {
-                    { "fetch.min.bytes", "1048576" }, // 1MB
-                    { "fetch.max.wait.ms", "500" }
-                }
-            },
-            Operations = new List<IOperationDefinition>
-            {
-                new FilterOperationDefinition { Expression = "x != null && x.Length > 0" },
-                new MapOperationDefinition { Expression = "x.Trim().ToLower()" }
-            },
-            Sink = new UnifiedSinkV2Definition
-            {
-                SinkType = "kafka",
-                WriterConfig = new SinkWriterConfig
-                {
-                    ClassName = "KafkaWriter",
-                    Properties = new Dictionary<string, object>
-                    {
-                        { "topic", "processed-high-volume" },
-                        { "bootstrapServers", "kafka:9092" },
-                        { "compressionType", "snappy" },
-                        { "lingerMs", 100 },
-                        { "batchSize", 1_048_576 } // 1MB
-                    },
-                    BatchingConfig = new BatchingConfig
-                    {
-                        MaxBatchSize = 5000,
-                        MaxBatchSizeInBytes = 10 * 1024 * 1024, // 10MB
-                        MaxTimeInBufferMs = 2000, // 2 seconds
-                        MaxInFlightRequests = 100,
-                        MaxBufferedRequests = 50000
-                    }
-                },
-                Semantics = "at-least-once", // Favor throughput over exactly-once
-                Stateful = false
-            },
-            Metadata = new JobMetadata
-            {
-                JobName = "High-Throughput Event Processor",
-                Version = "1.0",
-                Parallelism = 16,
-                StateBackendConfig = new StateBackendConfig
-                {
-                    Type = "rocksdb",
-                    CheckpointDir = "s3://production/high-throughput/checkpoints",
-                    IncrementalCheckpoints = true,
-                    PredefinedProfile = "flash_ssd_optimized",
-                    DbOptions = new Dictionary<string, object>
-                    {
-                        { "maxBackgroundJobs", 16 },
-                        { "maxOpenFiles", -1 },
-                        { "compactionStyle", "level" }
-                    },
-                    ColumnFamilyOptions = new Dictionary<string, object>
-                    {
-                        { "blockCacheSize", 1024 * 1024 * 1024L }, // 1GB
-                        { "writeBufferSize", 256 * 1024 * 1024L } // 256MB
-                    }
-                }
-            }
-        };
-
-        // Part B: Low-Latency Scenario (minimize latency, small batches)
-        JobDefinition lowLatencyJob = new JobDefinition
-        {
-            Source = new KafkaSourceDefinition
-            {
-                Topic = "realtime-events",
-                BootstrapServers = "kafka:9092",
-                GroupId = "latency-processor"
-            },
-            Sink = new UnifiedSinkV2Definition
-            {
-                SinkType = "kafka",
-                WriterConfig = new SinkWriterConfig
-                {
-                    ClassName = "KafkaWriter",
-                    Properties = new Dictionary<string, object>
-                    {
-                        { "topic", "realtime-output" },
-                        { "bootstrapServers", "kafka:9092" },
-                        { "lingerMs", 0 } // Immediate send
-                    },
-                    BatchingConfig = new BatchingConfig
-                    {
-                        MaxBatchSize = 10,
-                        MaxBatchSizeInBytes = 10240, // 10KB
-                        MaxTimeInBufferMs = 100, // 100ms
-                        MaxInFlightRequests = 5
-                    }
-                },
-                Semantics = "exactly-once",
-                Stateful = true,
-                CommitterConfig = new SinkCommitterConfig { Enabled = true }
-            },
-            Metadata = new JobMetadata
-            {
-                JobName = "Low-Latency Stream Processor",
-                Version = "1.0",
-                Parallelism = 4,
-                StateBackendConfig = new StateBackendConfig
-                {
-                    Type = "rocksdb",
- CheckpointDir = "s3://production/low-latency/checkpoints", - IncrementalCheckpoints = true, - PredefinedProfile = "flash_ssd_optimized", - DbOptions = new Dictionary - { - { "maxBackgroundJobs", 4 }, - { "compactionStyle", "level" } - }, - ColumnFamilyOptions = new Dictionary - { - { "blockCacheSize", 128 * 1024 * 1024L } // 128MB (smaller for low-latency) - } - } - } - }; - - // Act: Serialize and validate both scenarios - string highThroughputJson = JsonSerializer.Serialize(highThroughputJob, new JsonSerializerOptions { WriteIndented = true }); - string lowLatencyJson = JsonSerializer.Serialize(lowLatencyJob, new JsonSerializerOptions { WriteIndented = true }); - - JobDefinition? highThroughputDeserialized = JsonSerializer.Deserialize(highThroughputJson); - JobDefinition? lowLatencyDeserialized = JsonSerializer.Deserialize(lowLatencyJson); - - // Assert: High-throughput configuration - Assert.That(highThroughputDeserialized, Is.Not.Null); - UnifiedSinkV2Definition? htSink = highThroughputDeserialized!.Sink as UnifiedSinkV2Definition; - Assert.That(htSink!.WriterConfig.BatchingConfig!.MaxBatchSize, Is.EqualTo(5000)); - Assert.That(htSink.WriterConfig.BatchingConfig.MaxBatchSizeInBytes, Is.EqualTo(10 * 1024 * 1024)); - Assert.That(htSink.Semantics, Is.EqualTo("at-least-once")); - Assert.That(highThroughputDeserialized.Metadata.Parallelism, Is.EqualTo(16)); - Assert.That(highThroughputDeserialized.Metadata.StateBackendConfig, Is.Not.Null); - Assert.That(highThroughputDeserialized.Metadata.StateBackendConfig!.ColumnFamilyOptions, Is.Not.Null); - Assert.That(highThroughputDeserialized.Metadata.StateBackendConfig.ColumnFamilyOptions!.ContainsKey("blockCacheSize"), Is.True); - - // Assert: Low-latency configuration - Assert.That(lowLatencyDeserialized, Is.Not.Null); - UnifiedSinkV2Definition? ltSink = lowLatencyDeserialized!.Sink as UnifiedSinkV2Definition; - Assert.That(ltSink!.WriterConfig.BatchingConfig!.MaxBatchSize, Is.EqualTo(10)); - Assert.That(ltSink.WriterConfig.BatchingConfig.MaxTimeInBufferMs, Is.EqualTo(100)); - Assert.That(ltSink.Semantics, Is.EqualTo("exactly-once")); - Assert.That(lowLatencyDeserialized.Metadata.Parallelism, Is.EqualTo(4)); - - // Assert: JSON contains production-relevant keywords - Assert.That(highThroughputJson, Does.Contain("high-throughput")); - Assert.That(highThroughputJson, Does.Contain("at-least-once")); - Assert.That(lowLatencyJson, Does.Contain("low-latency")); - Assert.That(lowLatencyJson, Does.Contain("exactly-once")); - - // Assert: Complete IR validation - Assert.That(highThroughputDeserialized.Source, Is.InstanceOf()); - Assert.That(highThroughputDeserialized.Operations, Has.Count.EqualTo(2)); - Assert.That(lowLatencyDeserialized.Sink, Is.InstanceOf()); - } - - #endregion } diff --git a/LocalTesting/LocalTesting.IntegrationTests/UnifiedSinkV2ConsolidatedTests.cs b/LocalTesting/LocalTesting.IntegrationTests/UnifiedSinkV2ConsolidatedTests.cs deleted file mode 100644 index 292bc464..00000000 --- a/LocalTesting/LocalTesting.IntegrationTests/UnifiedSinkV2ConsolidatedTests.cs +++ /dev/null @@ -1,600 +0,0 @@ -using System.Text.Json; -using Flink.JobBuilder.Models; -using FlinkDotNet.DataStream; -using NUnit.Framework; - -namespace LocalTesting.IntegrationTests; - -/// -/// Consolidated integration tests for Unified Sink API v2 (Flink 1.20+). -/// Combines IR schema and C# API tests into 5 comprehensive tests that maintain full coverage. -/// Maximum 5 tests per Flink version as per project guidelines. 
-/// -[TestFixture] -[Category("unified-sink-v2")] -public class UnifiedSinkV2ConsolidatedTests -{ - #region Test 1: Comprehensive Serialization & IR Schema - - /// - /// Test 1: Validates complete IR schema serialization including: - /// - Exactly-once semantics with committer - /// - At-least-once semantics without committer - /// - Custom sink types - /// - JSON round-trip serialization - /// - Multiple job definitions - /// - [Test] - public void Test1_ComprehensiveSerialization_ValidatesAllSemantics() - { - // Part A: Exactly-Once Kafka Sink with Committer - JobDefinition exactlyOnceJob = new() - { - Source = new KafkaSourceDefinition - { - Topic = "input-topic", - BootstrapServers = "localhost:9092", - GroupId = "test-group", - StartingOffsets = "earliest" - }, - Operations = new List - { - new MapOperationDefinition { Expression = "x => x.ToUpper()" } - }, - Sink = new UnifiedSinkV2Definition - { - SinkType = "kafka", - WriterConfig = new SinkWriterConfig - { - ClassName = "KafkaUnifiedWriter", - Properties = new Dictionary - { - { "topic", "output-topic" }, - { "bootstrapServers", "localhost:9092" } - } - }, - CommitterConfig = new SinkCommitterConfig - { - Enabled = true, - ClassName = "KafkaCommitter", - Properties = new Dictionary - { - { "transactionPrefix", "flink-" }, - { "transactionTimeout", 60000 } - } - }, - Semantics = "exactly-once", - Stateful = true, - Properties = new Dictionary - { - { "compression", "gzip" }, - { "maxInFlightRequests", "1" } - } - }, - Metadata = new JobMetadata - { - JobName = "Exactly-Once Test", - Version = "1.0", - Parallelism = 4 - } - }; - - // Part B: At-Least-Once File Sink (no committer) - JobDefinition atLeastOnceJob = new() - { - Source = new KafkaSourceDefinition { Topic = "input" }, - Sink = new UnifiedSinkV2Definition - { - SinkType = "file", - WriterConfig = new SinkWriterConfig - { - ClassName = "FileWriter", - Properties = new Dictionary - { - { "path", "/tmp/output" }, - { "format", "parquet" } - } - }, - Semantics = "at-least-once", - Stateful = false - }, - Metadata = new JobMetadata { Version = "1.0" } - }; - - // Part C: Custom Sink - JobDefinition customSinkJob = new() - { - Source = new KafkaSourceDefinition { Topic = "input" }, - Sink = new UnifiedSinkV2Definition - { - SinkType = "custom", - WriterConfig = new SinkWriterConfig - { - ClassName = "MyCustomWriter", - Properties = new Dictionary - { - { "endpoint", "https://api.example.com/ingest" }, - { "batchSize", 100 }, - { "timeout", 5000 } - } - }, - Semantics = "at-least-once", - Properties = new Dictionary - { - { "retryAttempts", "3" }, - { "backoffMs", "1000" } - } - }, - Metadata = new JobMetadata { Version = "1.0" } - }; - - // Act: Serialize and deserialize all three job types - JobDefinition[] jobs = [exactlyOnceJob, atLeastOnceJob, customSinkJob]; - List deserializedJobs = [.. jobs - .Select(job => JsonSerializer.Serialize(job, new JsonSerializerOptions { WriteIndented = true })) - .Select(json => JsonSerializer.Deserialize(json))]; - - // Assert: Exactly-Once Sink - Assert.That(deserializedJobs[0], Is.Not.Null); - UnifiedSinkV2Definition? 
-        UnifiedSinkV2Definition? exactlyOnceSink = deserializedJobs[0]!.Sink as UnifiedSinkV2Definition;
-        Assert.That(exactlyOnceSink, Is.Not.Null);
-        Assert.That(exactlyOnceSink!.Type, Is.EqualTo("unified_sink_v2"));
-        Assert.That(exactlyOnceSink.SinkType, Is.EqualTo("kafka"));
-        Assert.That(exactlyOnceSink.Semantics, Is.EqualTo("exactly-once"));
-        Assert.That(exactlyOnceSink.Stateful, Is.True);
-        Assert.That(exactlyOnceSink.WriterConfig!.ClassName, Is.EqualTo("KafkaUnifiedWriter"));
-        Assert.That(exactlyOnceSink.CommitterConfig, Is.Not.Null);
-        Assert.That(exactlyOnceSink.CommitterConfig!.Enabled, Is.True);
-        Assert.That(exactlyOnceSink.CommitterConfig.ClassName, Is.EqualTo("KafkaCommitter"));
-
-        // Assert: At-Least-Once Sink
-        UnifiedSinkV2Definition? atLeastOnceSink = deserializedJobs[1]!.Sink as UnifiedSinkV2Definition;
-        Assert.That(atLeastOnceSink, Is.Not.Null);
-        Assert.That(atLeastOnceSink!.SinkType, Is.EqualTo("file"));
-        Assert.That(atLeastOnceSink.Semantics, Is.EqualTo("at-least-once"));
-        Assert.That(atLeastOnceSink.Stateful, Is.False);
-        Assert.That(atLeastOnceSink.CommitterConfig, Is.Null);
-
-        // Assert: Custom Sink
-        UnifiedSinkV2Definition? customSink = deserializedJobs[2]!.Sink as UnifiedSinkV2Definition;
-        Assert.That(customSink, Is.Not.Null);
-        Assert.That(customSink!.SinkType, Is.EqualTo("custom"));
-        Assert.That(customSink.WriterConfig!.ClassName, Is.EqualTo("MyCustomWriter"));
-        Assert.That(customSink.WriterConfig.Properties!.ContainsKey("batchSize"), Is.True);
-        Assert.That(customSink.Properties!["retryAttempts"], Is.EqualTo("3"));
-
-        // Assert: All sinks are independent
-        Assert.That(deserializedJobs, Has.Count.EqualTo(3));
-        List<string?> sinkTypes = [.. deserializedJobs.Select(j => (j!.Sink as UnifiedSinkV2Definition)?.SinkType).Distinct()];
-        Assert.That(sinkTypes, Has.Count.EqualTo(3));
-        Assert.That(sinkTypes, Contains.Item("kafka"));
-        Assert.That(sinkTypes, Contains.Item("file"));
-        Assert.That(sinkTypes, Contains.Item("custom"));
-    }
-
-    #endregion
-
-    #region Test 2: C# API End-to-End Flow
-
-    /// <summary>
-    /// Test 2: Validates complete C# API workflow including:
-    /// - Writer creation and element writing
-    /// - Flush and prepare commit operations
-    /// - Committer creation and commit
-    /// - SinkBuilder fluent API
-    /// - Element context handling
-    /// </summary>
-    [Test]
-    public async Task Test2_CSharpApiEndToEnd_WritesAndCommitsSuccessfully()
-    {
-        // Part A: Test with direct sink implementation
-        TestSink directTestSink = new();
-        SinkWriterContext writerContext = new()
-        {
-            SubtaskId = 0,
-            NumberOfParallelSubtasks = 1,
-            AttemptNumber = 0
-        };
-        ElementContext elementContext = new()
-        {
-            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
-            Watermark = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - 1000
-        };
-
-        // Create writer and write elements
-        ISinkWriter<string, string, int> writer = await directTestSink.CreateWriterAsync(writerContext);
-        await writer.WriteAsync("element1", elementContext);
-        await writer.WriteAsync("element2", elementContext);
-        await writer.WriteAsync("element3", elementContext);
-
-        // Flush and prepare commit
-        await writer.FlushAsync(false);
-        List<string> committables = await writer.PrepareCommitAsync();
-
-        // Create committer and commit
-        ICommitter<string>? committer = directTestSink.CreateCommitter();
-        Assert.That(committer, Is.Not.Null);
-        await committer!.CommitAsync(committables);
-
-        // Assert: Direct implementation
-        Assert.That(directTestSink.WrittenElements, Has.Count.EqualTo(3));
-        Assert.That(directTestSink.CommittedElements, Has.Count.EqualTo(3));
-
-        // Part B: Test with SinkBuilder
-        List<string> builderWrittenElements = new();
-        List<string> builderCommittedElements = new();
-
-        ISink<string, string, int> builtSink = new SinkBuilder<string, string, int>()
-            .WithWriter((ctx, state, ct) => Task.FromResult<ISinkWriter<string, string, int>>(
-                new TestWriter(builderWrittenElements, state)))
-            .WithCommitter(() => new TestCommitter(builderCommittedElements))
-            .Build();
-
-        ISinkWriter<string, string, int> builderWriter = await builtSink.CreateWriterAsync(writerContext);
-        await builderWriter.WriteAsync("test1", elementContext);
-        await builderWriter.WriteAsync("test2", elementContext);
-
-        List<string> builderCommittables = await builderWriter.PrepareCommitAsync();
-        ICommitter<string>? builderCommitter = builtSink.CreateCommitter();
-        await builderCommitter!.CommitAsync(builderCommittables);
-
-        // Assert: SinkBuilder
-        Assert.That(builderWrittenElements, Has.Count.EqualTo(2));
-        Assert.That(builderCommittedElements, Has.Count.EqualTo(2));
-        Assert.That(builderWrittenElements, Contains.Item("test1"));
-        Assert.That(builderWrittenElements, Contains.Item("test2"));
-    }
-
-    #endregion
-
-    #region Test 3: State Management & Checkpointing
-
-    /// <summary>
-    /// Test 3: Validates state management including:
-    /// - State snapshot creation
-    /// - State restoration
-    /// - Parallel writer independence
-    /// - Checkpoint coordination
-    /// </summary>
-    [Test]
-    public async Task Test3_StateManagement_HandlesSnapshotsAndParallelism()
-    {
-        // Part A: State snapshot progression
-        TestSink sink = new();
-        SinkWriterContext context = new()
-        {
-            SubtaskId = 0,
-            NumberOfParallelSubtasks = 1
-        };
-
-        ISinkWriter<string, string, int> writer = await sink.CreateWriterAsync(context, restoredState: 100);
-
-        int state1 = await writer.SnapshotStateAsync(1);
-        int state2 = await writer.SnapshotStateAsync(2);
-        int state3 = await writer.SnapshotStateAsync(3);
-
-        // Assert: State increments from restored value
-        Assert.That(state1, Is.EqualTo(100));
-        Assert.That(state2, Is.EqualTo(101));
-        Assert.That(state3, Is.EqualTo(102));
-
-        // Part B: Parallel writers with independent state
-        TestSink parallelSink = new();
-
-        SinkWriterContext writer1Context = new()
-        {
-            SubtaskId = 0,
-            NumberOfParallelSubtasks = 2
-        };
-        SinkWriterContext writer2Context = new()
-        {
-            SubtaskId = 1,
-            NumberOfParallelSubtasks = 2
-        };
-
-        ISinkWriter<string, string, int> writer1 = await parallelSink.CreateWriterAsync(writer1Context, restoredState: 0);
-        ISinkWriter<string, string, int> writer2 = await parallelSink.CreateWriterAsync(writer2Context, restoredState: 500);
-
-        int writer1State = await writer1.SnapshotStateAsync(1);
-        int writer2State = await writer2.SnapshotStateAsync(1);
-
-        // Assert: Each writer maintains independent state
-        Assert.That(writer1State, Is.EqualTo(0));
-        Assert.That(writer2State, Is.EqualTo(500));
-
-        // Advance each writer independently
-        await writer1.SnapshotStateAsync(2);
-        int writer1State2 = await writer1.SnapshotStateAsync(3);
-        int writer2State2 = await writer2.SnapshotStateAsync(2);
-
-        Assert.That(writer1State2, Is.EqualTo(2)); // Advanced 3 times from 0
-        Assert.That(writer2State2, Is.EqualTo(501)); // Advanced 2 times from 500
-    }
-
-    #endregion
-
-    #region Test 4: Backward Compatibility
-
-    /// <summary>
-    /// Test 4: Validates backward compatibility including:
-    /// - Coexistence with legacy KafkaSinkDefinition
-    /// - Type discriminator handling
-    /// - Independent serialization paths
-    /// - No breaking changes to existing code
-    /// </summary>
-    [Test]
-    public void Test4_BackwardCompatibility_CoexistsWithLegacySinks()
-    {
-        // Part A: Legacy Kafka Sink
-        JobDefinition legacyJob = new()
-        {
-            Source = new KafkaSourceDefinition { Topic = "input" },
-            Sink = new KafkaSinkDefinition
-            {
-                Topic = "legacy-output",
-                BootstrapServers = "localhost:9092",
-                Serializer = "json"
-            },
-            Metadata = new JobMetadata { Version = "1.0" }
-        };
-
-        // Part B: Unified Sink v2
-        JobDefinition unifiedJob = new()
-        {
-            Source = new KafkaSourceDefinition { Topic = "input" },
-            Sink = new UnifiedSinkV2Definition
-            {
-                SinkType = "kafka",
-                WriterConfig = new SinkWriterConfig
-                {
-                    ClassName = "KafkaWriter",
-                    Properties = new Dictionary<string, object>
-                    {
-                        { "topic", "unified-output" },
-                        { "bootstrapServers", "localhost:9092" }
-                    }
-                },
-                Semantics = "exactly-once",
-                CommitterConfig = new SinkCommitterConfig { Enabled = true }
-            },
-            Metadata = new JobMetadata { Version = "1.0" }
-        };
-
-        // Act: Serialize both
-        string legacyJson = JsonSerializer.Serialize(legacyJob);
-        string unifiedJson = JsonSerializer.Serialize(unifiedJob);
-
-        JobDefinition? legacyDeserialized = JsonSerializer.Deserialize<JobDefinition>(legacyJson);
-        JobDefinition? unifiedDeserialized = JsonSerializer.Deserialize<JobDefinition>(unifiedJson);
-
-        // Assert: Both patterns work independently
-        Assert.That(legacyDeserialized!.Sink, Is.InstanceOf<KafkaSinkDefinition>());
-        Assert.That(unifiedDeserialized!.Sink, Is.InstanceOf<UnifiedSinkV2Definition>());
-
-        // Assert: Type discriminators are different
-        KafkaSinkDefinition? legacySink = legacyDeserialized.Sink as KafkaSinkDefinition;
-        UnifiedSinkV2Definition? unifiedSink = unifiedDeserialized.Sink as UnifiedSinkV2Definition;
-
-        Assert.That(legacySink?.Type, Is.EqualTo("kafka"));
-        Assert.That(unifiedSink?.Type, Is.EqualTo("unified_sink_v2"));
-
-        // Assert: Properties are preserved correctly
-        Assert.That(legacySink?.Topic, Is.EqualTo("legacy-output"));
-        Assert.That(unifiedSink?.Semantics, Is.EqualTo("exactly-once"));
-    }
-
-    #endregion
-
-    #region Test 5: Advanced Features & Integration
-
-    /// <summary>
-    /// Test 5: Validates advanced features including:
-    /// - DataStream API integration
-    /// - Committer retry logic
-    /// - Complete job validation with all components
-    /// - Complex job pipelines
-    /// </summary>
-    [Test]
-    public async Task Test5_AdvancedFeatures_IntegratesWithDataStreamAndRetries()
-    {
-        // Part A: DataStream Integration
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.GetExecutionEnvironment();
-        DataStream<string> stream = env.FromCollection(new[] { "item1", "item2", "item3" });
-
-        ISink<string, string, int> sink = new SinkBuilder<string, string, int>()
-            .WithWriter((ctx, state, ct) => Task.FromResult<ISinkWriter<string, string, int>>(
-                new TestWriter(new List<string>(), state)))
-            .Build();
-
-        DataStream<string> result = stream.AddSink(sink);
-
-        Assert.That(result, Is.Not.Null, "AddSink should return a DataStream");
-        Assert.That(result, Is.SameAs(stream), "Fluent API should return same stream");
-
-        // Part B: Committer Retry Logic
-        List<string> committedElements = new();
-        int commitAttempts = 0;
-
-        RetryableCommitter retryCommitter = new(committedElements, attempts => commitAttempts = attempts);
-
-        // First attempt (partial failure)
-        List<string> failures1 = await retryCommitter.CommitAsync(new List<string> { "item1", "item2", "item3" });
-        Assert.That(failures1, Is.Not.Empty, "First attempt should have failures");
-        Assert.That(commitAttempts, Is.EqualTo(1));
-
-        // Retry failed items
-        List<string> failures2 = await retryCommitter.CommitAsync(failures1);
items"); - Assert.That(commitAttempts, Is.EqualTo(2)); - Assert.That(committedElements, Has.Count.GreaterThan(0)); - - // Part C: Complete Job with All Components - JobDefinition? completeJob = new() - { - Source = new KafkaSourceDefinition - { - Topic = "events", - BootstrapServers = "kafka:9092", - GroupId = "processor", - StartingOffsets = "latest" - }, - Operations = new List - { - new FilterOperationDefinition { Expression = "x.Length > 0" }, - new MapOperationDefinition { Expression = "x.ToUpper()" } - }, - Sink = new UnifiedSinkV2Definition - { - SinkType = "kafka", - WriterConfig = new SinkWriterConfig - { - ClassName = "KafkaWriter", - Properties = new Dictionary - { - { "topic", "processed-events" }, - { "bootstrapServers", "kafka:9092" }, - { "compressionType", "gzip" } - } - }, - CommitterConfig = new SinkCommitterConfig - { - Enabled = true, - ClassName = "KafkaCommitter", - Properties = new Dictionary - { - { "transactionPrefix", "flink-processor-" }, - { "transactionTimeout", 60000 } - } - }, - Semantics = "exactly-once", - Stateful = true, - Properties = new Dictionary - { - { "maxInFlightRequests", "1" }, - { "acks", "all" } - } - }, - Metadata = new JobMetadata - { - JobName = "Complete Event Processor", - Version = "1.0", - Parallelism = 4 - } - }; - - string json = JsonSerializer.Serialize(completeJob, new JsonSerializerOptions { WriteIndented = true }); - - // Assert: Complete job structure - Assert.That(json, Does.Contain("unified_sink_v2")); - Assert.That(json, Does.Contain("exactly-once")); - Assert.That(json, Does.Contain("KafkaWriter")); - Assert.That(json, Does.Contain("KafkaCommitter")); - - JobDefinition? deserialized = JsonSerializer.Deserialize(json); - Assert.That(deserialized, Is.Not.Null); - Assert.That(deserialized!.Source, Is.InstanceOf()); - Assert.That(deserialized.Operations, Has.Count.EqualTo(2)); - Assert.That(deserialized.Sink, Is.InstanceOf()); - Assert.That(deserialized.Metadata.Parallelism, Is.EqualTo(4)); - } - - #endregion - - #region Helper Classes - - private class TestSink : ISink - { - private readonly List _writtenElements = new(); - private readonly List _committedElements = new(); - - public List WrittenElements => this._writtenElements; - public List CommittedElements => this._committedElements; - - // SonarQube S1006: Analyzer limitation with generic type resolution - [System.Diagnostics.CodeAnalysis.SuppressMessage("Critical Code Smell", "S1006:Method overrides should not change parameter defaults", Justification = "")] - [System.Diagnostics.CodeAnalysis.SuppressMessage("CodeQuality", "IDE0079:Remove unnecessary suppression", Justification = "")] - public Task> CreateWriterAsync( - SinkWriterContext context, - int restoredState = default, - CancellationToken cancellationToken = default) - { - return Task.FromResult>( - new TestWriter(this._writtenElements, restoredState)); - } - - public ICommitter? CreateCommitter() => new TestCommitter(this._committedElements); - - public IGlobalCommitter? 
-        public IGlobalCommitter<string, string>? CreateGlobalCommitter() => null;
-    }
-
-    private class TestWriter(List<string> elements, int initialState) : ISinkWriter<string, string, int>
-    {
-        private readonly List<string> _elements = elements;
-        private readonly List<string> _pendingCommits = new();
-        private int _state = initialState;
-
-        public Task WriteAsync(string element, ElementContext context, CancellationToken cancellationToken = default)
-        {
-            this._elements.Add(element);
-            this._pendingCommits.Add(element);
-            return Task.CompletedTask;
-        }
-
-        public Task FlushAsync(bool endOfInput, CancellationToken cancellationToken = default) => Task.CompletedTask;
-
-        public Task<List<string>> PrepareCommitAsync(CancellationToken cancellationToken = default)
-        {
-            List<string> result = [.. this._pendingCommits];
-            this._pendingCommits.Clear();
-            return Task.FromResult(result);
-        }
-
-        public Task<int> SnapshotStateAsync(long checkpointId, CancellationToken cancellationToken = default) => Task.FromResult(this._state++);
-
-        public ValueTask DisposeAsync() => ValueTask.CompletedTask;
-    }
-
-    private class TestCommitter(List<string> committedElements) : ICommitter<string>
-    {
-        private readonly List<string> _committedElements = committedElements;
-
-        public Task<List<string>> CommitAsync(List<string> committables, CancellationToken cancellationToken = default)
-        {
-            this._committedElements.AddRange(committables);
-            return Task.FromResult(new List<string>()); // No failures
-        }
-
-        public Task CloseAsync() => Task.CompletedTask;
-    }
-
-    private class RetryableCommitter(List<string> committedElements, Action<int> reportAttempt) : ICommitter<string>
-    {
-        private readonly List<string> _committedElements = committedElements;
-        private int _attempts;
-        private readonly Action<int> _reportAttempt = reportAttempt;
-
-        public Task<List<string>> CommitAsync(List<string> committables, CancellationToken cancellationToken = default)
-        {
-            this._attempts++;
-            this._reportAttempt(this._attempts);
-
-            if (this._attempts == 1)
-            {
-                // First attempt - commit half, fail half
-                int halfSize = committables.Count / 2;
-                this._committedElements.AddRange(committables.GetRange(0, halfSize));
-                return Task.FromResult(committables.GetRange(halfSize, committables.Count - halfSize));
-            }
-            else
-            {
-                // Subsequent attempts - commit all
-                this._committedElements.AddRange(committables);
-                return Task.FromResult(new List<string>());
-            }
-        }
-
-        public Task CloseAsync() => Task.CompletedTask;
-    }
-
-    #endregion
-}
diff --git a/docs/flink-20-features.md b/docs/flink-20-features.md
deleted file mode 100644
index a3689f15..00000000
--- a/docs/flink-20-features.md
+++ /dev/null
@@ -1,424 +0,0 @@
-# Apache Flink 2.0 Features in FlinkDotNet
-
-FlinkDotNet provides comprehensive support for Apache Flink 2.0 features, including the revolutionary disaggregated state management architecture and other major improvements.
-
-## Overview of Flink 2.0
-
-Apache Flink 2.0 (released March 24, 2025) represents a major evolution in real-time stream and batch processing, with significant changes for cloud-native deployments:
-
-### Key Flink 2.0 Features Implemented in FlinkDotNet
-
-1. ✅ **Disaggregated State Management** - Remote storage as primary state backend
-2. ✅ **Materialized Tables** - Already implemented in Flink 1.20
-3. ✅ **Adaptive Batch Execution** - Dynamic query optimization
-4. ✅ **Streaming Lakehouse Integration** - Apache Paimon support (Flink 1.15-1.18)
-5. ✅ **Unified Sink API v2** - Modern sink pattern (Flink 1.20)
-6. ✅ **Enhanced AdaptiveScheduler** - Improved rescaling and checkpointing coordination
-
-## Disaggregated State Management
-
-Apache Flink 2.0 introduces a groundbreaking state management architecture that uses remote/disaggregated storage as primary state storage. This architecture decouples state storage from compute resources, enabling:
-
-- **Massive Scalability**: Handle hundreds of TB of state
-- **Cloud-Native Optimization**: Ideal for Kubernetes and cloud deployments
-- **Faster Recovery**: Improved checkpoint and recovery performance
-- **Dynamic Scaling**: Easier job rescaling with reduced state transfer overhead
-- **Resource Efficiency**: Minimizes CPU and network spikes during state operations
-
-### State Backend Options
-
-FlinkDotNet supports all major state backend types introduced in Flink 2.0:
-
-#### 1. DisaggregatedStateBackend (New in Flink 2.0)
-
-The default state backend for Flink 2.0, using remote storage for state.
-
-```csharp
-using FlinkDotNet.DataStream.State;
-
-// S3-based disaggregated state (AWS)
-var env = Flink.GetExecutionEnvironment()
-    .SetStateBackend(new DisaggregatedStateBackend(
-        DisaggregatedStorageType.S3,
-        "s3://my-flink-bucket/state"
-    )
-    .EnableIncrementalCheckpointing(true)
-    .EnableStateCompression(true)
-    .SetAsyncCompactionThreads(8));
-
-// HDFS-based disaggregated state (On-premise)
-var hdfsBackend = new DisaggregatedStateBackend()
-    .SetStorageType(DisaggregatedStorageType.HDFS)
-    .SetStoragePath("hdfs://namenode:9000/flink/state")
-    .EnableIncrementalCheckpointing(true);
-
-env.SetStateBackend(hdfsBackend);
-
-// Azure Blob Storage (Azure)
-var azureBackend = new DisaggregatedStateBackend()
-    .SetStorageType(DisaggregatedStorageType.AZURE_BLOB)
-    .SetStoragePath("wasbs://container@account.blob.core.windows.net/flink-state")
-    .EnableStateCompression(true)
-    .SetAsyncCompactionThreads(4);
-
-env.SetStateBackend(azureBackend);
-
-// Google Cloud Storage (GCP)
-var gcsBackend = new DisaggregatedStateBackend()
-    .SetStorageType(DisaggregatedStorageType.GCS)
-    .SetStoragePath("gs://my-gcs-bucket/flink-state")
-    .EnableIncrementalCheckpointing(true);
-
-env.SetStateBackend(gcsBackend);
-```
-
-**Configuration Options:**
-
-- **Storage Type**: S3, HDFS, Azure Blob Storage, or Google Cloud Storage
-- **Storage Path**: Remote storage location for state data
-- **Incremental Checkpointing**: Enabled by default for efficiency
-- **State Compression**: Reduces storage costs and network bandwidth
-- **Async Compaction Threads**: Controls parallelism of state compaction (default: 4)
-
-**When to Use:**
-- Cloud-native deployments (AWS, Azure, GCP)
-- Very large state (hundreds of TB)
-- Kubernetes-based Flink clusters
-- Dynamic scaling requirements
-- High availability with fast recovery needs
-
-#### 2. EmbeddedRocksDBStateBackend (Flink 1.x+)
-
-Off-heap state storage using RocksDB, suitable for large state on local disk.
-
#### 2. EmbeddedRocksDBStateBackend (Flink 1.x+)

Off-heap state storage using RocksDB, suitable for large state kept on local disk.

```csharp
// RocksDB with default configuration
var rocksDbBackend = new EmbeddedRocksDBStateBackend()
    .EnableIncrementalCheckpointing(true);

env.SetStateBackend(rocksDbBackend);

// RocksDB optimized for SSD
var ssdBackend = new EmbeddedRocksDBStateBackend()
    .SetPredefinedOptions(RocksDBPredefinedOptions.FLASH_SSD_OPTIMIZED)
    .SetDbStoragePath("/data/flink/rocksdb")
    .EnableIncrementalCheckpointing(true);

env.SetStateBackend(ssdBackend);

// RocksDB optimized for spinning disk with high memory
var spinningDiskBackend = new EmbeddedRocksDBStateBackend()
    .SetPredefinedOptions(RocksDBPredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)
    .SetDbStoragePath("/mnt/disk/rocksdb")
    .EnableIncrementalCheckpointing(true);

env.SetStateBackend(spinningDiskBackend);
```

**When to Use:**

- On-premise deployments with local disk
- Large state that exceeds available memory
- Jobs requiring high-throughput checkpointing
- When remote storage is not available or preferred

#### 3. HashMapStateBackend (Flink 1.x+)

In-memory state storage, suitable for small state and development.

```csharp
// In-memory state backend
var hashMapBackend = new HashMapStateBackend();
env.SetStateBackend(hashMapBackend);
```

**When to Use:**

- Development and testing
- Jobs with small state (fits in memory)
- Jobs requiring very low-latency state access
- Prototyping and experimentation

### State Backend Comparison

| Feature | DisaggregatedStateBackend | EmbeddedRocksDBStateBackend | HashMapStateBackend |
|---------|---------------------------|-----------------------------|---------------------|
| **Storage Location** | Remote (S3, HDFS, Azure, GCS) | Local disk (RocksDB) | Memory (on-heap) |
| **State Size Limit** | Hundreds of TB | Limited by disk | Limited by memory |
| **Incremental Checkpointing** | ✅ Supported | ✅ Supported | ❌ Not supported |
| **Latency** | Higher (network I/O) | Medium (disk I/O) | Lowest (memory) |
| **Scalability** | Excellent | Good | Limited |
| **Cloud-Native** | ✅ Optimized | Moderate | Not recommended |
| **Recovery Speed** | Fast | Medium | Fast |
| **Resource Usage** | Minimal local resources | Disk + CPU | Memory intensive |
| **Best For** | Cloud, large state, K8s | On-premise, large state | Small state, dev/test |
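If you want to encode the table in code, a small factory helper works well. The sketch below is illustrative only: `DeploymentTarget` and `CreateStateBackendFor` are hypothetical names, and the return type assumes a common `IStateBackend` interface.

```csharp
// Hypothetical helper encoding the comparison table above.
// DeploymentTarget, CreateStateBackendFor, and IStateBackend are
// illustrative names, not confirmed FlinkDotNet APIs.
public enum DeploymentTarget { CloudKubernetes, OnPremiseLargeState, DevTest }

public static IStateBackend CreateStateBackendFor(DeploymentTarget target) => target switch
{
    // Cloud/K8s, very large state: disaggregated remote storage
    DeploymentTarget.CloudKubernetes => new DisaggregatedStateBackend(
            DisaggregatedStorageType.S3, "s3://my-flink-bucket/state")
        .EnableIncrementalCheckpointing(true)
        .EnableStateCompression(true),

    // On-premise with large state on local disk
    DeploymentTarget.OnPremiseLargeState => new EmbeddedRocksDBStateBackend()
        .EnableIncrementalCheckpointing(true),

    // Small state, development and testing
    DeploymentTarget.DevTest => new HashMapStateBackend(),

    _ => throw new ArgumentOutOfRangeException(nameof(target)),
};
```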
## Unified Batch and Stream Processing

Flink 2.0 further unifies batch and stream processing with enhanced optimizations, most notably adaptive broadcast joins and dynamic partition pruning. Both are covered with examples in the Adaptive Batch Execution section below.

## Enhanced Fault Tolerance and Recovery

### Improved Checkpoint Performance

```csharp
// Configure enhanced checkpointing for Flink 2.0
env.EnableCheckpointing(TimeSpan.FromSeconds(30));

var checkpointConfig = env.GetCheckpointConfig();
checkpointConfig
    .SetCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    .SetMinPauseBetweenCheckpoints(TimeSpan.FromSeconds(5))
    .SetCheckpointTimeout(TimeSpan.FromMinutes(10))
    .SetMaxConcurrentCheckpoints(1)
    .SetTolerableCheckpointFailureNumber(3)
    .EnableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    .EnableUnalignedCheckpoints(true); // Keeps checkpoints fast under backpressure
```

### Fast Recovery with Disaggregated State

When using DisaggregatedStateBackend, recovery is optimized:

```csharp
// State is already in remote storage, enabling fast recovery
var backend = new DisaggregatedStateBackend(
        DisaggregatedStorageType.S3,
        "s3://flink-state-bucket/checkpoints")
    .EnableIncrementalCheckpointing(true)
    .EnableStateCompression(true);

env.SetStateBackend(backend);
env.EnableCheckpointing(TimeSpan.FromMinutes(5));

// The job can recover quickly from failures without large state transfers
await env.ExecuteAsync("fault-tolerant-job");
```

## Migration from Flink 1.x to 2.0

### State Backend Migration

Migrating from legacy state backends to DisaggregatedStateBackend:

```csharp
// Legacy (Flink 1.x)
var oldBackend = new EmbeddedRocksDBStateBackend()
    .EnableIncrementalCheckpointing(true);

// Modern (Flink 2.0)
var newBackend = new DisaggregatedStateBackend(
        DisaggregatedStorageType.S3,
        "s3://my-bucket/flink-state")
    .EnableIncrementalCheckpointing(true);

// Migration steps:
// 1. Take a savepoint with the old backend
// 2. Update the state backend configuration
// 3. Restore from the savepoint with the new backend
```
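The three commented steps can be sketched end to end using the `JobClient` savepoint APIs described in the [Flink 2.1 guide](flink-21-features.md). Treat this as a sketch under assumptions: the paths and job names are illustrative.

```csharp
// Sketch: migrate a running job from RocksDB state to disaggregated state.
// Uses TriggerSavepointAsync/StopWithSavepointAsync/FromSavepoint from the
// Flink 2.1 guide; paths and job names are illustrative.

// 1. Stop the running job with a savepoint (old backend still active)
var jobClient = await env.ExecuteAsyncJob("legacy-backend-job");
var stopResult = await jobClient.StopWithSavepointAsync(
    savepointPath: "/savepoints/backend-migration",
    drain: false);

// 2. Build a new environment configured with the new backend
var migratedEnv = Flink.GetExecutionEnvironment()
    .FromSavepoint(stopResult.SavepointPath)
    .SetStateBackend(new DisaggregatedStateBackend(
            DisaggregatedStorageType.S3,
            "s3://my-bucket/flink-state")
        .EnableIncrementalCheckpointing(true));

// 3. Restore from the savepoint with the new backend
await migratedEnv.ExecuteAsync("disaggregated-backend-job");
```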
## Adaptive Batch Execution

Flink 2.0 enhances adaptive batch execution with dynamic query optimization based on runtime data.

### Adaptive Broadcast Join

Flink 2.0 automatically optimizes join strategies based on runtime data sizes:

```csharp
// Flink will automatically choose the best join strategy
var orders = env.FromKafka("orders");
var products = env.FromKafka("products");

// Adaptive broadcast join - automatically switches to broadcast
// if one side is small enough
var enrichedOrders = orders
    .Join(products)
    .Where(order => order.ProductId)
    .EqualTo(product => product.Id)
    .With((order, product) => new EnrichedOrder
    {
        OrderId = order.Id,
        ProductName = product.Name,
        Amount = order.Amount
    });
```

### Enhanced AdaptiveScheduler

The AdaptiveScheduler in Flink 2.0 synchronizes checkpointing and rescaling:

```csharp
// Enable the adaptive scheduler with automatic parallelism
var env = Flink.GetExecutionEnvironment()
    .EnableAdaptiveScheduler(true)
    .SetMaxParallelism(256);

// Flink will automatically adjust parallelism based on
// available resources and workload characteristics
env.EnableCheckpointing(TimeSpan.FromMinutes(5));

// The AdaptiveScheduler coordinates rescaling with checkpoints,
// minimizing reprocessing time
```

### Dynamic Partition Pruning

Flink 2.0 automatically prunes partitions based on filter predicates in batch queries:

```csharp
// Partition pruning happens automatically in SQL queries
var tableEnv = TableEnvironment.Create(env);

// Flink will automatically prune partitions based on the date filter
tableEnv.ExecuteSql(@"
    SELECT * FROM orders
    WHERE order_date >= '2025-01-01'
      AND order_date < '2025-02-01'
");
```

## Configuration Migration (flink-conf.yaml → config.yaml)

Flink 2.0 replaces the legacy `flink-conf.yaml` with a new `config.yaml` format. FlinkDotNet handles this automatically through its configuration APIs:

```csharp
// FlinkDotNet uses the modern configuration format internally
var env = Flink.GetExecutionEnvironment()
    .SetParallelism(8)
    .EnableCheckpointing(TimeSpan.FromMinutes(5));

// Configuration is automatically converted to the Flink 2.0 format
env.GetCheckpointConfig()
    .SetCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    .SetCheckpointTimeout(TimeSpan.FromMinutes(10));
```

**Note**: When deploying to Flink clusters, ensure your cluster configuration uses `config.yaml` instead of `flink-conf.yaml`. Flink provides a migration tool for existing configurations.
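For orientation, the fluent calls map roughly onto `config.yaml` keys. The key names in the comments below follow upstream Flink conventions and are illustrative rather than FlinkDotNet guarantees:

```csharp
// Illustrative mapping between FlinkDotNet fluent configuration and
// config.yaml keys (key names follow upstream Flink conventions).
var env = Flink.GetExecutionEnvironment()
    .SetParallelism(8)                             // parallelism.default: 8
    .SetMaxParallelism(128)                        // pipeline.max-parallelism: 128
    .EnableCheckpointing(TimeSpan.FromMinutes(5)); // execution.checkpointing.interval: 5 min

env.GetCheckpointConfig()
    .SetCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // execution.checkpointing.mode: EXACTLY_ONCE
```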
## Performance Best Practices

### State Backend Selection Guide

1. **For Cloud Deployments (AWS, Azure, GCP)**:

   ```csharp
   var backend = new DisaggregatedStateBackend(
           DisaggregatedStorageType.S3, // or AZURE_BLOB, GCS
           "s3://bucket/state")
       .EnableIncrementalCheckpointing(true)
       .EnableStateCompression(true)
       .SetAsyncCompactionThreads(8);
   ```

2. **For On-Premise with Large State**:

   ```csharp
   var backend = new EmbeddedRocksDBStateBackend()
       .SetPredefinedOptions(RocksDBPredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)
       .EnableIncrementalCheckpointing(true);
   ```

3. **For Development and Testing**:

   ```csharp
   var backend = new HashMapStateBackend();
   ```

### Optimization Tips

- **Enable Incremental Checkpointing**: Reduces checkpoint time and storage
- **Use State Compression**: Lowers storage costs and network bandwidth
- **Tune Async Compaction Threads**: Balance throughput against CPU usage
- **Configure Checkpoint Intervals**: Longer intervals reduce overhead but increase recovery time
- **Monitor State Size**: Use Flink metrics to track state growth (see the sketch below)
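The last tip can be sketched with the `JobClient` metrics API from the [Flink 2.1 guide](flink-21-features.md). The `"state"` name filter is illustrative; actual metric names depend on the job and cluster:

```csharp
// Sketch: track state growth via job metrics.
// Assumes jobClient.GetMetricsAsync() from the Flink 2.1 guide;
// the "state" name filter is illustrative, not a fixed metric name.
var metrics = await jobClient.GetMetricsAsync();
foreach (var metric in metrics)
{
    if (metric.Name.Contains("state", StringComparison.OrdinalIgnoreCase))
    {
        Console.WriteLine($"{metric.Name}: {metric.Value}");
    }
}
```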
## Breaking Changes and API Removals in Flink 2.0

Flink 2.0 removes several deprecated APIs. FlinkDotNet only implements modern APIs, so these removals don't affect FlinkDotNet users:

### Removed APIs (Not Applicable to FlinkDotNet)

- ❌ **DataSet API** - Removed in Flink 2.0 (FlinkDotNet uses the DataStream API)
- ❌ **Legacy SourceFunction/SinkFunction** - Replaced by the Unified Source/Sink APIs (FlinkDotNet implements the modern APIs)
- ❌ **Scala API** - Removed in Flink 2.0 (FlinkDotNet is C#-based)
- ❌ **Legacy TableSource/TableSink** - Replaced by DynamicTableSource/DynamicTableSink

### FlinkDotNet Compatibility

FlinkDotNet is fully compatible with Flink 2.0 because:

1. **Modern APIs Only**: FlinkDotNet implements only modern Flink APIs (DataStream API, Table API, Unified Source/Sink v2)
2. **No Legacy Dependencies**: No reliance on removed APIs such as DataSet or the Scala API
3. **Forward Compatible**: Code written for FlinkDotNet works seamlessly with Flink 2.0 clusters
4. **State Backend Support**: Full support for both established (RocksDB, HashMap) and new (Disaggregated) state backends

### Migration Notes

If you're upgrading a Flink cluster from 1.x to 2.0:

1. **Savepoints are Compatible**: Take a savepoint with Flink 1.x, restore it with Flink 2.0
2. **Update Configuration**: Migrate `flink-conf.yaml` to `config.yaml`
3. **Update State Backend**: Consider migrating to DisaggregatedStateBackend for better scalability
4. **Test Thoroughly**: Validate jobs in a staging environment before production deployment

## What's New in Flink 2.0 - Complete Summary

### Architecture & Performance

- ✅ **Disaggregated State Management** - Remote storage as the primary state backend
- ✅ **Adaptive Batch Execution** - Dynamic query optimization with broadcast joins
- ✅ **Enhanced AdaptiveScheduler** - Synchronized checkpointing and rescaling
- ✅ **Dynamic Partition Pruning** - Automatic partition pruning in batch queries
- ✅ **Native File Copy for S3** - s5cmd integration for faster recovery (infrastructure level)

### Data Processing

- ✅ **Materialized Tables** - Simplified ETL with automatic refresh (implemented in Flink 1.20)
- ✅ **Streaming Lakehouse** - Deep Apache Paimon integration (implemented in Flink 1.15-1.18)
- ✅ **Unified Sink API v2** - Modern, reliable sink pattern (implemented in Flink 1.20)

### Developer Experience

- ✅ **Unified Programming Model** - Table API/SQL for both batch and stream
- ✅ **Modern Configuration** - New `config.yaml` format
- ✅ **API Cleanup** - Removal of deprecated APIs (doesn't affect FlinkDotNet)

### Cloud-Native Optimization

- ✅ **Kubernetes Optimization** - Disaggregated state is ideal for K8s deployments
- ✅ **Multi-Cloud Support** - S3, Azure Blob, GCS, and HDFS storage backends
- ✅ **Resource Efficiency** - Minimized resource spikes during state operations

## See Also

- [Flink 2.1 Features](flink-21-features.md) - Dynamic scaling, partitioning, watermarks, and job monitoring
- [API Reference](api-reference.md) - Complete API documentation
- [Getting Started](getting-started.md) - Setup and first job
- [Performance Benchmarks](performance-benchmarks.md) - Performance comparisons

## References

- [Apache Flink 2.0 Release Notes](https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-2.0/)
- [Apache Flink 2.0 Announcement](https://flink.apache.org/2025/03/24/apache-flink-2.0.0-a-new-era-of-real-time-data-processing/)
- [Disaggregated State Management](https://flink.apache.org/2025/03/24/apache-flink-2.0.0-a-new-era-of-real-time-data-processing/#disaggregated-state-management)
diff --git a/docs/flink-21-features.md b/docs/flink-21-features.md
deleted file mode 100644
index df8ac6c5..00000000
--- a/docs/flink-21-features.md
+++ /dev/null
@@ -1,363 +0,0 @@

# Apache Flink 2.1 Features in FlinkDotNet

FlinkDotNet provides comprehensive support for Apache Flink 2.1 features through native C# APIs.

## Dynamic Scaling and Resource Management

### Adaptive Scheduler

Automatically optimizes parallelism based on workload characteristics.

```csharp
var env = Flink.GetExecutionEnvironment()
    .EnableAdaptiveScheduler()
    .SetMaxParallelism(128);
```

### Reactive Mode

Elastic scaling that adapts to available cluster resources.

```csharp
var env = Flink.GetExecutionEnvironment()
    .EnableReactiveMode()
    .SetParallelism(8); // Initial parallelism
```
### Savepoint-based Scaling

Scale jobs using savepoints to preserve state consistency.

```csharp
// Execute the job and get a JobClient
var jobClient = await env.ExecuteAsyncJob("my-job");

// Create a savepoint for scaling
var savepointResult = await jobClient.TriggerSavepointAsync("/savepoints/scaling");

// Stop with a savepoint
var stopResult = await jobClient.StopWithSavepointAsync(
    savepointPath: savepointResult.SavepointPath,
    drain: true
);

// Restart with new parallelism
var scaledEnv = Flink.GetExecutionEnvironment()
    .FromSavepoint(stopResult.SavepointPath)
    .SetParallelism(16) // New parallelism
    .SetMaxParallelism(256);

await scaledEnv.ExecuteAsyncJob("scaled-job");
```

## Advanced Partitioning Strategies

### Rebalance

Uniform distribution across all downstream operators.

```csharp
stream.Map(x => x * 2)
    .Rebalance() // Round-robin distribution
    .SetParallelism(8);
```

### Rescale

Efficient distribution to a local subset of downstream operators.

```csharp
stream.Map(x => x * 3)
    .Rescale() // Local round-robin
    .Filter(x => x > 10);
```

### Forward

Direct one-to-one forwarding.

```csharp
stream.Forward() // Same parallelism required upstream and downstream
    .Map(x => x + 1);
```
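Because Forward requires matching parallelism on both sides of the edge, it helps to set it explicitly. A short illustrative sketch (the parallelism values are arbitrary):

```csharp
// Forward requires matching parallelism on both sides of the edge;
// here both operators run at parallelism 4 (values are illustrative).
var forwarded = stream
    .Map(x => x + 1)
    .SetParallelism(4)
    .Forward()          // one-to-one: no network shuffle between subtasks
    .Filter(x => x > 0)
    .SetParallelism(4); // must match the upstream parallelism
```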
### Shuffle

Random distribution.

```csharp
stream.Shuffle() // Random partitioning
    .Map(x => x * 2);
```

### Broadcast

Send every element to all parallel instances.

```csharp
stream.Broadcast() // All instances receive all elements
    .Map(x => x + 10);
```

### Custom Partitioning

```csharp
stream.PartitionCustom(
    (key, numPartitions) => key % numPartitions, // Partitioner
    x => x.GetHashCode()                         // Key selector
);
```

## Fine-Grained Resource Management

### Slot Sharing Groups

```csharp
// Heavy operations in dedicated slots
stream1.Map(new HeavyProcessor())
    .SetParallelism(4)
    .SlotSharingGroup("heavy-processing");

// Light operations share slots
stream2.Map(new LightProcessor())
    .SetParallelism(8)
    .SlotSharingGroup("light-processing");
```

### Resource Profiles

```csharp
env.SetParallelism(8)
    .SetMaxParallelism(128)
    .EnableSlotSharing(); // Resource optimization
```

## Enhanced Checkpointing

### Checkpoint Configuration

```csharp
env.EnableCheckpointing(5000); // Every 5 seconds

env.GetCheckpointConfig()
    .SetCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    .SetMinPauseBetweenCheckpoints(1000)
    .SetCheckpointTimeout(60000)
    .SetMaxConcurrentCheckpoints(1)
    .EnableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```

### Savepoint Operations

```csharp
// Trigger a savepoint
var savepoint = await jobClient.TriggerSavepointAsync("/savepoints/manual");

// Cancel with a savepoint
var cancelResult = await jobClient.CancelWithSavepointAsync();

// Dispose of a savepoint
await jobClient.DisposeSavepointAsync("/savepoints/old");
```

## Advanced Restart Strategies

### Exponential Delay

```csharp
env.SetRestartStrategy(RestartStrategies.ExponentialDelayRestart(
    maxAttempts: 10,
    initialDelay: Time.Seconds(1),
    maxDelay: Time.Minutes(5),
    multiplier: 2.0
));
```

### Fixed Delay

```csharp
env.SetRestartStrategy(RestartStrategies.FixedDelayRestart(
    restartAttempts: 3,
    delayBetweenAttempts: Time.Seconds(10)
));
```

### Failure Rate

```csharp
env.SetRestartStrategy(RestartStrategies.FailureRateRestart(
    maxFailuresPerInterval: 5,
    failureInterval: Time.Minutes(5),
    delayBetweenAttempts: Time.Seconds(10)
));
```

## Watermark Strategies

### Bounded Out-of-Orderness

```csharp
stream.AssignTimestampsAndWatermarks(
    WatermarkStrategy
        .ForBoundedOutOfOrderness(Duration.OfSeconds(5))
        .WithTimestampAssigner(e => e.Timestamp.ToUnixTimeMilliseconds())
        .WithIdleness(Duration.OfSeconds(60)) // Handle idle sources
);
```

### Monotonous Timestamps

```csharp
stream.AssignTimestampsAndWatermarks(
    WatermarkStrategy
        .ForMonotonousTimestamps()
        .WithTimestampAssigner(e => e.Timestamp)
);
```

### Custom Watermark Generator

```csharp
public class CustomWatermarkGenerator : IWatermarkGenerator<Event>
{
    private long maxTimestamp = long.MinValue;
    private readonly long outOfOrdernessMillis = 5000;

    public void OnEvent(Event evt, long eventTimestamp, IWatermarkOutput output)
    {
        maxTimestamp = Math.Max(maxTimestamp, eventTimestamp);
    }

    public void OnPeriodicEmit(IWatermarkOutput output)
    {
        output.EmitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis));
    }
}

stream.AssignTimestampsAndWatermarks(
    WatermarkStrategy.ForGenerator(() => new CustomWatermarkGenerator())
        .WithTimestampAssigner(e => e.Timestamp)
);
```

## Job Monitoring and Control

### JobClient Operations

```csharp
// Get the job status
var status = await jobClient.GetJobStatusAsync();
Console.WriteLine($"State: {status.State}");
Console.WriteLine($"Parallelism: {status.Parallelism}/{status.MaxParallelism}");

// Get the execution plan
var plan = await jobClient.GetExecutionPlanAsync();

// Cancel the job
await jobClient.CancelAsync();

// Stop the job gracefully
await jobClient.StopAsync();
```

### Metrics Access

```csharp
// Get job metrics
var metrics = await jobClient.GetMetricsAsync();
foreach (var metric in metrics)
{
    Console.WriteLine($"{metric.Name}: {metric.Value}");
}
```
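Building on `GetJobStatusAsync`, a common pattern is to poll until the job reaches a terminal state. In this sketch the state names and polling interval are illustrative, not confirmed FlinkDotNet constants:

```csharp
// Sketch: poll the job until it reaches a terminal state.
// State names ("FINISHED", "FAILED", "CANCELED") follow upstream Flink
// conventions and are illustrative here.
var terminalStates = new[] { "FINISHED", "FAILED", "CANCELED" };
while (true)
{
    var status = await jobClient.GetJobStatusAsync();
    Console.WriteLine($"State: {status.State}");
    if (terminalStates.Contains(status.State))
    {
        break;
    }
    await Task.Delay(TimeSpan.FromSeconds(10));
}
```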
## Performance Optimizations

### Object Reuse

```csharp
env.GetConfig().EnableObjectReuse(); // Reduce GC pressure
```

### Operator Chaining

```csharp
// Disable chaining for a specific operator
stream.Map(new HeavyOperation())
    .DisableChaining();

// Start a new chain
stream.Map(new Operation1())
    .StartNewChain()
    .Map(new Operation2());
```

### Buffer Timeout

```csharp
env.SetBufferTimeout(100); // Milliseconds
```

## API Mapping: Java Flink → FlinkDotNet

| Java Flink API | FlinkDotNet C# API | Notes |
|----------------|--------------------|-------|
| `StreamExecutionEnvironment.getExecutionEnvironment()` | `Flink.GetExecutionEnvironment()` | Static method |
| `env.setParallelism(8)` | `env.SetParallelism(8)` | Fluent API |
| `stream.map(new MyMapper())` | `stream.Map(new MyMapper())` | IMapFunction |
| `stream.filter(new MyFilter())` | `stream.Filter(new MyFilter())` | IFilterFunction |
| `stream.keyBy(e -> e.key)` | `stream.KeyBy(e => e.Key)` | Lambda expressions |
| `stream.timeWindowAll(Time.hours(24))` | `stream.TimeWindowAll(Time.Hours(24))` | Time helper class |
| `env.execute("job-name")` | `await env.ExecuteAsync("job-name")` | Async by default |
| `env.executeAsync("job-name")` | `await env.ExecuteAsyncJob("job-name")` | Returns JobClient |

## Complete Example

```csharp
using FlinkDotNet.DataStream;

var env = Flink.GetExecutionEnvironment();

// Configure Flink 2.1 features
env.SetParallelism(8)
    .SetMaxParallelism(128)
    .EnableAdaptiveScheduler()  // Flink 2.1 adaptive scheduler
    .EnableReactiveMode()       // Flink 2.1 reactive mode
    .EnableCheckpointing(5000)
    .SetRestartStrategy("exponential-delay");

// Create a stream with timestamp assignment
var events = env.FromKafka("events", "kafka:9093", "my-group")
    .AssignTimestampsAndWatermarks(
        WatermarkStrategy
            .ForBoundedOutOfOrderness(Duration.OfSeconds(5))
            .WithTimestampAssigner(e => e.Timestamp)
    );

// Process with dynamic partitioning
var processed = events
    .Map(new EnrichEvent())
    .SetParallelism(4)
    .SlotSharingGroup("enrichment")
    .Rebalance() // Redistribute evenly before filtering
    .Filter(new ValidateEvent())
    .SetParallelism(8)
    .KeyBy(e => e.CustomerId)
    .TimeWindow(Time.Minutes(5))
    .Aggregate(new AggregateEvents());

// Sink with exactly-once guarantees
processed.SinkToKafka("results", "kafka:9093");

// Execute with monitoring
var jobClient = await env.ExecuteAsyncJob("flink21-example");

// Monitor execution
var status = await jobClient.GetJobStatusAsync();
Console.WriteLine($"Job running with parallelism: {status.Parallelism}");
```

---

## See Also

- [API Reference](api-reference.md) - Complete DataStream API
- [Architecture Guide](architecture-and-usecases.md) - System design
- [Performance Benchmarks](performance-benchmarks.md) - Throughput metrics
- [TODO: Missing Flink 2.1.0 Features](../TODO/README.md) - Features not yet implemented (AI/ML, advanced Table API, etc.)