Skip to content
232 changes: 0 additions & 232 deletions FlinkDotNet/Flink.JobBuilder.Tests/Tests/JobDefinitionModelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -527,236 +527,4 @@ public void WindowOperationDefinition_AllTimeUnits_Supported()
}

#endregion

#region Unified Sink API v2 Tests

[Test]
public void UnifiedSinkV2Definition_DefaultConstructor_InitializesProperties()
{
var sink = new UnifiedSinkV2Definition();

Assert.That(sink.Type, Is.EqualTo("unified_sink_v2"));
Assert.That(sink.SinkType, Is.EqualTo(string.Empty));
Assert.That(sink.WriterConfig, Is.Not.Null);
Assert.That(sink.CommitterConfig, Is.Null);
Assert.That(sink.Semantics, Is.EqualTo("at-least-once"));
Assert.That(sink.Stateful, Is.False);
Assert.That(sink.Properties, Is.Not.Null);
Assert.That(sink.Properties, Is.Empty);
}

[Test]
public void UnifiedSinkV2Definition_WithKafkaSink_StoresConfiguration()
{
var writerConfig = new SinkWriterConfig
{
ClassName = "KafkaWriter",
Properties = new Dictionary<string, object>
{
{ "topic", "output-topic" },
{ "bootstrapServers", "localhost:9092" }
}
};

var sink = new UnifiedSinkV2Definition
{
SinkType = "kafka",
WriterConfig = writerConfig,
Semantics = "at-least-once",
Stateful = false
};

Assert.That(sink.SinkType, Is.EqualTo("kafka"));
Assert.That(sink.WriterConfig.ClassName, Is.EqualTo("KafkaWriter"));
Assert.That(sink.WriterConfig.Properties["topic"], Is.EqualTo("output-topic"));
Assert.That(sink.WriterConfig.Properties["bootstrapServers"], Is.EqualTo("localhost:9092"));
Assert.That(sink.Semantics, Is.EqualTo("at-least-once"));
}

[Test]
public void UnifiedSinkV2Definition_WithExactlyOnceSemantics_RequiresCommitter()
{
var committerConfig = new SinkCommitterConfig
{
Enabled = true,
ClassName = "KafkaCommitter",
Properties = new Dictionary<string, object>
{
{ "transactionPrefix", "flink-kafka-" }
}
};

var sink = new UnifiedSinkV2Definition
{
SinkType = "kafka",
WriterConfig = new SinkWriterConfig(),
CommitterConfig = committerConfig,
Semantics = "exactly-once",
Stateful = true
};

Assert.That(sink.Semantics, Is.EqualTo("exactly-once"));
Assert.That(sink.CommitterConfig, Is.Not.Null);
Assert.That(sink.CommitterConfig.Enabled, Is.True);
Assert.That(sink.CommitterConfig.ClassName, Is.EqualTo("KafkaCommitter"));
Assert.That(sink.Stateful, Is.True);
}

[Test]
public void UnifiedSinkV2Definition_ImplementsISinkDefinition()
{
var sink = new UnifiedSinkV2Definition();

Assert.That(sink, Is.InstanceOf<ISinkDefinition>());
Assert.That(sink.Type, Is.EqualTo("unified_sink_v2"));
}

[Test]
public void UnifiedSinkV2Definition_WithCustomProperties_StoresAll()
{
var properties = new Dictionary<string, string>
{
{ "retry.max", "3" },
{ "timeout.ms", "5000" },
{ "compression", "gzip" }
};

var sink = new UnifiedSinkV2Definition
{
SinkType = "custom",
Properties = properties
};

Assert.That(sink.Properties, Has.Count.EqualTo(3));
Assert.That(sink.Properties["retry.max"], Is.EqualTo("3"));
Assert.That(sink.Properties["timeout.ms"], Is.EqualTo("5000"));
Assert.That(sink.Properties["compression"], Is.EqualTo("gzip"));
}

[Test]
public void SinkWriterConfig_DefaultConstructor_InitializesEmpty()
{
var config = new SinkWriterConfig();

Assert.That(config.ClassName, Is.EqualTo(string.Empty));
Assert.That(config.Properties, Is.Not.Null);
Assert.That(config.Properties, Is.Empty);
}

