FixAntenna/NetCore/FixEngine/Session/IoThreads/MessageReader.cs (327 lines of code) (raw):

// Copyright (c) 2021 EPAM Systems // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. using System; using System.IO; using System.Threading; using Epam.FixAntenna.NetCore.Common; using Epam.FixAntenna.NetCore.Common.Logging; using Epam.FixAntenna.NetCore.Configuration; using Epam.FixAntenna.NetCore.FixEngine.Session.IoThreads.Bean; using Epam.FixAntenna.NetCore.FixEngine.Session.MessageHandler; using Epam.FixAntenna.NetCore.FixEngine.Session.MessageHandler.Global; using Epam.FixAntenna.NetCore.FixEngine.Session.Util; using Epam.FixAntenna.NetCore.FixEngine.Storage; using Epam.FixAntenna.NetCore.FixEngine.Transport; using Epam.FixAntenna.NetCore.Message; namespace Epam.FixAntenna.NetCore.FixEngine.Session.IoThreads { /// <summary> /// The <c>MessageReader</c> reads message from transport. /// </summary> /// <seealso cref="IMessageStorage"></seealso> /// <seealso cref="IFixTransport"></seealso> internal sealed class MessageReader : AffinitySupportThread, IMessageReader { private static readonly ILog Log = LogFactory.GetLog(typeof(MessageReader)); private const int Second = 1000; private readonly IExtendedFixSession _fixSession; private readonly IFixTransport _transport; private readonly ICompositeMessageHandlerListener _compositeListener; private volatile bool _gracefulShutdown; private volatile bool _shutdown; private long _messageProcessedTimestamp; private bool _statisticEnabled; private MessageStatistic _messageStatistic; private readonly FixMessage _mainMessageForParse = FixMessageFactory.NewInstanceFromPoolForEngineParse(); private readonly int _shutdownTimeout; /// <summary> /// Creates <c>MessageReader</c>. /// </summary> /// <param name="messageStorage"> the message storage </param> /// <param name="session"> the session </param> /// <param name="compositeFixSessionListener"> the session listener </param> /// <param name="transport"> the transport </param> public MessageReader(IExtendedFixSession session, IMessageStorage messageStorage, ICompositeMessageHandlerListener compositeFixSessionListener, IFixTransport transport) : base("MRThread<:" + session.Parameters.SessionId) { _fixSession = session; IncomingMessageStorage = messageStorage; _compositeListener = compositeFixSessionListener; _transport = transport; _shutdownTimeout = session.Parameters.Configuration.GetPropertyAsInt(Config.ReadingThreadShutdownTimeout); if (_shutdownTimeout < 0) { _shutdownTimeout = session.Parameters.HeartbeatInterval; } } public long Init(ConfigurationAdapter configurationAdapter) { var number = IncomingMessageStorage.Initialize(); _statisticEnabled = configurationAdapter.IsMessageStatisticEnabled; if (_statisticEnabled) { _messageStatistic = new MessageStatistic(); } Interlocked.Exchange(ref _messageProcessedTimestamp, DateTimeHelper.CurrentMilliseconds); return number; } private ISessionSequenceManager SequenceManager => ((AbstractFixSession)_fixSession).SequenceManager; /// <summary> /// Shutdown the reader. /// </summary> public override void Shutdown() { if (!WorkerThread.IsAlive) { CloseStorage(); } _shutdown = true; try { try { if (WorkerThread.IsAlive) { if (Thread.CurrentThread != WorkerThread) { WorkerThread.Join(Math.Max(Second, _shutdownTimeout * Second)); if (WorkerThread.IsAlive) { WorkerThread.Interrupt(); } } } } catch (ThreadInterruptedException e) { Log.Debug("Problem with closing of MessageReader", e); // intentionally blank } CloseStorage(); Log.Debug("Reader stopped"); } catch (Exception e) { Log.Debug("Problem with closing of MessageReader", e); } } public long MessageProcessedTimestamp { get => Interlocked.Read(ref _messageProcessedTimestamp); set => Interlocked.Exchange(ref _messageProcessedTimestamp, value); } /// <returns> true is statistic is enabled </returns> public bool IsStatisticEnabled => _statisticEnabled; /// <summary> /// Gets statistic of processed messages. /// WARNING: Before the call to ensure that the statistics are included. /// </summary> /// <value> MessageStatistic </value> /// <exception cref="InvalidOperationException"> if <c>statisticEnabled</c> is false </exception> /// <seealso cref="IsStatisticEnabled()"> </seealso> public MessageStatistic MessageStatistic { get { if (_statisticEnabled) { return _messageStatistic; } throw new InvalidOperationException("Message statistic is disabled"); } } protected override void Dispose(bool disposing) { base.Dispose(disposing); try { _mainMessageForParse.ReleaseInstance(); } catch (Exception t) { if (Log.IsDebugEnabled) { Log.Warn("Can't release instance: " + t.Message, t); } else { Log.Warn("Can't release instance: " + t.Message); } } } private FixVersionContainer GetFixVersionForMessage() { var fixVersion = _fixSession.Parameters.FixVersionContainer; if (fixVersion.FixVersion == FixVersion.Fixt11) { fixVersion = _fixSession.Parameters.AppVersionContainer; } return fixVersion; } protected override void Run() { if (Log.IsTraceEnabled) { Log.Trace("Start MRThread: " + _fixSession); } var configuration = _fixSession.Parameters.Configuration; ApplyAffinity(configuration.GetPropertyAsInt(Config.RecvCpuAffinity), configuration.GetPropertyAsInt(Config.CpuAffinity)); Thread.BeginThreadAffinity(); var buf = new MsgBuf(); buf.FixMessage = _mainMessageForParse; _mainMessageForParse.SetFixVersion(GetFixVersionForMessage()); while (!_shutdown) { try { ProcessBufferedMessages(); _transport.ReadMessage(buf); _compositeListener.OnMessage(buf); //TBD! do timing only if required MessageProcessedTimestamp = DateTimeHelper.CurrentMilliseconds; if (_statisticEnabled) { UpdateStatistic(buf); } } catch (GarbledMessageException ex) { LogGarbledMessageException(ex, buf); } catch (SkipMessageException ex) { LogSkipMessageException(ex, buf); } catch (ArgumentException ex) { LogArgumentException(ex, buf); } catch (SequenceToLowException ex) { LogSequenceToLowException(ex, buf); } catch (SystemHandlerException ex) { LogSystemHandlerException(ex); } catch (Exception ex) { if (!GracefulShutdown) { ReportErrorAndShutdown(ex); } break; } finally { _mainMessageForParse.Clear(); } } CloseStorage(); if (Log.IsTraceEnabled) { Log.Trace("Stop MRThread: " + _fixSession); } Thread.EndThreadAffinity(); } private void LogGarbledMessageException(GarbledMessageException ex, MsgBuf buf) { if (ex.IsCritical()) { // garbled message, ignore and read next one _fixSession.ErrorHandler.OnError("Garbled message detected and will be ignored: " + buf, ex); } else { _fixSession.ErrorHandler.OnWarn("Garbled message detected and will be ignored: " + buf, ex); } } private static void LogSkipMessageException(SkipMessageException ex, MsgBuf buf) { if (ex.IsLogToLoggingSystem()) { if (Log.IsDebugEnabled) { Log.Info("Skip message: " + buf + (ex.Message == null ? "" : ". Reason: " + ex.Message), ex); } else { Log.Info("Skip message: " + buf + (ex.Message == null ? "" : ". Reason: " + ex.Message)); } } } private static void LogArgumentException(ArgumentException ex, MsgBuf buf) { if (Log.IsDebugEnabled) { Log.Warn("Invalid message received: " + buf, ex); } else { Log.Warn("Invalid message received: " + buf + ". Reason: " + ex.Message); } } private void LogSequenceToLowException(SequenceToLowException ex, MsgBuf buf) { _fixSession.ErrorHandler.OnError("Invalid message received: " + buf, ex); } private static void LogSystemHandlerException(SystemHandlerException ex) { if (Log.IsDebugEnabled) { Log.Warn("System problem detected: " + ex, ex); } else { Log.Warn("System problem detected: " + ex); } } /// <summary> /// Handles messages buffered during processing filling the gap /// </summary> /// <exception cref="Exception"> </exception> private void ProcessBufferedMessages() { var sequenceResendManager = SequenceManager.SeqResendManager; if (!sequenceResendManager.IsRrRangeActive && !sequenceResendManager.IsBufferEmpty) { try { sequenceResendManager.IsMessageProcessingFromBufferStarted = true; do { var bufferedMessage = sequenceResendManager.TakeMessageFromBuffer(); if (null != bufferedMessage && !IsIgnoredBufferedMessage(bufferedMessage)) { _compositeListener.OnMessage(new MsgBuf(bufferedMessage.AsByteArray())); } } while (!sequenceResendManager.IsBufferEmpty); } finally { sequenceResendManager.IsMessageProcessingFromBufferStarted = false; } } } private bool IsIgnoredBufferedMessage(FixMessage bufferedMessage) { var expectedSeqNum = SequenceManager.GetExpectedIncomingSeqNumber(); var msgSeqNum = bufferedMessage.MsgSeqNumber; return msgSeqNum < expectedSeqNum; } private void CloseStorage() { try { IncomingMessageStorage.Dispose(); } catch (IOException ex) { if (Log.IsDebugEnabled) { Log.Warn("Incoming log file cannot be closed", ex); } else { Log.Warn("Incoming log file cannot be closed. " + ex.Message); } } } private void UpdateStatistic(MsgBuf buf) { _messageStatistic.AddMessagesProcessed(); _messageStatistic.AddBytesProcessed(buf.Length); } private void ReportErrorAndShutdown(Exception e) { var error = "Abrupt session " + _fixSession + " termination."; Log.Warn(error); try { _transport.Close(); } catch (Exception ex) { Log.Error("Error to close transport", ex); } _fixSession.ErrorHandler.OnError(error, e); _fixSession.Shutdown(DisconnectReason.BrokenConnection, false); } /// <summary> /// Gets incoming message storage. /// </summary> public IMessageStorage IncomingMessageStorage { get; } /// <summary> /// Sets graceful shutdown flag. /// </summary> /// <value> the graceful shutdown flag </value> public bool GracefulShutdown { get => _gracefulShutdown; set => _gracefulShutdown = value; } } }