public static SubscriptionBuilder AddReplication()

in src/Epam.Kafka.PubSub/KafkaBuilderExtensions.cs [130:165]


    public static SubscriptionBuilder<TSubKey, TSubValue> AddReplication<TSubKey, TSubValue, TPubKey, TPubValue, THandler>(
        this KafkaBuilder builder,
        string name,
        ServiceLifetime handlerLifetime = ServiceLifetime.Transient,
        Func<Lazy<ISchemaRegistryClient>, ISerializer<TPubKey>>? keySerializer = null,
        Func<Lazy<ISchemaRegistryClient>, ISerializer<TPubValue>>? valueSerializer = null,
        Action<ProducerPartitioner>? partitioner = null)
        where THandler : IConvertHandler<TPubKey, TPubValue, ConsumeResult<TSubKey, TSubValue>>
    {
        // Reject missing required arguments up front, in declaration order,
        // so the reported parameter name always points at the first offender.
        _ = builder ?? throw new ArgumentNullException(nameof(builder));
        _ = name ?? throw new ArgumentNullException(nameof(name));

        // Record the replication name on the builder's shared context before
        // anything else is wired up for it.
        builder.GetOrCreateContext().AddReplication(name);

        // Hand the convert handler type to the registration helper together
        // with the lifetime requested by the caller (transient by default).
        TryRegisterHandler(builder.Services, typeof(THandler), handlerLifetime);

        var replication = new ReplicationBuilder<TSubKey, TSubValue, TPubKey, TPubValue>(builder, name);

        return replication.WithOptions(options =>
        {
            // Publication-side settings: handler type and the produced key/value types.
            options.Replication.ConvertHandlerType = typeof(THandler);
            options.Replication.KeyType = typeof(TPubKey);
            options.Replication.ValueType = typeof(TPubValue);

            // Serializer factories are optional; null is stored as-is when not supplied.
            options.Replication.KeySerializer = keySerializer;
            options.Replication.ValueSerializer = valueSerializer;

            // Let the caller customize the partitioner only if a callback was provided.
            if (partitioner != null)
            {
                partitioner(options.Replication.Partitioner);
            }
        });
    }