-
Notifications
You must be signed in to change notification settings - Fork 0
Kafka: Configuration
To add platform Kafka to DI container, you should call AddPlatformKafka extension method on an IServiceCollection instance. This method accepts a delegate, that have to perform some configuration actions on a builder. This builder allows you to specify configuration, add consumers and producers.
collection.AddPlatformKafka(builder => builder
.ConfigureOptions(configuration.GetSection("Presentation:Kafka"))){
"Host": string,
"SecurityProtocol": string enum,
"SslCaPem": string,
"SaslMechanism": string enum,
"SaslUsername": string,
"SaslPassword": string
}- Host
Bootstrap servers of Kafka cluster. - SecurityProtocol
Enum that specifies a security protocol used for connection to Kafka cluster. Allowed values:Plaintext(default),Ssl,SaslPlaintext,SaslSsl). This value determines whether some other configuration values are required or not. - SslCaPem
String representation of CA certificate. Required if SecurityProtocol is set toSslorSaslSsl. - SaslMechanism
Enum that specifies Sasl mechanism used for connection to Kafka cluster. Allowed values:Gssapi,Plain,ScramSha256,ScramSha512,OAuthBearer. Used if SecurityProtocol is set toSaslPlaintextorSaslSsl, but not required (default value =null). - SaslUsername
Username used to authorise when connecting to Kafka cluster. Required if SecurityProtocol is set toSaslPlaintextorSaslSsl. - SaslPassword
Password used to authorise when connecting to Kafka cluster. Required if SecurityProtocol is set toSaslPlaintextorSaslSsl.
- Plaintext
No additional security configuration needed - Ssl
RequiresSslCaPem - SaslPlaintext
RequiresSaslUsernameandSaslPassword - SaslSsl
RequiresSaslUsername,SaslPasswordandSslCaPem
{
"Presentation": {
"Kafka": {
"Host": "localhost:8001",
"SecurityProtocol": "SaslSsl",
"SslCaPem": "...",
"SaslMechanism": "ScramSha512",
"SaslUsername": "username",
"SaslPassword": "password"
}
}
}To register a consumer, you have to use same builder, provided in AddPlatformKafka extension method.
Call method AddConsumer in builder's call chain, after ConfigureOptions (or other AddConsumer, AddProducer) call. It accepts a delegate, used to configure consumer's builder.
You must configure message's key and value type, configuration, key and value deserialisers and message handler. All these methods can only be called in a strict order, which is shown in a snipped below. Method HandleWith<> accepts a class, implementing IKafkaConsumerHandler<,> as it's type parameter.
string group = Assembly.GetExecutingAssembly().GetName().Name ?? string.Empty;
collection.AddPlatformKafka(builder => builder
.ConfigureOptions(...)
.AddConsumer(b => b
.WithKey<MessageKey>()
.WithValue<MessageValue>()
.WithConfiguration(configuration.GetSection("Presentation:Kafka:Consumers:MessageName"), c => c.WithGroup(group))
.DeserializeKeyWithNewtonsoft()
.DeserializeValueWithNewtonsoft()
.HandleWith<MessageHandler>()));You can configure custom deserialiser types (that have to implement IDeserializer<> interface) by using .DeserializeKeyWith<> and .DeserializeKeyWith<T>(IDeserializer<T>) (same for values). Library provides an extensions for using protobuf and Newtonsoft.Json deserialisers out of the box (.DeserializeKeyWithProto, .DeserializeKeyWithNewtonsoft)
WithConfiguration method can accept a delegate as a second parameter for additional configuration, it allows to configure consumer group and instance id for consumer via WithGroup and WithInstanceId methods respectively.
{
"IsDisabled": bool,
"Topic": string,
"Group": string,
"InstanceId": string,
"ParallelismDegree": int,
"BufferSize": int,
"BufferWaitLimit": string timespan,
"ReadLatest": bool
}- IsDisabled
Specifies whether consumer is disabled. As consumer options are used asIOptionsMonitor<>, it will react in real time to this options being changed. - Topic
Name of the topic to consume from. Required. Changing this option during application execution will lead to undefined behaviour. - Group
Consumer group name used when connecting to Kafka cluster. Not required, default is empty string. - InstanceId
Id of instance in specified consumer group. Not required, default is empty string. - ParallelismDegree
Number of threads that will handle consumed messages. Not required, default is1, must be >= 1. - BufferSize
Consumer handler receives messages in batches, this parameter is used to configure maximum batch size. Not required, default is1, must be >= 1. - BufferWaitLimit
To avoid consumer starvation when batching, you can configure maximum time waiting for a single batch accumulation. If the wait time for a singe batch exceeds this threshold, the batch will be yielded with items that were already collected. Not required when BufferSize =1, otherwise must be >TimeSpan.Zero. - ReadLatest
Configures an offset selection strategy for consumer's first subscription to the topic. Whentruethe offset would default to the latest message in the topic, whenfalsethe offset will be configured to the first message in the topic. Not required, default =false.
{
"Presentation": {
"Kafka": {
"Consumers": {
"MessageName": {
"IsDisabled": false,
"Topic": "my_topic",
"ParallelismDegree": 2,
"BufferSize": 100,
"BufferWaitLimit": "00:00:01.500",
"ReadLatest": false
},
},
}
}
}In this example
BufferWaitLimitis set to 1 second and 500 milliseconds.
Use have to configure Itmo.Dev.Platform.MessagePersistence prior to configuring Kafka inbox.
To use inbox with Kafka consumer, you have to end your configuration with HandleInboxWith<> call, instead of HandleWith<> and pass a type implementing IKafkaInboxHandler<,> to it's generic parameter.
collection.AddPlatformKafka(builder => builder
.ConfigureOptions(...)
.AddConsumer(b => b
.WithKey<MessageKey>()
.WithValue<MessageValue>()
.WithConfiguration(...)
.DeserializeKeyWithNewtonsoft()
.DeserializeValueWithNewtonsoft()
.HandleInboxWith<InboxMessageHandler>()));To configure inbox, you must add an Inbox subsection in configuration you pass to the consumer builder.
{
"BatchSize": int,
"PollingDelay": string timespan,
"DefaultHandleResult": string enum
}- BatchSize
Maximum number of messages that inbox processor will poll from database. Required, must be >=1. - PollingDelay
Delay between every polls that inbox processor will execute. Required, must be >TimeSpan.Zero. - DefaultHandleResult
Result of handling inbox message when no result was explicitly set. Not required, default =Success.
{
"Presentation": {
"Kafka": {
"Consumers": {
"MessageName": {
...
"Inbox": {
"BatchSize": 100,
"PollingDelay": "00:00:00.500",
"DefaultHandleResult": "Ignored"
}
},
},
}
}
}In this example
PollingDelayis set to 500 milliseconds.
To register a producer, you have to use same builder, provided in AddPlatformKafka extension method.
Call method AddProducer in builder's call chain, after ConfigureOptions (or other AddProducer, AddConsumer) call. It accepts a delegate, used to configure producer's builder.
collection.AddPlatformKafka(builder => builder
.ConfigureOptions(...)
.AddProducer(b => b
.WithKey<MessageKey>()
.WithValue<MessageValue>()
.WithConfiguration(configuration.GetSection("Presentation:Kafka:Producers:MessageName"))
.SerializeKeyWithNewtonsoft()
.SerializeValueWithNewtonsoft()));You can configure custom deserialiser types (that have to implement IDeserializer<> interface) by using .DeserializeKeyWith<> and .DeserializeKeyWith<T>(IDeserializer<T>) (same for values). Library provides an extensions for using protobuf and Newtonsoft.Json deserialisers out of the box (.DeserializeKeyWithProto, .DeserializeKeyWithNewtonsoft)
{
"Topic": string,
"MessageMaxBytes": int
}- Topic
Name of the topic to produce to. Required. Changing this option during application execution will lead to undefined behaviour. - MessageMaxBytes
Max message size in bytes that producer can produce to topic. Not required, default =1_000_000(must be >=1)
{
"Presentation": {
"Kafka": {
"Producers": {
"MessageName": {
"Topic": "my_topic",
"MessageMaxBytes": 2000000
},
},
}
}
}Use have to configure Itmo.Dev.Platform.MessagePersistence prior to configuring Kafka inbox.
To use outbox with Kafka producer, you have to end your configuration with WithOutbox method call.
collection.AddPlatformKafka(builder => builder
.ConfigureOptions(...)
.AddProducer(b => b
.WithKey<MessageKey>()
.WithValue<MessageValue>()
.WithConfiguration(configuration.GetSection("Presentation:Kafka:Producers:MessageName"))
.SerializeKeyWithNewtonsoft()
.SerializeValueWithNewtonsoft())
.WithOutbox());To configure outbox, you must add an Outbox subsection in configuration you pass to the producer builder.
{
"BatchSize": int,
"PollingDelay": string timespan,
"DefaultHandleResult": string enum,
"Strategy": string enum
}- BatchSize
Maximum number of messages that inbox processor will poll from database. Required, must be >=1. - PollingDelay
Delay between every polls that inbox processor will execute. Required, must be >TimeSpan.Zero. - DefaultHandleResult
Result of handling inbox message when no result was explicitly set. Not required, default =Success. - Strategy
Strategy of how outbox will be used.Always– messages will always be placed into outbox before producing.Fallback– messages will be placed into outbox only if direct produce to Kafka topic fails. Not required, default =Always.
{
"Presentation": {
"Kafka": {
"Producers": {
"MessageName": {
...
"Outbox": {
"BatchSize": 100,
"PollingDelay": "00:00:00.500",
"DefaultHandleResult": "Ignored",
"Strategy": "Fallback"
}
},
},
}
}
}In this example
PollingDelayis set to 500 milliseconds.