in src/Epam.Kafka.PubSub/KafkaBuilderExtensions.cs [130:165]
public static SubscriptionBuilder<TSubKey, TSubValue> AddReplication<TSubKey, TSubValue, TPubKey, TPubValue, THandler>(
    this KafkaBuilder builder,
    string name,
    ServiceLifetime handlerLifetime = ServiceLifetime.Transient,
    Func<Lazy<ISchemaRegistryClient>, ISerializer<TPubKey>>? keySerializer = null,
    Func<Lazy<ISchemaRegistryClient>, ISerializer<TPubValue>>? valueSerializer = null,
    Action<ProducerPartitioner>? partitioner = null)
    where THandler : IConvertHandler<TPubKey, TPubValue, ConsumeResult<TSubKey, TSubValue>>
{
    // Adds a replication called `name` to the builder's context, registers the
    // THandler convert handler with the requested lifetime, and returns a builder
    // whose replication options carry the publication key/value types, the optional
    // serializer factories and any caller-supplied partitioner configuration.

    // Argument validation: both throw ArgumentNullException, builder first, then name.
    if (builder is null)
    {
        throw new ArgumentNullException(nameof(builder));
    }

    if (name is null)
    {
        throw new ArgumentNullException(nameof(name));
    }

    Type convertHandlerType = typeof(THandler);

    // The replication is added to the shared context before the handler is
    // registered in the service collection; this order is kept deliberately.
    builder.GetOrCreateContext().AddReplication(name);
    TryRegisterHandler(builder.Services, convertHandlerType, handlerLifetime);

    var replicationBuilder = new ReplicationBuilder<TSubKey, TSubValue, TPubKey, TPubValue>(builder, name);

    return replicationBuilder.WithOptions(options =>
    {
        // Handler and the message types produced on the publication side.
        options.Replication.ConvertHandlerType = convertHandlerType;
        options.Replication.KeyType = typeof(TPubKey);
        options.Replication.ValueType = typeof(TPubValue);

        // Serializer factories are optional; a null value is stored as-is.
        options.Replication.KeySerializer = keySerializer;
        options.Replication.ValueSerializer = valueSerializer;

        // The partitioner callback, when supplied, is applied last.
        partitioner?.Invoke(options.Replication.Partitioner);
    });
}