in src/Epam.Kafka/Options/KafkaClusterOptions.cs [47:74]
/// <summary>
/// Registers an OAuth bearer token refresh handler that is built around the supplied token factory.
/// </summary>
/// <param name="createToken">
/// Factory invoked by the handler to obtain a fresh token. It receives the string argument that the
/// handler itself was called with (presumably the OAuth bearer configuration value - confirm against the caller).
/// </param>
/// <param name="throwIfAlreadySet">
/// Value stored in <c>OauthHandlerThrow</c>. NOTE(review): judging by the name, it controls whether
/// an exception is raised when a handler has already been configured elsewhere - confirm where it is read.
/// </param>
/// <returns>The same options instance, so that calls can be chained.</returns>
/// <exception cref="ArgumentNullException"><paramref name="createToken"/> is <c>null</c>.</exception>
public KafkaClusterOptions WithOAuthHandler(Func<string?, OAuthRefreshResult> createToken, bool throwIfAlreadySet = false)
{
    if (createToken is null)
    {
        throw new ArgumentNullException(nameof(createToken));
    }

    this.OauthHandlerThrow = throwIfAlreadySet;

    this.OauthHandler = (kafkaClient, handlerArgument) =>
    {
        // Any failure while creating or applying the token is reported to the client
        // via OAuthBearerSetTokenFailure instead of being allowed to escape the callback.
#pragma warning disable CA1031 // catch all exceptions and invoke error handler according to kafka client requirements
        try
        {
            OAuthRefreshResult refreshed = createToken(handlerArgument);

            // Read the token parts in the same order the client call consumes them.
            var tokenValue = refreshed.TokenValue;
            var expiresAtUnixMs = Timestamp.DateTimeToUnixTimestampMs(refreshed.ExpiresAt.UtcDateTime);
            var principalName = refreshed.PrincipalName;
            var extensions = refreshed.Extensions;

            kafkaClient.OAuthBearerSetToken(tokenValue, expiresAtUnixMs, principalName, extensions);
        }
        catch (Exception failure)
        {
            kafkaClient.OAuthBearerSetTokenFailure(failure.Message);
        }
#pragma warning restore CA1031
    };

    return this;
}