FixAntenna/NetCore/FixEngine/Session/IoThreads/NoQueueMessagePumper.cs (385 lines of code) (raw):

// Copyright (c) 2021 EPAM Systems // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. using System; using System.IO; using System.Threading; using Epam.FixAntenna.NetCore.Common; using Epam.FixAntenna.NetCore.Common.Logging; using Epam.FixAntenna.NetCore.Common.Utils; using Epam.FixAntenna.NetCore.FixEngine.Session.IoThreads.Bean; using Epam.FixAntenna.NetCore.FixEngine.Session.Util; using Epam.FixAntenna.NetCore.FixEngine.Storage; using Epam.FixAntenna.NetCore.FixEngine.Storage.Queue; using Epam.FixAntenna.NetCore.FixEngine.Transport; using Epam.FixAntenna.NetCore.Helpers; using Epam.FixAntenna.NetCore.Message; namespace Epam.FixAntenna.NetCore.FixEngine.Session.IoThreads { internal class NoQueueMessagePumper : IMessagePumper { private static readonly ILog Log = LogFactory.GetLog(typeof(NoQueueMessagePumper)); private IExtendedFixSession _fixSession; private IFixMessageFactory _fixMessageFactory; private IMessageStorage _outgoingLog; private IFixTransport _transport; private SessionParameters _sessionParameters; private SerializationContext _context; private ISessionSequenceManager _sequenceManager; private ByteBuffer _messageBuffer; private readonly object _lock = new object(); private readonly IQueue<FixMessageWithType> _queue; private bool _statisticEnabled; private MessageStatistic _messageStatistic; private long _messageProcessedTimestamp; private volatile bool _started; public Thread WorkerThread => Thread.CurrentThread; public NoQueueMessagePumper(IExtendedFixSession extendedFixSession, IQueue<FixMessageWithType> queue, IMessageStorage @out, IFixMessageFactory messageFactory, IFixTransport transport, ISessionSequenceManager sequenceManager) { _fixSession = extendedFixSession; _queue = queue; _sessionParameters = extendedFixSession.Parameters; _messageBuffer = new ByteBuffer(); _outgoingLog = @out; _fixMessageFactory = messageFactory; _transport = transport; _sequenceManager = sequenceManager; } public long Init() { var configurationAdapter = new ConfigurationAdapter(_sessionParameters.Configuration); _statisticEnabled = configurationAdapter.IsMessageStatisticEnabled; if (_statisticEnabled) { _messageStatistic = new MessageStatistic(); } _context = new SerializationContext(_fixMessageFactory); return _outgoingLog.Initialize(); } public bool GracefulShutdown { get; set; } public void RejectQueueMessages() { Log.Debug("Reject queue messages"); try { Lock(); while (_queue.TotalSize > 0) { try { RejectMessage(_queue.Poll()); } catch (Exception e) { Log.Error("Failed to reject message in queue", e); } finally { _queue.Commit(); } } } finally { Unlock(); } } public void RejectFirstQueueMessage() { Log.Debug("Reject first queue message"); try { Lock(); if (_queue.TotalSize > 0) { try { RejectMessage(_queue.Poll()); } catch (Exception e) { Log.Error("Failed to reject message in queue", e); } finally { _queue.Commit(); } } } finally { Unlock(); } } private void RejectMessage(FixMessageWithType messageWithType) { if (messageWithType.FixMessage != null && messageWithType.IsApplicationLevelMessage()) { // process only application level messages var message = messageWithType.PrepareMsgForReject(); if (Log.IsWarnEnabled) { Log.Warn("Reject message: " + message); } _fixSession.RejectMessageListener.OnRejectMessage(message); } else { if (Log.IsDebugEnabled) { Log.Debug("Skip session message: " + messageWithType.ToString()); } } } public bool IsStatisticEnabled => _statisticEnabled; public MessageStatistic Statistic => _messageStatistic; public long MessageProcessedTimestamp => Interlocked.Read(ref _messageProcessedTimestamp); public bool SendOutOfTurn(string msgType, FixMessage content) { try { Lock(); return SerializeAndSendOrPutInQueueIfNotStarted(msgType, content); } finally { Unlock(); } } public void Start() { try { Lock(); while (!_queue.IsEmpty) { var fieldListWithType = _queue.Poll(); SerializeAndSend(fieldListWithType.MessageType, fieldListWithType.FixMessage, null, false); _queue.Commit(); } _started = true; UpdateMessageProcessedTimestamp(); } finally { Unlock(); } } public void Shutdown() { try { Lock(); _started = false; CloseOutgoingLog(); Log.Debug("Pumper stopped"); } finally { Unlock(); } } private void CloseOutgoingLog() { try { _outgoingLog.Dispose(); } catch (IOException e) { if (Log.IsDebugEnabled) { Log.Warn("Outgoing Log file cannot be closed", e); } else { Log.Warn("Outgoing Log file cannot be closed. " + e.Message); } } } public int Send(string type, FixMessage content, FixSessionSendingType optionMask) { try { Lock(); SendOrWait(type, content, null); return 0; } finally { Unlock(); } } public int Send(FixMessage content, ChangesType? allowedChangesType) { try { Lock(); SendOrWait(null, content, allowedChangesType); return 0; } finally { Unlock(); } } public int Send(FixMessage content, ChangesType? allowedChangesType, FixSessionSendingType optionMask) { return Send(content, allowedChangesType); } public int Send(string type, FixMessage message) { Send(type, message, FixSessionSendingType.DefaultSendingOption); return 0; } public void SendMessages(int messageCount) { throw new InvalidOperationException("Should not be used in the context"); } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { } private bool SendOrWait(string msgType, FixMessage content, ChangesType? changesType) { try { while (!_queue.IsAllEmpty) { Monitor.Wait(_lock); } while (_started && _queue.OutOfTurnOnlyMode) { Monitor.Wait(_lock, 1); } } catch (ThreadInterruptedException) { Log.Warn("Thread was interrupted while waiting for the end of initialization."); } return SerializeAndSend(msgType, content, changesType, true); } private bool SerializeAndSendOrPutInQueueIfNotStarted(string msgType, FixMessage content) { if (_started) { return SerializeAndSend(msgType, content, null, false); } return PutInQueue(msgType, content); } private bool SerializeAndSend(string msgType, FixMessage content, ChangesType? changesType, bool checkState) { if (checkState) { if (!_started) { throw new InvalidOperationException("Pumper is not started, it cannot send any messages in this state."); } if (((AbstractFixSession)_fixSession).AlreadySendingLogout) { throw new InvalidOperationException("Session is shutting down, cannot send any messages in this state."); } } try { var offset = _messageBuffer.Offset; if (changesType != null) { _fixMessageFactory.Serialize(content, changesType, _messageBuffer, _context); } else { _fixMessageFactory.Serialize(null, msgType, content, _messageBuffer, _context); } var len = _messageBuffer.Offset - offset; long written = _transport.Write(_messageBuffer, offset, len); if (written != len) { throw new IOException("Transport sent only part of message (" + written + " vs " + len + ")"); } if (msgType != null || changesType != null) { try { _sequenceManager.IncrementOutSeqNum(); } catch (IOException e) { Log.Error("Failed to increment sequence", e); return false; } } UpdateMessageProcessedTimestamp(); _outgoingLog.AppendMessage(_messageBuffer.GetByteArray(), offset, len); try { _fixSession.ExtendedFixSessionListener.OnMessageSent(_messageBuffer.GetByteArray(), offset, len); } catch (Exception e) { if (Log.IsDebugEnabled) { Log.Warn("ExtendedFIXSessionListener::OnMessageSent thrown error. Cause:" + e.Message, e); } else { Log.Warn("ExtendedFIXSessionListener::OnMessageSent thrown error. Cause:" + e.Message); } } } catch (IOException e) { Log.Error("Failed to send message: " + e.Message, e); ReportErrorAndShutdown(e); throw new MessageNotSentException("Message wasn't sent due to error", e); } finally { _messageBuffer.ResetBuffer(); } return true; } private void ReportErrorAndShutdown(Exception ex) { var error = "IOError in message pumper. Some messages have not been sent: " + (_queue.TotalSize > 0 ? _queue.TotalSize : 1); _fixSession.ErrorHandler.OnError(error, ex); if (Log.IsDebugEnabled) { Log.Warn(error, ex); } else { Log.Warn(error + ". " + ex.Message); } _fixSession.Shutdown(DisconnectReason.BrokenConnection, false); } private bool PutInQueue(string msgType, FixMessage content) { var type = string.IsNullOrEmpty(msgType) ? content.MsgType : msgType.AsByteArray(); if (type == null || !RawFixUtil.IsSessionLevelType(type)) { throw new InvalidOperationException("Application message not allowed while connection is not initialized"); } return _queue.AddOutOfTurn(FixMessageWithTypePoolFactory.GetFixMessageWithTypeFromPool(content, msgType)); } private void UpdateMessageProcessedTimestamp() { Interlocked.Exchange(ref _messageProcessedTimestamp, DateTimeHelper.CurrentMilliseconds); } public virtual void Lock() { Monitor.Enter(_lock); } public virtual void Unlock() { Monitor.Pulse(_lock); Monitor.Exit(_lock); } public void Join() { } } }