src/Epam.Kafka.PubSub/Publication/Topics/PublicationTopicWrapperExtensions.cs (49 lines of code) (raw):
// Copyright © 2024 EPAM Systems
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Epam.Kafka.PubSub.Common.Pipeline;
using Epam.Kafka.PubSub.Utils;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Epam.Kafka.PubSub.Publication.Topics;
internal static class PublicationTopicWrapperExtensions
{
public static IPublicationTopicWrapper<TKey, TValue> CreatePublicationTopicWrapper<TKey, TValue>(
this IKafkaFactory kafkaFactory,
IPublicationTopicWrapperOptions options,
PipelineMonitor monitor, ILogger? logger = null)
{
if (kafkaFactory == null) throw new ArgumentNullException(nameof(kafkaFactory));
if (monitor == null) throw new ArgumentNullException(nameof(monitor));
logger ??= NullLogger.Instance;
var registry = new Lazy<ISchemaRegistryClient>(() =>
kafkaFactory.GetOrCreateSchemaRegistryClient(options.GetCluster()));
ISerializer<TKey>? ks;
ISerializer<TValue>? vs;
try
{
ks = (ISerializer<TKey>?)options.GetKeySerializer()?.Invoke(registry);
vs = (ISerializer<TValue>?)options.GetValueSerializer()?.Invoke(registry);
}
catch (Exception exception)
{
exception.DoNotRetryPipeline();
throw;
}
ProducerConfig config = kafkaFactory.CreateProducerConfig(options.GetProducer());
config = config.Clone(monitor.NamePlaceholder);
if (config.All(x => x.Key != KafkaConfigExtensions.DotnetLoggerCategoryKey))
{
config.SetDotnetLoggerCategory(monitor.FullName);
}
bool implicitPreprocessor = ks != null || vs != null || config.TransactionalId != null;
IPublicationTopicWrapper<TKey, TValue> result = options.GetSerializationPreprocessor() ?? implicitPreprocessor
? new PublicationSerializeKeyAndValueTopicWrapper<TKey, TValue>(kafkaFactory, monitor,
config, options, logger,
ks, vs)
: new PublicationTopicWrapper<TKey, TValue>(kafkaFactory, monitor,
config, options, logger,
ks, vs);
return result;
}
}