[Test]
public void SinkWriterConfig_WithProperties_StoresAll()
{
var config = new SinkWriterConfig
{
ClassName = "MyWriter",
Properties = new Dictionary<string, object>
{
{ "batchSize", 100 },
{ "flushInterval", 5000 },
{ "bufferSize", 1024 }
}
};

Assert.That(config.ClassName, Is.EqualTo("MyWriter"));
Assert.That(config.Properties, Has.Count.EqualTo(3));
Assert.That(config.Properties["batchSize"], Is.EqualTo(100));
Assert.That(config.Properties["flushInterval"], Is.EqualTo(5000));
}

[Test]
public void SinkCommitterConfig_DefaultConstructor_InitializesEmpty()
{
var config = new SinkCommitterConfig();

Assert.That(config.Enabled, Is.False);
Assert.That(config.ClassName, Is.Null);
Assert.That(config.Properties, Is.Not.Null);
Assert.That(config.Properties, Is.Empty);
}

[Test]
public void SinkCommitterConfig_WithProperties_StoresAll()
{
var config = new SinkCommitterConfig
{
Enabled = true,
ClassName = "MyCommitter",
Properties = new Dictionary<string, object>
{
{ "checkpointMode", "exactly-once" },
{ "commitTimeout", 60000 }
}
};

Assert.That(config.Enabled, Is.True);
Assert.That(config.ClassName, Is.EqualTo("MyCommitter"));
Assert.That(config.Properties, Has.Count.EqualTo(2));
Assert.That(config.Properties["checkpointMode"], Is.EqualTo("exactly-once"));
}

[Test]
public void JobDefinition_WithUnifiedSinkV2_CanBeAssigned()
{
var sink = new UnifiedSinkV2Definition
{
SinkType = "kafka",
WriterConfig = new SinkWriterConfig { ClassName = "KafkaWriter" },
Semantics = "exactly-once"
};

var jobDef = new JobDefinition
{
Source = new KafkaSourceDefinition { Topic = "input" },
Sink = sink
};

Assert.That(jobDef.Sink, Is.InstanceOf<UnifiedSinkV2Definition>());
var unifiedSink = jobDef.Sink as UnifiedSinkV2Definition;
Assert.That(unifiedSink, Is.Not.Null);
Assert.That(unifiedSink.SinkType, Is.EqualTo("kafka"));
Assert.That(unifiedSink.Semantics, Is.EqualTo("exactly-once"));
}

[Test]
public void UnifiedSinkV2Definition_Serialization_RoundTrip()
{
var original = new UnifiedSinkV2Definition
{
SinkType = "kafka",
WriterConfig = new SinkWriterConfig
{
ClassName = "KafkaWriter",
Properties = new Dictionary<string, object>
{
{ "topic", "test" },
{ "servers", "localhost:9092" }
}
},
CommitterConfig = new SinkCommitterConfig
{
Enabled = true,
ClassName = "KafkaCommitter"
},
Semantics = "exactly-once",
Stateful = true,
Properties = new Dictionary<string, string> { { "key", "value" } }
};

var json = System.Text.Json.JsonSerializer.Serialize<ISinkDefinition>(original);
var deserialized = System.Text.Json.JsonSerializer.Deserialize<ISinkDefinition>(json);

Assert.That(deserialized, Is.Not.Null);
Assert.That(deserialized, Is.InstanceOf<UnifiedSinkV2Definition>());

var sink = deserialized as UnifiedSinkV2Definition;
Assert.That(sink, Is.Not.Null);
Assert.That(sink.SinkType, Is.EqualTo("kafka"));
Assert.That(sink.Semantics, Is.EqualTo("exactly-once"));
Assert.That(sink.Stateful, Is.True);
Assert.That(sink.WriterConfig.ClassName, Is.EqualTo("KafkaWriter"));
Assert.That(sink.CommitterConfig, Is.Not.Null);
Assert.That(sink.CommitterConfig.Enabled, Is.True);
}

#endregion
}
Loading
Loading