FixAntenna/NetCore/FixEngine/Session/AbstractFixSession.cs (1,565 lines of code) (raw):

// Copyright (c) 2021 EPAM Systems // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. using Epam.FixAntenna.Constants.Fixt11; using Epam.FixAntenna.NetCore.Common; using Epam.FixAntenna.NetCore.Common.Logging; using Epam.FixAntenna.NetCore.Common.Utils; using Epam.FixAntenna.NetCore.Configuration; using Epam.FixAntenna.NetCore.FixEngine.Manager; using Epam.FixAntenna.NetCore.FixEngine.Session.IoThreads; using Epam.FixAntenna.NetCore.FixEngine.Session.MessageHandler; using Epam.FixAntenna.NetCore.FixEngine.Session.Util; using Epam.FixAntenna.NetCore.FixEngine.Session.Validation; using Epam.FixAntenna.NetCore.FixEngine.Storage; using Epam.FixAntenna.NetCore.FixEngine.Storage.Queue; using Epam.FixAntenna.NetCore.FixEngine.Transport; using Epam.FixAntenna.NetCore.Message; using Epam.FixAntenna.NetCore.Validation; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Threading; using System.Threading.Tasks; using Epam.FixAntenna.NetCore.FixEngine.Scheduler; namespace Epam.FixAntenna.NetCore.FixEngine.Session { /// <summary> /// The abstract session implementation. /// Provides base functional for acceptor and initiator sessions. /// </summary> internal abstract class AbstractFixSession : IExtendedFixSession, ISessionStateListenSupport { protected readonly ILog Log; private const string DontRedefineType = ""; private const int Second = 1000; private static TimeSpan CheckHbAndTestRequestInterval = TimeSpan.FromSeconds(1); private static readonly FixMessage SeqNumResetTag = new FixMessage(); static AbstractFixSession() { SeqNumResetTag.AddTag(Tags.ResetSeqNumFlag, true); } internal AtomicBool AlreadyShuttingDown = true; internal AtomicBool AlreadySendingLogout = true; protected internal IFixTransport Transport; protected internal IQueue<FixMessageWithType> Queue; protected internal readonly HandlerChain Listener; private long[] _longAttrs = new long[ExtendedFixSessionAttribute.Values().Count]; private bool[] _boolAttrs = new bool[ExtendedFixSessionAttribute.Values().Count]; private readonly IDictionary<string, object> _attributes = new ConcurrentDictionary<string, object>(); private IMessageValidator _messageValidator; private long _established; protected internal bool Graceful; private readonly int _forceLogOffTimeout; private readonly int _logoutWaitTimeout; private int _reasonableTransmissionTimeMillis; private bool _enabledMessageRejecting; private FixMessage _testRequest = new FixMessage(); protected internal TestReqIdTimestamp TestRequestTime = new TestReqIdTimestamp(); protected internal MutableInteger SentTrNum = new MutableInteger(); internal IMessageReader Reader; internal IMessagePumper Pumper; protected internal readonly object SessionLock = new object(); private FixSessionListenerObserver _sessionStateListenerObserver; protected internal readonly ConfigurationAdapter ConfigAdapter; private PreparedMessageUtil _preparedMessageUtil; private volatile DisconnectReason _disconnectReason; private volatile bool _initialized; private bool _ignoreResetSeqNumFlagOnReset; protected internal SendingMode PreferredSendingMode; private readonly IDictionary<string, ConcurrentBag<IExtendedFixSessionAttributeListener>> _attributeListeners = new ConcurrentDictionary<string, ConcurrentBag<IExtendedFixSessionAttributeListener>>(); private readonly ISet<ITypedFixMessageListener> _outSessionLevelListeners = new HashSet<ITypedFixMessageListener>(); private protected SessionTaskScheduler Scheduler; /// <summary> /// Creates the <c>AbstractFIXSession</c>. /// </summary> public AbstractFixSession(IFixMessageFactory messageFactory, SessionParameters sessionParameters, HandlerChain fixSessionListener) { Log = LogFactory.GetLog(GetType()); FixSessionOutOfSyncListener = new FixSessionOutOfSyncListenerAdapter(this); _sessionStateListenerObserver = new FixSessionListenerObserver(this); if (Config.MaxTimeoutValue < sessionParameters.HeartbeatInterval) { // Bug 14419: Very lardge HBI cause antenna to close session Log.Error("HBI is too large - " + sessionParameters.HeartbeatInterval); throw new Exception("HBI is too large - " + sessionParameters.HeartbeatInterval); } FixSessionManager.Init(); MessageFactory = messageFactory; ConfigAdapter = new ConfigurationAdapter(sessionParameters.Configuration); PreferredSendingMode = ConfigAdapter.PreferredSendingMode; StorageFactory = ReflectStorageFactory.CreateStorageFactory(ConfigAdapter.Configuration); if (StorageFactory is IInitializable) { ((IInitializable)StorageFactory).Init(sessionParameters); } if (Log.IsDebugEnabled) { Log.Debug("Session " + sessionParameters.SessionId + " initialized with " + StorageFactory.GetType().FullName + " storage factory"); } RuntimeState = new FixSessionRuntimeState(); Parameters = sessionParameters; Queue = StorageFactory.CreateQueue(sessionParameters); _forceLogOffTimeout = ConfigAdapter.ForceLogoff; _logoutWaitTimeout = ConfigAdapter.LogoutWaitTimeout; _reasonableTransmissionTimeMillis = ConfigAdapter.HbtReasonableTransmissionTime; _enabledMessageRejecting = ConfigAdapter.IsEnableMessageRejecting; InitAttributes(); SequenceManager = GetSequenceManagerFromConfig(); if (Log.IsDebugEnabled) { Log.Debug("Session " + sessionParameters.SessionId + " initialized with " + SequenceManager.GetType().FullName + " sequence manager"); } SequenceManager.LoadStoredParameters(); Listener = fixSessionListener; Listener.Session = this; _preparedMessageUtil = new PreparedMessageUtil(Parameters); _ignoreResetSeqNumFlagOnReset = ConfigAdapter.Configuration.GetPropertyAsBoolean(Config.IgnoreResetSeqNumFlagOnReset, false); sessionParameters.PrintConfiguration(); } /// <summary> /// For testing purposes /// </summary> internal AbstractFixSession() {} private void InitSessionRuntimeState() { var configuredOutLogon = Parameters.OutgoingLoginMessage; RuntimeState.OutgoingLogon = configuredOutLogon.DeepClone(false, configuredOutLogon.IsUserOwned); } public virtual ISessionSequenceManager SequenceManager { get; } private ISessionSequenceManager GetSequenceManagerFromConfig() { var sessionSequenceManagerClass = ConfigAdapter.Configuration .GetProperty(Config.SessionSequenceManager, typeof(StandardSessionSequenceManager).FullName); try { var reflectUtil = new ReflectUtilEx(Type.GetType(sessionSequenceManagerClass)); return reflectUtil.GetInstance(new object[] { this }) as ISessionSequenceManager; } catch (Exception ex) { Log.Warn("Can not load session sequence manager: " + sessionSequenceManagerClass + ". Cause: " + ex.Message + ". Loaded default StandardSessionSequenceManager."); return new StandardSessionSequenceManager(this); } } /// <inheritdoc /> public virtual IExtendedFixSessionListener ExtendedFixSessionListener { get; private set; } = new ExtendedFixSessionListenerAdapter(); /// <summary> /// Gets storage factory /// </summary> public IStorageFactory StorageFactory { get; } /// <inheritdoc /> public virtual IRejectMessageListener RejectMessageListener { get; set; } = new RejectFixSessionListenerAdapter(); /// <inheritdoc /> public virtual IErrorHandler ErrorHandler { get; set; } = new LoggingErrorHandler(); /// <inheritdoc /> public virtual IFixSessionOutOfSyncListener FixSessionOutOfSyncListener { set; get; } /// <inheritdoc /> public IFixSessionSlowConsumerListener SlowConsumerListener { get; set; } = new SlowConsumerListenerAdapter(); /// <inheritdoc /> public virtual void MarkShutdownAsGraceful() { if (Log.IsDebugEnabled) { Log.Debug("Mark shutdown as graceful session " + SessionId); } Graceful = true; if (Pumper != null) Pumper.GracefulShutdown = true; if (Reader != null) Reader.GracefulShutdown = true; } private string SessionId { get => Parameters.SessionId.ToString(); } /// <inheritdoc /> public virtual void ClearQueue() { if (_enabledMessageRejecting) { Pumper.RejectQueueMessages(); Queue.Clear(); //return true; } else { //remove only out-of-turn (session-level) messages lock (Queue) { var storedOutOfTournMode = Queue.OutOfTurnOnlyMode; Queue.OutOfTurnOnlyMode = true; if (Log.IsDebugEnabled) { Log.Debug("Clean out-of-turn queue for session " + SessionId); } while (!Queue.IsEmpty) { try { //get priority message var msg = Queue.Poll(); if (msg.FixMessage != null) { if (Log.IsDebugEnabled) { Log.Debug("Remove out-of-turn message: " + msg.ToString()); } } } catch (Exception) { // ignore } finally { Queue.Commit(); } } Queue.OutOfTurnOnlyMode = storedOutOfTournMode; } } //return false; } /// <inheritdoc /> public virtual int QueuedMessagesCount { get { lock (Queue) { return Queue.TotalSize; } } } /// <inheritdoc /> public virtual void LockSending() { SetOutOfTurnMode(true); if (PreferredSendingMode == SendingMode.SyncNoqueue) { ((NoQueueMessagePumper)Pumper)?.Lock(); } } /// <inheritdoc /> public virtual void UnlockSending() { SetOutOfTurnMode(false); if (PreferredSendingMode == SendingMode.SyncNoqueue) { ((NoQueueMessagePumper)Pumper)?.Unlock(); } } /// <inheritdoc /> public virtual void SetOutOfTurnMode(bool mode) { Queue.OutOfTurnOnlyMode = mode; } /// <inheritdoc /> public virtual void ResetSequenceNumbers() { ResetSequenceNumbers(false); } /// <inheritdoc /> public virtual void ResetSequenceNumbers(bool checkGapFillBefore) { lock (SessionLock) { if (Log.IsDebugEnabled) { Log.Debug("Reset sequences for session " + SessionId); } if (AlreadyShuttingDown || SessionState.IsDisconnected(SessionState)) { //sessionParameters.SetOutgoingSequenceNumber(0); //sessionParameters.SetIncomingSequenceNumber(0); Parameters.IncomingSequenceNumber = 1; Parameters.OutgoingSequenceNumber = 1; //reset also RR flags SequenceManager.RemoveRangeOfRrSequence(); SaveSessionParameters(); if (_initialized) { MessageFactory.SetSessionParameters(Parameters); SequenceManager.Reinit(this); SequenceManager.InitSeqNums(RuntimeState.InSeqNum, RuntimeState.OutSeqNum); } return; } } if (Parameters.FixVersion.CompareTo(FixVersion.Fix40) == 0) { throw new InvalidOperationException("FIX40 is not resettable while online"); } if (checkGapFillBefore) { SendTestRequestForSeqNumGap(); } else { ResetSeqNumsAndSendResetLogon(); } } /// <inheritdoc /> public virtual void SetSequenceNumbers(long inSeqNum, long outSeqNum) { lock (SessionLock) { // TODO: make sure that session safely initializes if (Log.IsDebugEnabled) { Log.Debug("set new sequences " + inSeqNum + ":" + outSeqNum + " for session " + SessionId); } if (!_initialized) { if (inSeqNum > 0) { Parameters.IncomingSequenceNumber = inSeqNum; } if (outSeqNum > 0) { Parameters.OutgoingSequenceNumber = outSeqNum; } } else { if (inSeqNum > 0) { Parameters.IncomingSequenceNumber = inSeqNum; RuntimeState.InSeqNum = inSeqNum; RuntimeState.LastProcessedSeqNum = inSeqNum - 1; } if (outSeqNum > 0) { Parameters.OutgoingSequenceNumber = outSeqNum; RuntimeState.OutSeqNum = outSeqNum; } } //reset also RR flags SequenceManager.RemoveRangeOfRrSequence(); SaveSessionParameters(); } } private void SendTestRequestForSeqNumGap() { // sets the flag and send TR SetAttribute(ExtendedFixSessionAttribute.TestRequestIsSentForSeqReset.Name, "Y"); SendTestRequest(); } private void ResetSeqNumsAndSendResetLogon() { SetAttribute(ExtendedFixSessionAttribute.IntradayLogonIsSent.Name, "Y"); SequenceManager.ConfigureStateBeforeReset(); // Bug BBP-283: After sequences reset sent queued message instead Logon lock (Queue) { //set OutOfTurn mode to prevent sending messages with invalid seq Queue.OutOfTurnOnlyMode = true; SequenceManager.SetResetSeqNumFlagIntoOutgoingLogon(); //reset sequences before send logon SequenceManager.ApplyOutSeqnum(1); Log.Debug("Send reset logon"); CreateAndSendLogonMessage(); } } private void CreateAndSendLogonMessage() { var message = MessageFactory.CompleteMessage(MsgType.Logon, new FixMessage()); ClearOutOfTurnQueue(); SendMessageOutOfTurn(string.Empty, message); } private void ClearOutOfTurnQueue() { lock (Queue) { Queue.ClearOutOfTurn(removedMessage => { if (removedMessage.IsApplicationLevelMessage()) { Log.Warn("Application message '" + removedMessage + "' has been removed from 'out of turn' queue."); } else { Log.Debug("Message '" + removedMessage + "' has been removed from 'out of turn' queue."); } }); } } /// <inheritdoc /> public virtual void SetFixSessionListener(IFixSessionListener listener) { if (listener is IExtendedFixSessionListener sessionListener) { ExtendedFixSessionListener = sessionListener; } Listener.SetUserListener(new SessionStateAdapter(_sessionStateListenerObserver, listener)); } /// <inheritdoc /> public virtual void AddInSessionLevelMessageListener(IFixMessageListener listener) { Listener.AddInSessionMessageListener(listener); } /// <inheritdoc /> public void AddOutSessionLevelMessageListener(ITypedFixMessageListener listener) { _outSessionLevelListeners.Add(listener); } /// <inheritdoc /> public void AddUserGlobalMessageHandler(AbstractUserGlobalMessageHandler userMessageHandler) { Listener.AddUserGlobalMessageHandler(userMessageHandler); } /// <inheritdoc /> public virtual IMessageStorage OutgoingStorage { get; protected set; } /// <inheritdoc /> public virtual IMessageStorage IncomingStorage { get; protected set; } /// <inheritdoc /> public virtual IQueue<FixMessageWithType> MessageQueue => Queue; /// <inheritdoc /> public virtual int OutgoingQueueSize { get { lock (Queue) { return Queue.TotalSize; } } } /// <inheritdoc /> public virtual IList<IEnqueuedFixMessage> GetOutgoingQueueMessages() { lock (Queue) { var msgs = Queue.ToArray(); var outmsgs = new List<IEnqueuedFixMessage>(msgs.Length); foreach (FixMessageWithType msg in msgs) { outmsgs.Add(EnqueuedFixMessage.CopyFrom(msg)); } return outmsgs; } } /// <inheritdoc /> public virtual byte[] RetrieveSentMessage(long seqNumber) { return OutgoingStorage.RetrieveMessage(seqNumber); } /// <inheritdoc /> public virtual byte[] RetrieveReceivedMessage(long seqNumber) { return IncomingStorage.RetrieveMessage(seqNumber); } /// <inheritdoc /> public virtual void SaveSessionParameters() { SequenceManager.SaveSessionParameters(); } /// <inheritdoc /> public abstract void Connect(); /// <inheritdoc /> public abstract Task ConnectAsync(); /// <inheritdoc /> public abstract void Reject(string reason); /// <inheritdoc /> public virtual SessionParameters Parameters { get; protected set; } /// <inheritdoc /> public virtual IFixMessageFactory MessageFactory { get; } /// <inheritdoc /> public virtual SessionState SessionState { get => RuntimeState.SessionState; set { if (Log.IsDebugEnabled) { Log.Debug("Change session " + SessionId + " state: " + SessionState + "->" + value); } RuntimeState.SessionState = value; FireSessionStateChanged(value); } } /// <inheritdoc /> public FixSessionRuntimeState RuntimeState { get; } private void FireSessionStateChanged(SessionState newState) { ControlOverStateChange(newState); NotifySessionStateChanged(newState); } private void ControlOverStateChange(SessionState newState) { SessionStatusCheckerThread thread = null; switch (newState.EnumValue) { case SessionState.InnerEnum.WaitingForLogoff: { var logoutTimeout = _logoutWaitTimeout >= 0 ? _logoutWaitTimeout : Parameters.HeartbeatInterval; thread = new SessionStatusCheckerThread(this, logoutTimeout, SessionState.WaitingForLogoff, "Logoff wasn't received"); } break; case SessionState.InnerEnum.WaitingForForcedLogoff: { thread = new SessionStatusCheckerThread(this, _forceLogOffTimeout, SessionState.WaitingForForcedLogoff, "", false); break; } case SessionState.InnerEnum.WaitingForForcedDisconnect: { thread = new SessionStatusCheckerThread(this, _forceLogOffTimeout, SessionState.WaitingForForcedDisconnect, "", false); break; } case SessionState.InnerEnum.Disconnected: case SessionState.InnerEnum.DisconnectedAbnormally: OnDisconnect(); break; case SessionState.InnerEnum.Dead: UnRegisterSessionTasks(); break; } if (thread != null) { thread.Start(); if (Log.IsDebugEnabled) { Log.Debug("Session status checker thread started"); } } } private void NotifySessionStateChanged(SessionState newState) { try { Listener.OnSessionStateChange(newState); } catch (Exception e) { if (Log.IsDebugEnabled) { Log.Warn("User listener was thrown exception.", e); } else { Log.Warn("User listener was thrown exception. " + e.Message); } } } /// <inheritdoc /> public virtual bool IsStatisticEnabled => Pumper.IsStatisticEnabled && Reader.IsStatisticEnabled; public virtual bool IsHbtControlInPumper => PreferredSendingMode == SendingMode.SyncNoqueue; /// <inheritdoc /> public virtual long BytesRead => Reader.IsStatisticEnabled ? Reader.MessageStatistic.BytesProcessed : -1; /// <inheritdoc /> public virtual long IsEstablished => _established; /// <inheritdoc /> public virtual long BytesSent => Pumper.IsStatisticEnabled ? Pumper.Statistic.BytesProcessed : -1; /// <inheritdoc /> public virtual long NoOfInMessages => Reader.IsStatisticEnabled ? Reader.MessageStatistic.MessagesProcessed : -1; /// <inheritdoc /> public virtual long NoOfOutMessages => Pumper.IsStatisticEnabled ? Pumper.Statistic.MessagesProcessed : -1; /// <inheritdoc /> public virtual long LastInMessageTimestamp => Reader.MessageProcessedTimestamp; /// <inheritdoc /> public virtual long LastOutMessageTimestamp => Pumper.MessageProcessedTimestamp; /// <summary> /// Gets chain of message handlers. /// </summary> public virtual HandlerChain MessageHandlers => Listener; public virtual void OnDisconnect() { ResetInitialization(); } /// <inheritdoc /> public virtual void Init() { if (!_initialized) { _initialized = true; InitSessionRuntimeState(); InitSessionInternal(); } } public virtual void ResetInitialization() { _initialized = false; } /// <summary> /// Initializes the resources and sends a logon message. /// </summary> /// <exception cref="IOException"> - if I/O error occurred. </exception> public virtual void PrepareForConnect() { Init(); CreateAndSendLogonMessage(); } public virtual bool IsResetSeqNumFlagRequiredForInitLogon => false; /// <summary> /// Initialize session internal. /// <p/> /// <p/> /// The method is created all session object: in/out storage and pumper/reader, etc. /// </summary> public virtual void InitSessionInternal() { if (_messageValidator == null) { //TODO: move to constructor when acceptor will be able to customize dictionary from session parameters _messageValidator = SessionValidatorFactory.GetMessageValidator(Parameters); } Graceful = false; AlreadyShuttingDown = false; AlreadySendingLogout = false; SequenceManager.LoadStoredParameters(); SequenceManager.InitLastSeqNumResetTimestampOnNewSession(); ResetTestRequestFlags(); MessageFactory.SetSessionParameters(Parameters); MessageFactory.SetRuntimeState(RuntimeState); IncomingStorage?.Dispose(); IncomingStorage = StorageFactory.CreateIncomingMessageStorage(Parameters); Reader = BuildMessageReader(IncomingStorage, Listener, Transport); var inStorageSeqNum = Reader.Init(ConfigAdapter); OutgoingStorage?.Dispose(); OutgoingStorage = StorageFactory.CreateOutgoingMessageStorage(Parameters); Queue.Initialize(); Pumper?.Dispose(); Pumper = BuildMessagePumper(ConfigAdapter, Queue, OutgoingStorage, MessageFactory, Transport, SequenceManager); var nextOutStorageSeqNum = Pumper.Init(); Queue.OutOfTurnOnlyMode = true; // turn off when response received SequenceManager.Reinit(this); SequenceManager.InitSeqNums(inStorageSeqNum, nextOutStorageSeqNum); SubscribeForAttributeChanges(ExtendedFixSessionAttribute.IsResendRequestProcessed, new ExtendedFixSessionAttributeListener(this)); if (Scheduler == null) { Scheduler = new SessionTaskScheduler(Parameters); } RegisterSessionTasks(); } private class ExtendedFixSessionAttributeListener : IExtendedFixSessionAttributeListener { private readonly AbstractFixSession _outerInstance; public ExtendedFixSessionAttributeListener(AbstractFixSession outerInstance) { _outerInstance = outerInstance; } public void OnAttributeSet(object value) { } public void OnAttributeRemoved() { _outerInstance.Reader.MessageProcessedTimestamp = DateTimeHelper.CurrentMilliseconds; } } public virtual IMessageReader BuildMessageReader(IMessageStorage incomingMessageStorage, HandlerChain listener, IFixTransport transport) { return new MessageReader(this, incomingMessageStorage, listener, transport); } public virtual IMessagePumper BuildMessagePumper(ConfigurationAdapter configuration, IQueue<FixMessageWithType> queue, IMessageStorage outgoingMessageStorage, IFixMessageFactory messageFactory, IFixTransport transport, ISessionSequenceManager sequenceManager) { var preferredSendingMode = configuration.PreferredSendingMode; var slowConsumerDetectionEnabled = configuration.Configuration.GetPropertyAsBoolean(Config.SlowConsumerDetectionEnabled, false); var transportOrWrapper = slowConsumerDetectionEnabled ? new ConsumingControlTransportWrapper(this) : Transport; if (preferredSendingMode == SendingMode.Sync) { if (transport.IsBlockingSocket) { return new SyncBlockingMessagePumper(this, queue, outgoingMessageStorage, messageFactory, transportOrWrapper, sequenceManager); } return new SyncMessagePumper(this, queue, outgoingMessageStorage, messageFactory, transportOrWrapper, sequenceManager); } if (preferredSendingMode == SendingMode.SyncNoqueue) { return new NoQueueMessagePumper(this, queue, outgoingMessageStorage, messageFactory, transport, sequenceManager); } return new AsyncMessagePumper(this, queue, outgoingMessageStorage, messageFactory, transportOrWrapper, sequenceManager); } public virtual void StartSession() { if (Log.IsDebugEnabled) { Log.Debug("Starting MessageReader thread. Incoming seq number:" + RuntimeState.InSeqNum); } Reader.Start(); if (Log.IsDebugEnabled) { Log.Debug("Reader started."); } if (Log.IsDebugEnabled) { Log.Debug("Starting MessagePumper thread. Outgoing seq number:" + RuntimeState.OutSeqNum); } Pumper.Start(); if (Log.IsDebugEnabled) { Log.Debug("Pumper started"); } _established = DateTimeHelper.CurrentMilliseconds; } /// <summary> /// Disconnect the session. /// <para> /// The method sends the logof message, if response is not received during the HBI, /// shutdown the session. /// </para> /// </summary> /// <param name="reason"> the reason, if parameter is not null, the logof message will be send with 58=reason. </param> /// <seealso cref="IFixSession.Disconnect(string)"> </seealso> public virtual void Disconnect(string reason) { Disconnect(DisconnectReason.GetDefault(), reason); } /// <summary> /// Disconnect the session. Async version. /// <para> /// The method sends the logof message, if response is not received during the HBI, /// shutdown the session. /// </para> /// </summary> /// <param name="reason"> the reason, if parameter is not null, the logof message will be send with 58=reason. </param> /// <seealso cref="IFixSession.Disconnect(string)"> </seealso> public virtual async Task DisconnectAsync(string reason) { await Task.Run(() => Disconnect(reason)).ConfigureAwait(false); } /// <inheritdoc /> public virtual void Disconnect(DisconnectReason reasonType, string reasonDescription) { Disconnect(reasonType, reasonDescription, true, false, true); } /// <inheritdoc /> public virtual void ForcedDisconnect(DisconnectReason reasonType, string reasonDescription, bool continueReading) { Disconnect(reasonType, reasonDescription, true, true, continueReading); } public virtual void Disconnect(DisconnectReason reasonType, string reasonDescription, bool isGracefull, bool isForced) { Disconnect(reasonType, reasonDescription, isGracefull, isForced, true); } //TODO: change to something like flag set public virtual void Disconnect(DisconnectReason reasonType, string reasonDescription, bool isGracefull, bool isForced, bool continueReading) { Disconnect(reasonType, reasonDescription, isGracefull, isForced, continueReading, true); } //TODO: change to something like flag set public virtual void Disconnect(DisconnectReason reasonType, string reasonDescription, bool isGracefull, bool isForced, bool continueReading, bool sendLogout) { lock (SessionLock) { if (isGracefull) { MarkShutdownAsGraceful(); } if (reasonType != null) { LastDisconnectReason = reasonType; } if (sendLogout) { var sessionState = SessionState; if (sessionState == SessionState.Connected || sessionState == SessionState.WaitingForLogon || sessionState == SessionState.LogonReceived) { if (TryStartSendingLogout()) { SendLogoff(reasonDescription, isForced, continueReading); } } else if (reasonType != DisconnectReason.Reject) { //don't show warning for var message = "Disconnect while not connected (" + SessionState + "): " + reasonDescription; Log.Warn(message); } } else { SetPreLogoffSessionState(isForced, continueReading); Log.Info("Session was disconnected without logout message."); } } } /// <inheritdoc/> public virtual bool TryStartSendingLogout() { var wasSendingLogout = AlreadySendingLogout.AtomicExchange(true); return !wasSendingLogout; } /// <inheritdoc/> public virtual void Shutdown(DisconnectReason reason, bool blocking) { var wasShuttingDown = AlreadyShuttingDown.AtomicExchange(true); if (wasShuttingDown) { if (blocking) { Log.Debug($"Waiting for shutdown: {Thread.CurrentThread.Name}"); WaitUntilPumperShutdown(); WaitUntilReaderShutdown(); Log.Debug($"WorkerThread was shutdown: {Thread.CurrentThread.Name}"); CloseTransport(); // if prev call of shutdown is hang, this call release pumper and reader } } else { if (Log.IsDebugEnabled) { Log.Debug("Shutting down:" + this); } if (Thread.CurrentThread == Pumper?.WorkerThread || Thread.CurrentThread == Reader?.WorkerThread) { var shutdownWaiter = new Thread(() => InternalShutdown(reason)) { Name = "Shutdown:" + SessionId }; //temporary remove async read/write thread closing shutdownWaiter.Start(); } else { InternalShutdown(reason); } } SequenceManager.SaveProcessedSeqNumberOnShutdown(); } private void InternalShutdown(DisconnectReason reason) { lock (SessionLock) { if (Pumper != null) { Pumper.Shutdown(); Log.Debug("Pumper stopped"); } else { Log.Debug("Pumper not initialized"); } CloseTransport(); if (Reader != null) { Reader.Shutdown(); Log.Debug("Reader stopped"); } else { Log.Debug("Reader not initialized"); } SequenceManager.SaveProcessedSeqNumberOnShutdown(); if (SessionState.IsNotDisconnected(SessionState)) { if (reason != null) { LastDisconnectReason = reason; } SessionState = Graceful ? SessionState.Disconnected : SessionState.DisconnectedAbnormally; } } } /// <summary> /// Close the transport. /// </summary> private void CloseTransport() { lock (SessionLock) { try { Transport?.Close(); } catch (IOException e) { if (Log.IsDebugEnabled) { Log.Warn("Close transport thrown error. Cause: " + e.Message, e); } else { Log.Warn("Close transport thrown error. Cause: " + e.Message); } } } } private void WaitUntilReaderShutdown() { try { if (Reader?.WorkerThread != null && Thread.CurrentThread != Reader.WorkerThread) { while (Reader.WorkerThread.IsAlive) { Reader.WorkerThread.Join(10); try { Monitor.Wait(this, 10); } catch (SynchronizationLockException) { // do nothing } } } } catch (ThreadInterruptedException) { // do nothing } } private void WaitUntilPumperShutdown() { try { if (Pumper?.WorkerThread != null && Thread.CurrentThread != Pumper.WorkerThread) { while (Pumper.WorkerThread.IsAlive) { Pumper.WorkerThread.Join(10); try { Monitor.Wait(this, 10); } catch (SynchronizationLockException) { // do nothing } } } } catch (ThreadInterruptedException) { // intentionally blank } } /// <summary> /// Sends logout over "Out of turn" mode. Only sessions messages will be sends. /// </summary> private void SendLogoff(string reason, bool forced, bool continueReading) { Queue.OutOfTurnOnlyMode = true; FixMessage list = null; if (!string.IsNullOrEmpty(reason)) { list = new FixMessage(); list.AddTag(Tags.Text, reason); } SetPreLogoffSessionState(forced, continueReading); SendMessageOutOfTurn("5", list); } public virtual void SetPreLogoffSessionState(bool forced, bool continueReading) { var newState = SessionState.WaitingForLogoff; if (forced) { newState = continueReading ? SessionState.WaitingForForcedLogoff : SessionState.WaitingForForcedDisconnect; } SessionState = newState; } /// <inheritdoc /> public virtual bool SendMessage(string type, FixMessage content, FixSessionSendingType optionMask) { return SendMessageAndGetQueueSize(type, content, optionMask) == 0; } /// <inheritdoc /> public virtual int SendMessageAndGetQueueSize(string type, FixMessage content, FixSessionSendingType optionMask) { ValidateMessageType(type, content); VerifySessionState(); return Pumper.Send(type, content, optionMask); } /// <inheritdoc /> public virtual bool SendMessage(string type, FixMessage content) { ValidateMessageType(type, content); VerifySessionState(); return Pumper.Send(type, content, FixSessionSendingType.DefaultSendingOption) == 0; } private void ValidateMessageType(string type, FixMessage content) { // System.out.println("Hello"); if (string.IsNullOrEmpty(type) && (content == null || !content.IsTagExists(Tags.MsgType))) { throw new ArgumentException("invalid message or message type: " + (content == null ? null : content.ToString())); } } /// <inheritdoc /> public virtual bool SendAsIs(FixMessage message, FixSessionSendingType options) { ValidateMessage(message); VerifySessionState(); return Pumper.Send(null, message, options) == 0; } /// <inheritdoc /> public virtual bool SendAsIs(FixMessage message) { return SendAsIs(message, FixSessionSendingType.DefaultSendingOption); } private void ValidateMessage(FixMessage message) { if (message == null || !message.IsTagExists(Tags.MsgType)) { throw new ArgumentException("invalid message or message type"); } } /// <inheritdoc /> public virtual bool SendWithChanges(FixMessage content, ChangesType allowedChangesType, FixSessionSendingType options) { //TODO: make it possible to send message synchronous return SendWithChangesAndOptionGetQueueSize(content, allowedChangesType, options) == 0; } /// <inheritdoc /> public virtual int SendWithChangesAndGetQueueSize(FixMessage content, ChangesType allowedChangesType, FixSessionSendingType options) { //TODO: make it possible to send message synchronous return SendWithChangesAndOptionGetQueueSize(content, allowedChangesType, options); } /// <inheritdoc /> public virtual bool SendWithChanges(FixMessage content, ChangesType? allowedChangesType) { return SendWithChangesAndOptionGetQueueSize(content, allowedChangesType, FixSessionSendingType.DefaultSendingOption) == 0; } public virtual int SendWithChangesAndGetQueueSize(FixMessage content, ChangesType allowedChangesType) { return SendWithChangesAndOptionGetQueueSize(content, allowedChangesType, FixSessionSendingType.DefaultSendingOption); } private int SendWithChangesAndOptionGetQueueSize(FixMessage content, ChangesType? allowedChangesType, FixSessionSendingType options) { if (allowedChangesType == null) { throw new ArgumentNullException("allowedChangesType is null"); } ValidateMessage(content); VerifySessionState(); return Pumper.Send(content, allowedChangesType, options); } private protected void VerifySessionState() { if (SessionState.Dead == SessionState) { throw new InvalidOperationException("session is disposed"); } } /// <inheritdoc /> public virtual FixMessage PrepareMessage(FixMessage message, MessageStructure structure) { return _preparedMessageUtil.PrepareMessage(message, structure); } /// <inheritdoc /> public virtual FixMessage PrepareMessage(FixMessage message, string type, MessageStructure structure) { return _preparedMessageUtil.PrepareMessage(message, type, structure); } /// <inheritdoc /> public virtual FixMessage PrepareMessage(string msgType, MessageStructure userStructure) { return _preparedMessageUtil.PrepareMessage(msgType, userStructure); } /// <inheritdoc /> public virtual FixMessage PrepareMessageFromString(byte[] message, MessageStructure structure) { return _preparedMessageUtil.PrepareMessageFromString(message, structure); } /// <inheritdoc /> public virtual FixMessage PrepareMessageFromString(byte[] message, string type, MessageStructure structure) { return _preparedMessageUtil.PrepareMessageFromString(message, type, structure); } /// <inheritdoc /> public virtual bool SendMessage(FixMessage message) { return SendMessage(DontRedefineType, message, FixSessionSendingType.DefaultSendingOption); } /// <inheritdoc /> public virtual bool SendMessage(FixMessage message, FixSessionSendingType optionMask) { return SendMessage(DontRedefineType, message, optionMask); } /// <inheritdoc /> public virtual int SendMessageAndGetQueueSize(FixMessage message, FixSessionSendingType optionMask) { return SendMessageAndGetQueueSize(DontRedefineType, message, optionMask); } /// <inheritdoc /> public virtual bool SendMessageOutOfTurn(string type, FixMessage message) { ValidateMessageType(type, message); NotifyOutSessionLevelListeners(type, message); return Pumper?.SendOutOfTurn(type, message) ?? false; } private void NotifyOutSessionLevelListeners(string type, FixMessage message) { if (_outSessionLevelListeners.Count > 0) { var msgType = !string.IsNullOrEmpty(type) ? type : message.GetTagValueAsString(Tags.MsgType); if (msgType != null && RawFixUtil.IsSessionLevelType(msgType)) { foreach (var listener in _outSessionLevelListeners) { listener.OnMessage(msgType, message); } } } } /// <inheritdoc /> public virtual IMessageValidator MessageValidator { get { return _messageValidator; } } /// <inheritdoc /> public virtual void Dispose() { Shutdown(DisconnectReason.GetDefault(), true); Listener?.Dispose(); Queue.Dispose(); FixSessionManager.Instance.RemoveFixSession(this); if (SessionState != SessionState.Dead) { SessionState = SessionState.Dead; } _sessionStateListenerObserver.RemoveAllListeners(); if (StorageFactory is IClosable) { ((IClosable)StorageFactory).Close(); } Scheduler?.Shutdown(); Scheduler = null; Pumper?.Dispose(); } /// <summary> /// Send test request. /// </summary> public virtual void SendTestRequest() { var currentTime = DateTimeHelper.CurrentMilliseconds; TestRequestTime.Value = currentTime; _testRequest.Set(Tags.TestReqID, TestRequestTime.AsByteArray()); SetAttribute(ExtendedFixSessionAttribute.LastSentTestReqId.Name, TestRequestTime); var oldSentTrNum = (MutableInteger)GetAttribute(ExtendedFixSessionAttribute.SentTestReqNumberId.Name); if (oldSentTrNum == null) { SentTrNum.SetNumber(1); } else { SentTrNum.SetNumber(oldSentTrNum.GetNumber() + 1); } SetAttribute(ExtendedFixSessionAttribute.SentTestReqNumberId.Name, SentTrNum); if (Log.IsDebugEnabled) { Log.Trace("TestRequest sent: " + currentTime); } SendMessageOutOfTurn(MsgType.TestRequest, _testRequest); } /// <summary> /// The method checks if TR send or received. /// <para> /// If session is not received any messages during HB interval the HB will be send and /// If no response received session will be disconnected; /// </para> /// <para> /// This is helper method for session task. /// </para> /// </summary> public virtual void CheckHasSessionSendOrReceivedTestRequest() { var currentTime = DateTimeHelper.CurrentMilliseconds; var noOfMillisAfterLastMessage = currentTime - LastInMessageTimestamp; var hbiAndReasonableTransmissionTime = GetHbiPlusReasonableTransmissionTimeMillis(); if (noOfMillisAfterLastMessage >= hbiAndReasonableTransmissionTime) { // we didn't receive anything for HBI+RTT seconds if (!HasTestRequestBeenSent()) { // we didn't sent any test requests yet SendTestRequest(); } else { CheckForTestRequestAnswer(hbiAndReasonableTransmissionTime); } } else if (HasTestRequestBeenSent()) { CheckForTestRequestAnswer(hbiAndReasonableTransmissionTime); } } private void CheckForTestRequestAnswer(int reasonableHeartbeatTime) { var lastTimeOfAllowedInactivity = DateTimeHelper.CurrentMilliseconds - reasonableHeartbeatTime; if (HasTestRequestReplyBeenReceived()) { RestoreSessionAfterReceivedTestRequest(); } else if (LastInMessageTimestamp >= lastTimeOfAllowedInactivity) { // any message received, session state restored Log.Debug("Reset TestRequest flags because messages received after request"); ResetTestRequestFlags(); } else { var timestamp = (TestReqIdTimestamp)GetAttribute(ExtendedFixSessionAttribute.LastSentTestReqId.Name); // can't be null because of the if above var timeSinceLastHeartbeatSentInMillis = DateTimeHelper.CurrentMilliseconds - timestamp.Value; if (timeSinceLastHeartbeatSentInMillis > reasonableHeartbeatTime) { var curSentTrNum = (MutableInteger)GetAttribute(ExtendedFixSessionAttribute.SentTestReqNumberId.Name); var maxAttempts = ConfigAdapter.TestRequestsNumberUponDisconnection; if (curSentTrNum == null || curSentTrNum.GetNumber() < maxAttempts) { SendTestRequest(); } else { DisconnectTestRequestIsLost(); } } } } public virtual void CheckSessionInactiveAndSendHbt() { var currentTime = DateTimeHelper.CurrentMilliseconds; var noOfMillisAfterLastMessage = currentTime - LastOutMessageTimestamp; var hbi = Parameters.HeartbeatInterval * Second; if (noOfMillisAfterLastMessage >= hbi) { SendMessageOutOfTurn("0", null); } } /// <summary> /// Disconnect the session if test request doesn't received. /// </summary> public virtual void DisconnectTestRequestIsLost() { // String timestamp = (String) GetAttribute(LastSentTestReqID.Name); // can't be null because of the if above // long timeSinceLastHeartbeatSentInMillis = (System.currentTimeMillis() - Long.parseLong(timestamp));// / SECOND; // final int reasonableHeartbeatTime = getReasonableHeartbeatTimeMillis(); // if (timeSinceLastHeartbeatSentInMillis > reasonableHeartbeatTime) { var testRequestId = GetAttribute(ExtendedFixSessionAttribute.LastSentTestReqId.Name); if (Log.IsWarnEnabled) { Log.Warn("TestRequest reply for " + testRequestId + " is not received. Session " + SessionId + " will be terminated."); } RemoveAttribute(ExtendedFixSessionAttribute.LastSentTestReqId.Name); RemoveAttribute(ExtendedFixSessionAttribute.LastReceivedTestReqId.Name); //disconnect without sending Logout Disconnect(DisconnectReason.NoAnswer, "Test request " + testRequestId + " reply hasn't been received within specified time period", false, true, true, false); } /// <summary> /// Returns configured HBI plus configured "reasonable transmission time" /// </summary> /// <returns>HBI + RTT in milliseconds</returns> private int GetHbiPlusReasonableTransmissionTimeMillis() { var heartbeatIntervalSec = Parameters.HeartbeatInterval; return heartbeatIntervalSec * Second + _reasonableTransmissionTimeMillis; } /// <summary> /// Restore the session if test request is received. /// </summary> public virtual void RestoreSessionAfterReceivedTestRequest() { Log.Debug("TestRequest reply received"); ResetTestRequestFlags(); } public virtual void ResetTestRequestFlags() { RemoveAttribute(ExtendedFixSessionAttribute.LastReceivedTestReqId.Name); RemoveAttribute(ExtendedFixSessionAttribute.LastSentTestReqId.Name); RemoveAttribute(ExtendedFixSessionAttribute.SentTestReqNumberId.Name); } private bool HasTestRequestReplyBeenReceived() { var atrSent = GetAttribute(ExtendedFixSessionAttribute.LastSentTestReqId.Name); var atrReceived = GetAttribute(ExtendedFixSessionAttribute.LastReceivedTestReqId.Name); if (atrSent == null || atrReceived == null) { return false; } var sentReqId = (TestReqIdTimestamp)atrSent; var receivedReqId = (TagValue)atrReceived; return EqualsArrays(sentReqId.AsByteArray(), 0, sentReqId.AsByteArray().Length, receivedReqId.Buffer, receivedReqId.Offset, receivedReqId.Length); } private bool EqualsArrays(byte[] b1, int offset1, int length1, byte[] b2, int offset2, int length2) { if (length1 != length2) { return false; } for (var i = 0; i < length1; i++) { if (b1[offset1 + i] != b2[offset2 + i]) { return false; } } return true; } private bool HasTestRequestBeenSent() { return GetAttribute(ExtendedFixSessionAttribute.LastSentTestReqId.Name) != null; } private void InitAttributes() { for (var i = 0; i < _longAttrs.Length; i++) { _longAttrs[i] = Common.Constants.LongNull; } } /// <inheritdoc /> public virtual void SetAttribute(ExtendedFixSessionAttribute attr, long value) { _longAttrs[attr.Ordinal()] = value; } public virtual void RemoveAttribute(ExtendedFixSessionAttribute attr) { _longAttrs[attr.Ordinal()] = Common.Constants.LongNull; _boolAttrs[attr.Ordinal()] = false; //RemoveAttribute(attr.Name); } public virtual void RemoveLongAttribute(ExtendedFixSessionAttribute attr) { _longAttrs[attr.Ordinal()] = Common.Constants.LongNull; } /// <inheritdoc /> public virtual long GetAttributeAsLong(ExtendedFixSessionAttribute attr) { return _longAttrs[attr.Ordinal()]; } /// <inheritdoc /> public virtual void SetAttribute(ExtendedFixSessionAttribute attr, bool value) { _boolAttrs[attr.Ordinal()] = value; } /// <inheritdoc /> public virtual bool GetAttributeAsBool(ExtendedFixSessionAttribute attr) { return _boolAttrs[attr.Ordinal()]; } /// <inheritdoc /> public virtual void SetAttribute(string key, object value) { _attributes[key] = value; if (_attributeListeners.ContainsKey(key)) { var listeners = _attributeListeners[key]; foreach (var lstnr in listeners) { lstnr.OnAttributeSet(value); } } } /// <inheritdoc /> public virtual void SetAttribute(ExtendedFixSessionAttribute key, object @object) { SetAttribute(key.Name, @object); } /// <inheritdoc /> public virtual object GetAttribute(string key) { return _attributes.GetValueOrDefault(key); } /// <inheritdoc /> public virtual object GetAttribute(ExtendedFixSessionAttribute key) { return GetAttribute(key.Name); } /// <inheritdoc /> public virtual object GetAndRemoveAttribute(string key) { var result = GetAttribute(key); if (result != null) { RemoveAttribute(key); } return result; } /// <inheritdoc /> public virtual void RemoveAttribute(string key) { _attributes.Remove(key); if (_attributeListeners.ContainsKey(key)) { var listeners = _attributeListeners[key]; foreach (var lstnr in listeners) { lstnr.OnAttributeRemoved(); } } } /// <inheritdoc /> public virtual void SubscribeForAttributeChanges(ExtendedFixSessionAttribute attr, IExtendedFixSessionAttributeListener listener) { var attrName = attr.Name; if (_attributeListeners.ContainsKey(attrName)) { _attributeListeners[attrName].Add(listener); } else { _attributeListeners[attrName] = new ConcurrentBag<IExtendedFixSessionAttributeListener>(new[] { listener }); } } /// <inheritdoc /> public virtual void AddSessionStateListener(IFixSessionStateListener stateListener) { _sessionStateListenerObserver.AddListener(stateListener); } /// <inheritdoc /> public virtual void RemoveSessionStateListener(IFixSessionStateListener stateListener) { _sessionStateListenerObserver.RemoveListener(stateListener); } public override string ToString() { return Parameters.SessionId.ToString(); } public override bool Equals(object o) { if (this == o) { return true; } if (o == null || GetType() != o.GetType()) { return false; } var that = (AbstractFixSession)o; if (Parameters != null && that.Parameters != null) { if (!Parameters.SessionId.Equals(that.Parameters.SessionId)) { return false; } } return true; } public override int GetHashCode() { return Parameters?.SessionId?.GetHashCode() ?? 0; } /// <inheritdoc /> public virtual DisconnectReason LastDisconnectReason { get { return _disconnectReason; } set { if (Log.IsTraceEnabled) { Log.Trace("Set disconnect reason " + value + " for session " + SessionId); } _disconnectReason = value; } } private class ExtendedFixSessionListenerAdapter : IExtendedFixSessionListener { public virtual void OnMessageReceived(MsgBuf msgBuf) { } public virtual void OnMessageSent(byte[] bytes, int offset, int length) { } public virtual void OnSessionStateChange(SessionState sessionState) { } public virtual void OnNewMessage(FixMessage message) { } } private class RejectFixSessionListenerAdapter : IRejectMessageListener { protected internal static readonly ILog Log = LogFactory.GetLog(typeof(RejectFixSessionListenerAdapter)); public virtual void OnRejectMessage(FixMessage message) { if (Log.IsDebugEnabled) { Log.Debug("Message wasn't sent: " + message.ToPrintableString()); } } } private class FixSessionOutOfSyncListenerAdapter : IFixSessionOutOfSyncListener { private readonly AbstractFixSession _session; public FixSessionOutOfSyncListenerAdapter(AbstractFixSession session) { _session = session; } public void OnGapDetected(long lastProcessedSeqNum, long receivedSeqNum) { if (_session.Log.IsDebugEnabled) { _session.Log.Debug("Detected incoming sequence gap in session " + _session.SessionId + ": " + lastProcessedSeqNum + "-" + receivedSeqNum); } } public void OnGapClosed(long lastProcessedSeqNum) { if (_session.Log.IsDebugEnabled) { _session.Log.Debug("Incoming sequence gap in session " + _session.SessionId + " closed. Last processed seq num is " + lastProcessedSeqNum); } } public void OnResendRequestReceived(long gapStart, long gapEnd) { if (_session.Log.IsDebugEnabled) { _session.Log.Debug("Received ResendRequest message for session " + _session.SessionId + " for gap: " + gapStart + "-" + gapEnd); } } public void OnResendRequestProcessed(long gapEnd) { if (_session.Log.IsDebugEnabled) { _session.Log.Debug("Received ResendRequest message for session " + _session.SessionId + " processed. Last re-send message sequence " + "is " + gapEnd); } } public void OnResendRequestSent(long gapStart, long gapEnd) { if (_session.Log.IsDebugEnabled) { _session.Log.Debug("ResendRequest message with gap (" + gapStart + "-" + gapEnd + ") for session " + _session.SessionId + " was sent"); } } public void OnGapFillReceived(FixMessage sequenceResetGapFillMessage) { if (_session.Log.IsDebugEnabled) { _session.Log.Debug("GapFill message for session " + _session.SessionId + " is received: " + sequenceResetGapFillMessage.ToPrintableString()); } } public void OnGapFillSent(FixMessage sequenceResetGapFillMessage) { if (_session.Log.IsDebugEnabled) { _session.Log.Debug("GapFill message for session " + _session.SessionId + " is sent: " + sequenceResetGapFillMessage.ToPrintableString()); } } } private class FixSessionListenerObserver : IFixSessionListener { private readonly AbstractFixSession _session; public FixSessionListenerObserver(AbstractFixSession session) { _session = session; } internal IList<IFixSessionStateListener> FixSessionListeners = new List<IFixSessionStateListener>(); internal IFixSessionStateListener[] ExtendedFixSessionListeners = new IFixSessionStateListener[0]; public virtual void AddListener(IFixSessionStateListener extendedFixSessionListener) { lock (FixSessionListeners) { FixSessionListeners.Add(extendedFixSessionListener); var extendedFixSessionListeners = ((List<IFixSessionStateListener>)FixSessionListeners).ToArray(); ExtendedFixSessionListeners = extendedFixSessionListeners; } } public virtual void RemoveListener(IFixSessionStateListener extendedFixSessionListener) { lock (FixSessionListeners) { FixSessionListeners.Remove(extendedFixSessionListener); var extendedFixSessionListeners = ((List<IFixSessionStateListener>)FixSessionListeners).ToArray(); ExtendedFixSessionListeners = extendedFixSessionListeners; } } /// <inheritdoc /> public virtual void OnSessionStateChange(SessionState sessionState) { IFixSessionStateListener[] listeners; lock (FixSessionListeners) { listeners = ExtendedFixSessionListeners; } foreach (var listener in listeners) { try { listener.OnSessionStateChange(new StateEvent(_session, sessionState)); } catch (Exception) { } } } /// <inheritdoc /> public virtual void OnNewMessage(FixMessage message) { // no supported for observers } public virtual void RemoveAllListeners() { lock (FixSessionListeners) { FixSessionListeners.Clear(); ExtendedFixSessionListeners = Array.Empty<IFixSessionStateListener>(); } } } private class SlowConsumerListenerAdapter : IFixSessionSlowConsumerListener { protected internal static readonly ILog Log = LogFactory.GetLog(typeof(SlowConsumerListenerAdapter)); public virtual void OnSlowConsumerDetected(SlowConsumerReason reason, long expected, long current) { if (Log.IsDebugEnabled) { Log.Debug("Slow consumer detected: " + reason + "; " + expected + " / " + current); } } } private class SessionStateAdapter : IFixSessionListener { private readonly FixSessionListenerObserver _sessionListenerObserver; private readonly IFixSessionListener _listener; public SessionStateAdapter(FixSessionListenerObserver sessionListenerObserver, IFixSessionListener listener) { _sessionListenerObserver = sessionListenerObserver; _listener = listener; } /// <inheritdoc /> public virtual void OnSessionStateChange(SessionState sessionState) { _listener.OnSessionStateChange(sessionState); // todo may be move to session observers _sessionListenerObserver.OnSessionStateChange(sessionState); } /// <inheritdoc /> public virtual void OnNewMessage(FixMessage message) { _listener.OnNewMessage(message); } } private class ConsumingControlTransportWrapper : IFixTransport { private readonly AbstractFixSession _session; private readonly long _transportWriteDelayThreshold; public ConsumingControlTransportWrapper(AbstractFixSession session) { _session = session ?? throw new ArgumentNullException(nameof(session)); _transportWriteDelayThreshold = _session.ConfigAdapter.Configuration.GetPropertyAsLong(Config.SlowConsumerWriteDelayThreshold); } /// <inheritdoc /> public bool IsBlockingSocket => _session.Transport.IsBlockingSocket; /// <inheritdoc /> public void ReadMessage(MsgBuf buf) { _session.Transport.ReadMessage(buf); } /// <inheritdoc /> public void Write(byte[] message) { Write(message, 0, message.Length); } /// <inheritdoc /> public void WaitUntilReadyToWrite() { _session.Transport.WaitUntilReadyToWrite(); } /// <inheritdoc /> public void Close() { _session.Transport.Close(); } /// <inheritdoc /> public int OptimalBufferSize => _session.Transport.OptimalBufferSize; /// <inheritdoc /> public string RemoteHost { get { return _session.Transport.RemoteHost; } } /// <inheritdoc /> public int Write(byte[] message, int offset, int length) { var start = DateTimeHelper.CurrentMilliseconds; var written = _session.Transport.Write(message, offset, length); Handle(length, written, start, DateTimeHelper.CurrentMilliseconds); return written; } /// <inheritdoc /> public int Write(ByteBuffer buf, int offset, int length) { var start = DateTimeHelper.CurrentMilliseconds; var written = _session.Transport.Write(buf, offset, length); Handle(length, written, start, DateTimeHelper.CurrentMilliseconds); return written; } public void Handle(int toWrite, int written, long begin, long end) { var delay = end - begin; if (delay > _transportWriteDelayThreshold) { _session.SlowConsumerListener.OnSlowConsumerDetected(SlowConsumerReason.TransportWriteDelay, _transportWriteDelayThreshold, delay); } if (written < toWrite) { _session.SlowConsumerListener.OnSlowConsumerDetected(SlowConsumerReason.TransportWriteNotComplete, toWrite, written); } } } public void BackupStorages() { try { IncomingStorage.BackupStorage(Parameters); } catch (IOException e) { Log.Error("Error on backup incoming storage. Cause: " + e.Message, e); throw; } try { OutgoingStorage.BackupStorage(Parameters); } catch (IOException e) { Log.Error("Error on backup outgoing storage. Cause: " + e.Message, e); throw; } } /// <inheritdoc /> public long InSeqNum { get { lock (SessionLock) { if (!_initialized) { return Parameters.IncomingSequenceNumber > 0 ? Parameters.IncomingSequenceNumber : RuntimeState.InSeqNum; } return RuntimeState.InSeqNum; } } set => SetSequenceNumbers(value, -1); } /// <inheritdoc /> public long OutSeqNum { get { lock (SessionLock) { if (!_initialized) { return Parameters.OutgoingSequenceNumber > 0 ? Parameters.OutgoingSequenceNumber : RuntimeState.OutSeqNum; } return RuntimeState.OutSeqNum; } } set => SetSequenceNumbers(-1, value); } #region Scheduled tasks and methods /// <summary> /// Register session`s scheduled tasks: /// - Scheduled SeqNum reset task /// - HB /// - TestRequest tasks /// </summary> private void RegisterSessionTasks() { // required jobs Scheduler.ScheduleHeartbeat(CheckHbAndTestRequestInterval); Scheduler.ScheduleTestRequest(CheckHbAndTestRequestInterval); // SeqNum reset if configured if (ConfigAdapter.IsResetSeqNumTimeEnabled) { RegisterSeqResetTask(); } } /// <summary> /// De-register session scheduled tasks: /// - SeqNum reset /// - HB /// - TestRequest /// - Session schedule /// </summary> private void UnRegisterSessionTasks() { Scheduler?.DescheduleAllTasks(); } private void RegisterSeqResetTask() { var resetTimeString = ConfigAdapter.ResetSequenceTime; var resetTimeZoneString = ConfigAdapter.ResetSequenceTimeZone; if (!TimeSpan.TryParseExact(resetTimeString, @"h\:m\:s", null, out var resetTime)) { resetTime = TimeSpan.Zero; Log.Warn($"Cannot parse Time '{resetTimeString}', using {resetTime} instead."); } if (!DateTimeHelper.TryParseTimeZone(resetTimeZoneString, out var resetTimeZone)) { resetTimeZone = TimeZoneInfo.Utc; Log.Warn($"Cannot parse Time Zone '{resetTimeZoneString}', using UTC Time Zone instead."); } Scheduler.ScheduleSeqReset(resetTime, resetTimeZone); } #endregion } }