FixAntenna/NetCore/FixEngine/Manager/FixSessionManager.cs (444 lines of code) (raw):

// Copyright (c) 2021 EPAM Systems // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. using Epam.FixAntenna.NetCore.Common.Logging; using Epam.FixAntenna.NetCore.FixEngine.Session; using Epam.FixAntenna.NetCore.FixEngine.Session.Util; using System; using System.Collections.Generic; using System.Collections.Immutable; using System.Globalization; using System.IO; using System.Text; namespace Epam.FixAntenna.NetCore.FixEngine.Manager { /// <summary> /// This singleton contains all registered FIX sessions in this application instance. /// Each class in the list backed up by the WeakReference /// so it may return bogus values for sessions that just expired and /// were removed from the list because of that. /// SessionState will be returned as DEAD. /// </summary> /// <seealso cref="SessionState.Dead">for such sessions.></seealso> internal class FixSessionManager { private static readonly ILog Log = LogFactory.GetLog(typeof(FixSessionManager)); private readonly RarelyChangeList<IFixSessionListListener> _listenersList = new RarelyChangeList<IFixSessionListListener>(); private readonly CopyOnEditArrayList<IExtendedFixSession> _sessions; private bool _running; /// <summary> /// Gets the <see cref="FixSessionManager"/> instance. /// </summary> public static FixSessionManager Instance { get; } = new FixSessionManager(); private FixSessionManager() { _sessions = new CopyOnEditArrayList<IExtendedFixSession>(); _running = true; AppDomain.CurrentDomain.ProcessExit += CurrentDomain_ProcessExit; } private void CurrentDomain_ProcessExit(object sender, EventArgs e) { Shutdown(); } /// <summary> /// Initialize new FixSessionManager. /// </summary> public static void Init() { PrintEnvironment(); } /// <summary> /// Shutdown the FixAntenna engine, i.e. FixSessionManager, LicenseManager, SchedulerManager. /// </summary> /// <remarks> /// FixSessionManager shutting down automatically when the application exits. /// Use this method only if you need to force shutdown the engine. /// </remarks> public void Shutdown() { if (_running) { _running = false; //TODO: shudown schedulers? NLog.LogManager.Shutdown(); } } private static void PrintEnvironment() { var strBuilder = new StringBuilder("System properties:\n"); var p = GetProperties(); foreach (var prop in p) { strBuilder.AppendFormat(CultureInfo.InvariantCulture, "{0} = {1}\n", prop.Key, prop.Value); } Log.Info(strBuilder.ToString()); } /// <summary> /// Returns some of system and environment properties for diagnostic. /// </summary> /// <returns></returns> private static Dictionary<string, string> GetProperties() { return new Dictionary<string, string> //TODO: casing, review list of properties { {"file.separator", Path.DirectorySeparatorChar.ToString(CultureInfo.InvariantCulture)}, {"clr.version", Environment.Version.ToString()}, {"os.arch", System.Runtime.InteropServices.RuntimeInformation.OSArchitecture.ToString()}, {"os.name", Environment.OSVersion.Platform.ToString()}, {"os.version", Environment.OSVersion.Version.ToString()}, {"user.dir", Environment.CurrentDirectory}, {"user.home", Environment.GetFolderPath(Environment.SpecialFolder.UserProfile)}, {"user.name", Environment.UserName} }; } /// <summary> /// Registers a new <c>session</c> /// </summary> /// <param name="session"> </param> public void RegisterFixSession(IExtendedFixSession session) { var sessionId = session.Parameters.SessionId.ToString(); if (Exists(sessionId)) { throw new DuplicateSessionException("Session already exists. Duplicate sessionID: " + sessionId); } lock (_sessions) { _sessions.Add(session); //TODO: check task execution?? } NotifySessionAdd(session); } /// <summary> /// Removes the <c>session</c>. /// </summary> /// <param name="session"> </param> public void RemoveFixSession(IExtendedFixSession session) { bool result; lock (_sessions) { result = _sessions.Remove(session); } if (result) { if (Log.IsInfoEnabled) { Log.Info("Session disposed: " + session); } NotifySessionRemoved(session); } } /// <summary> /// Remove all sessions. /// </summary> public void RemoveAllSessions() { var copy = _sessions.RemoveAll(); for (var i = 0; i < copy.Count; i++) { var session = copy[i]; if (Log.IsInfoEnabled) { Log.Info("Session disposed: " + session); } NotifySessionRemoved(session); } } /// <summary> /// Finds the session by sessionID. /// </summary> /// <param name="sessionId"> the unique session identifier </param> public IExtendedFixSession Locate(string sessionId) { var readCopy = _sessions.GetReadOnlyCopy(); for (var i = 0; i < readCopy.Count; i++) { var session = readCopy[i]; var parameters = session.Parameters; if (parameters.SessionId.ToString().Equals(sessionId, StringComparison.Ordinal)) { return session; } } return null; } /// <summary> /// Finds the session by sessionID. /// </summary> /// <param name="sessionId"> the unique session identifier </param> public IExtendedFixSession Locate(SessionId sessionId) { var readCopy = _sessions.GetReadOnlyCopy(); for (var i = 0; i < readCopy.Count; i++) { var session = readCopy[i]; if (session.Parameters.SessionId.Equals(sessionId)) { return session; } } return null; } public IExtendedFixSession Locate(SessionParameters @params) { return Locate(@params.SessionId.ToString()); } /// <summary> /// TODO: implement ASAP /// </summary> /// <param name="senderComId"> </param> /// <param name="senderSubId"> </param> /// <param name="senderLocationId"> </param> /// <param name="targetCompId"> </param> /// <param name="targetSubId"> </param> /// <param name="targetLocationId"> /// @return </param> public IExtendedFixSession Locate(string senderComId, string senderSubId, string senderLocationId, string targetCompId, string targetSubId, string targetLocationId) { return LocateFirst(senderComId, targetCompId); } /// <summary> /// Finds the session with senderCompID and targetCompID. It's possible that can be several such sessions /// but method will return randomly first. /// </summary> /// <param name="senderCompId"> the sender comp id </param> /// <param name="targetCompId"> the target comp id </param> public IExtendedFixSession LocateFirst(string senderCompId, string targetCompId) { var readCopy = _sessions.GetReadOnlyCopy(); for (var i = 0; i < readCopy.Count; i++) { var session = readCopy[i]; var parameters = session.Parameters; if (parameters.SenderCompId.Equals(senderCompId) && parameters.TargetCompId.Equals(targetCompId)) { return session; } } return null; } /// <summary> /// Returns true if session with senderCompID and targetCompID exists. /// </summary> /// <param name="sessionId"> the unique session identifier </param> public bool Exists(string sessionId) { var readCopy = _sessions.GetReadOnlyCopy(); for (var i = 0; i < readCopy.Count; i++) { var session = readCopy[i]; var parameters = session.Parameters; if (parameters.SessionId.ToString().Equals(sessionId, StringComparison.Ordinal)) { return true; } } return false; } /// <summary> /// Gets cloned list of sessions. /// </summary> public IList<IExtendedFixSession> SessionListCopy => _sessions.GetReadOnlyCopy(); public int SessionsCount => _sessions.Count; /// <summary> /// Register client <see cref="IFixSessionListListener"/>. /// </summary> /// <param name="listener"> </param> public void RegisterSessionManagerListener(IFixSessionListListener listener) { lock (_listenersList) { if (!_listenersList.Contain(listener)) { _listenersList.Add(listener); } } } /// <summary> /// Unregister client <see cref="IFixSessionListListener"/>. /// </summary> /// <param name="listener"> </param> public void UnregisterSessionManagerListener(IFixSessionListListener listener) { lock (_listenersList) { _listenersList.Remove(listener); } } public void NotifySessionAdd(IExtendedFixSession fixSession) { IList<IFixSessionListListener> readOnlyListeners; lock (_listenersList) { readOnlyListeners = _listenersList.ReadOnlyCopy(); } for (var i = 0; i < readOnlyListeners.Count; i++) { try { readOnlyListeners[i].OnAddSession(fixSession); } catch (Exception e) { Log.Error("Error on call onAddSession. Cause: " + e.Message, e); } } lock (_listenersList) { _listenersList.ReleaseCopy(readOnlyListeners); } } public void NotifySessionRemoved(IExtendedFixSession fixSession) { IList<IFixSessionListListener> readOnlyListeners; lock (_listenersList) { readOnlyListeners = _listenersList.ReadOnlyCopy(); } for (var i = 0; i < readOnlyListeners.Count; i++) { try { readOnlyListeners[i].OnRemoveSession(fixSession); } catch (Exception e) { Log.Error("Error on call onRemoveSession. Cause: " + e.Message, e); } } lock (_listenersList) { _listenersList.ReleaseCopy(readOnlyListeners); } } /// <summary> /// Methods close all sessions. /// <p/> /// <p/> /// </summary> public static void CloseAllSession() { var fixSessions = Instance._sessions.GetReadOnlyCopy(); for (var i = 0; i < fixSessions.Count; i++) { CloseSession(fixSessions[i]); } } private static void CloseSession(IExtendedFixSession session) { try { session.Disconnect(DisconnectReason.GetDefault().ToString()); } catch (Exception) { Log.Trace("Exception while closing session."); } } /// <summary> /// Methods dispose all sessions. /// <p/> /// <p/> /// </summary> public static void DisposeAllSession() { var fixSessions = Instance._sessions.GetReadOnlyCopy(); for (var i = 0; i < fixSessions.Count; i++) { Dispose(fixSessions[i]); } } private static void Dispose(IExtendedFixSession session) { try { session.Dispose(); } catch (Exception) { Log.Trace("Exception while disposing session."); } } /// <exception cref="IOException"> </exception> public static bool ResetSeqNums(SessionParameters @params) { var result = true; var manager = Instance; var session = manager.Locate(@params); if (session != null) { //FIX session exist session.ResetSequenceNumbers(); } else { var storageFactory = ReflectStorageFactory.CreateStorageFactory(@params.Configuration); var persistedParams = new SessionParameters(); var runtimeState = new FixSessionRuntimeState(); persistedParams.FromProperties(@params.ToProperties()); if (!storageFactory.LoadSessionParameters(persistedParams, runtimeState)) { Log.Debug("Session " + @params.SessionId + " not fount. Init properties for this session."); result = false; } persistedParams.IncomingSequenceNumber = 1; persistedParams.OutgoingSequenceNumber = 1; runtimeState.InSeqNum = 1; runtimeState.OutSeqNum = 1; runtimeState.LastProcessedSeqNum = 0; storageFactory.SaveSessionParameters(persistedParams, runtimeState); } return result; } /// <exception cref="IOException"> </exception> public static bool SetSeqNums(SessionParameters @params, int inSeqNum, int outSeqNum) { var result = true; var manager = Instance; var session = manager.Locate(@params); if (session != null) { session.SetSequenceNumbers(inSeqNum, outSeqNum); } else { var storageFactory = ReflectStorageFactory.CreateStorageFactory(@params.Configuration); var persistedParams = new SessionParameters(); persistedParams.FromProperties(@params.ToProperties()); var runtimeState = new FixSessionRuntimeState(); if (!storageFactory.LoadSessionParameters(persistedParams, runtimeState)) { Log.Debug("Session " + @params.SessionId + " not fount. Init properties for this session."); result = false; } if (inSeqNum >= 0) { persistedParams.IncomingSequenceNumber = inSeqNum; runtimeState.InSeqNum = inSeqNum; } if (outSeqNum >= 0) { persistedParams.OutgoingSequenceNumber = outSeqNum; runtimeState.OutSeqNum = outSeqNum; } storageFactory.SaveSessionParameters(persistedParams, runtimeState); } return result; } /// <summary> /// This class contains two copies of the lists. /// </summary> internal class RarelyChangeList<TE> { internal IList<TE> List = new List<TE>(); internal IList<TE> ReadOnlyCopyConflict = new List<TE>(); internal int CountUsers; /// <summary> /// Remove item from list and update(or create if the update is not possible) ReadOnly copy /// This method is not thread safe. /// </summary> public virtual bool Remove(TE item) { var isRemoved = List.Remove(item); if (isRemoved) { if (CountUsers == 0) { // nobody uses. we can modify ReadOnlyCopyConflict.Remove(item); } else { // this copy used now. create new copy ReadOnlyCopyConflict = new List<TE>(List); // reset count for new copy CountUsers = 0; } } return isRemoved; } /// <summary> /// Added item to list and update(or create if the update is not possible) ReadOnly copy /// This method is not thread safe. /// </summary> /// <param name="item"> </param> public virtual void Add(TE item) { List.Add(item); if (CountUsers == 0) { // nobody uses. we can modify ReadOnlyCopyConflict.Add(item); } else { // this copy used now. create new copy ReadOnlyCopyConflict = new List<TE>(List); } } /// <summary> /// Return ReadOnly copy of original list. /// Release the list after use to work properly. /// This method is not thread safe. /// </summary> public virtual IList<TE> ReadOnlyCopy() { CountUsers++; return ReadOnlyCopyConflict; } /// <summary> /// Release the list after use to work properly. /// This method is not thread safe. </summary> /// <param name="copy"> </param> public virtual void ReleaseCopy(IList<TE> copy) { if (ReadOnlyCopyConflict == copy) { // this is same copy CountUsers--; } } public virtual bool Contain(TE item) { return List.Contains(item); } } private class CopyOnEditArrayList<TE> { private bool _changed; private readonly object _lock = false; private readonly IList<TE> _list = new List<TE>(50); private IList<TE> _readOnlyCopy = new List<TE>(50); /// <summary> /// Remove item from list and set 'true' change flag. /// This method is not thread safe. /// </summary> public bool Remove(TE item) { var isRemoved = _list.Remove(item); if (isRemoved) { lock (_lock) { _changed = true; } } return isRemoved; } /// <summary> /// Add item to list and set 'true' change flag. /// This method is not thread safe. /// </summary> /// <param name="item"> </param> public void Add(TE item) { _list.Add(item); lock (_lock) { _changed = true; } } /// <summary> /// /// </summary> /// <returns> read only copy of original list </returns> public IList<TE> GetReadOnlyCopy() { if (_changed) { lock (_lock) { if (!_changed) { return _readOnlyCopy; } _readOnlyCopy = _list.ToImmutableList(); _changed = false; } } return _readOnlyCopy; } /// <summary> /// Clears content of the list. /// </summary> /// <returns>Read only copy of the list before clearing.</returns> public IList<TE> RemoveAll() { var result = GetReadOnlyCopy(); _list.Clear(); lock (_lock) { _changed = true; } return result; } /// <summary> /// Returns true if the list is empty. /// </summary> public bool IsEmpty => _list.Count == 0; /// <summary> /// Gets count of elements in the list. /// </summary> public int Count => _list.Count; } } }