This plugin provides the KafkaFlow NuGet packages as a PiBox plugin.
The containers you need are defined in the docker-compose.yaml file. You only need to configure the consumer(s) and producer(s) and use them accordingly.
The docker-compose.yaml will run the following containers:
zookeeper
broker
schema-registry
control-center
To run all containers:
    docker-compose up
To stop and remove the currently running containers:
    docker-compose down
Usage
Plugin configuration
    public class KafkaFlowExamplePlugin : IPluginServiceConfiguration
    {
        private readonly IConfiguration _configuration;
        private readonly ILogger? _logger;

        public KafkaFlowExamplePlugin(IConfiguration configuration, ILogger<KafkaFlowExamplePlugin>? logger)
        {
            _configuration = configuration;
            _logger = logger;
        }

        // Configure your consumers & producers
        public void ConfigureServices(IServiceCollection serviceCollection)
        {
            serviceCollection.ConfigureKafka(_configuration, _logger, kafkaFlowBuilder => kafkaFlowBuilder
                // Possible configurations:

                // No producerConfig is passed.
                // The producer is added to a Dictionary<Type, Action<IProducerConfigurationBuilder>>;
                // the key is typeof(TMessage).
                .AddTypedProducer<TMessage>("protobuf-topic")

                // The producer is added to a Dictionary<Type, Action<IProducerConfigurationBuilder>>;
                // the key is typeof(TMessage).
                .AddTypedProducer<TMessage>("protobuf-topic", producerConfig)

                // No producerConfig is passed.
                // The producer is added to a List<(string, Action<IProducerConfigurationBuilder>)>;
                // the string is typeof(TProducer).Name.
                .AddProducer<TProducer>("protobuf-topic")

                // The producer is added to a List<(string, Action<IProducerConfigurationBuilder>)>;
                // the string is typeof(TProducer).Name.
                .AddProducer<TProducer>("protobuf-topic", producerConfig)

                // The consumer is added to a List<Action<IConsumerConfigurationBuilder>>.
                .AddConsumer<TMessageHandler>("protobuf-topic", "mygroup")

                // The consumer is added to a List<Action<IConsumerConfigurationBuilder>>.
                // If processing a message fails, a dead letter message is produced on the dead letter topic.
                .AddConsumerWithDeadLetter<TMessageHandler, TMessage, TDeadLetterMessage>("protobuf-topic", "dead-letter-topic", "mygroup"));
        }
    }
For more about the IServiceCollection interface, see its .NET documentation.
    public class ProtobufMessageHandler : IMessageHandler<ProtobufLogMessage>
    {
        public Task Handle(IMessageContext context, ProtobufLogMessage message)
        {
            // Do something
            return Task.CompletedTask;
        }
    }
Consumer with dead letter message producer usage
    // DltMessageHandler implements IMessageHandler<TMessage> (used to create a message handler).
    // It also produces a dead letter message if ProcessMessageAsync throws an exception.
    public class ProtobufDltMessageHandler : DltMessageHandler<TMessage, TDeadLetterMessage>
    {
        protected override async Task ProcessMessageAsync(IMessageContext context, TMessage message)
        {
            // Do something
        }

        protected override TDeadLetterMessage HandleError(IMessageContext context, TMessage message, Error error)
        {
            // Do something and return the dead letter message to produce
        }
    }