FixAntenna/NetCore/FixEngine/Session/InitiatorFixSession.cs (338 lines of code) (raw):

// Copyright (c) 2021 EPAM Systems // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. using System; using System.IO; using System.Threading; using System.Threading.Tasks; using Epam.FixAntenna.NetCore.Configuration; using Epam.FixAntenna.NetCore.FixEngine.Manager; using Epam.FixAntenna.NetCore.FixEngine.Scheduler; using Epam.FixAntenna.NetCore.FixEngine.Scheduler.Tasks; using Epam.FixAntenna.NetCore.FixEngine.Session.MessageHandler; using Epam.FixAntenna.NetCore.FixEngine.Transport; namespace Epam.FixAntenna.NetCore.FixEngine.Session { internal class InitiatorFixSession : AbstractFixSession, IScheduledFixSession { private ISessionTransportFactory TransportFactory { get; } /// <summary> /// Creates the <c>InitiatorFIXSession</c>. /// </summary> public InitiatorFixSession(IFixMessageFactory factory, SessionParameters sessionParameters, HandlerChain fixSessionListener) : base(factory, sessionParameters, fixSessionListener) { TransportFactory = new DefaultSessionTransportFactory(); } /// <summary> /// Gets transport. /// </summary> /// <param name="host"> the host </param> /// <param name="port"> the port </param> /// <param name="sessionParameters"> the session parameters </param> protected virtual IFixTransport GetTransport(string host, int port, SessionParameters sessionParameters) { if (Log.IsDebugEnabled) { Log.Debug($"Getting transport for: {{host - {host}, port - {port}}}"); } return TransportFactory.CreateInitiatorTransport(host, port, sessionParameters); } /// <summary> /// Returns true, if session is in active state. /// </summary> protected bool IsActive { get; private set; } /// <inheritdoc /> public override void Init() { lock (SessionLock) { InitTransport(); base.Init(); } } /// <inheritdoc /> public override void Connect() { CheckForDisposed(); lock (SessionLock) { CheckForActiveAndInitTransport(); ConnectInternal(); } } public override async Task ConnectAsync() { CheckForDisposed(); CheckForActiveAndInitTransport(); await ConnectInternalAsync().ConfigureAwait(false); } private void CheckForDisposed() { if (SessionState.IsDisposed(SessionState)) { //re-register FIX session again FixSessionManager.Instance.RegisterFixSession(this); Init(); } } private void CheckForActiveAndInitTransport() { var currentState = SessionState; if (!(SessionState.IsDisposed(currentState) || SessionState.IsDisconnected(currentState))) { throw new InvalidOperationException($"Session is alive. Current session state: {currentState}"); } InitTransport(); SessionState = SessionState.Connecting; } /// <inheritdoc /> public override SessionState SessionState { set { base.SessionState = value; if (value == SessionState.WaitingForLogon) { StartLogonResponseWaiter(); } else if (value == SessionState.Connected) { CancelLogonResponseWaiter(); } } } private LogonResponseWaiter _logonResponseWaiter; private void StartLogonResponseWaiter() { CancelLogonResponseWaiter(); var logonWaitTimeout = ConfigAdapter.LogonWaitTimeout; _logonResponseWaiter = new LogonResponseWaiter(this, (int)logonWaitTimeout); _logonResponseWaiter.Start(); } private void CancelLogonResponseWaiter() { if (_logonResponseWaiter != null) { _logonResponseWaiter.Cancel(); _logonResponseWaiter.Dispose(); _logonResponseWaiter = null; } } /// <summary> /// Unsupported for InitiatorFIXSession. Always throws IllegalStateException. /// </summary> /// <exception cref="InvalidOperationException"> </exception> /// <seealso cref="IFixSession.Reject(string)"></seealso> public override void Reject(string reason) { throw new InvalidOperationException("Initiator session cannot be rejected. It should be disconnected instead of."); } protected virtual void ConnectInternal() { IsActive = true; try { ((IOutgoingFixTransport)Transport).Open(); SessionState = SessionState.WaitingForLogon; PrepareForConnect(); StartSession(); } catch (IOException e) { LastDisconnectReason = DisconnectReason.InitConnectionProblem; SessionState = SessionState.DisconnectedAbnormally; RemoveAttribute(ExtendedFixSessionAttribute.IsSendResetSeqNum.Name); if (Log.IsDebugEnabled) { Log.Debug("Connect failed", e); } else { Log.Warn($"Connect failed: {e.Message}"); } throw; } catch (Exception e) { LastDisconnectReason = DisconnectReason.InitConnectionProblem; SessionState = SessionState.DisconnectedAbnormally; if (Log.IsDebugEnabled) { Log.Debug("Session startup failed", e); } else { Log.Warn($"Session startup failed: {e.Message}"); } throw new Exception("Session startup failed", e); } } protected virtual async Task ConnectInternalAsync() { IsActive = true; try { await ((IOutgoingFixTransport)Transport).OpenAsync().ConfigureAwait(false); SessionState = SessionState.WaitingForLogon; PrepareForConnect(); StartSession(); } catch (IOException e) { LastDisconnectReason = DisconnectReason.InitConnectionProblem; SessionState = SessionState.DisconnectedAbnormally; if (Log.IsDebugEnabled) { Log.Debug("Connect failed", e); } else { Log.Warn($"Connect failed: {e.Message}"); } throw; } catch (Exception e) { LastDisconnectReason = DisconnectReason.InitConnectionProblem; SessionState = SessionState.DisconnectedAbnormally; if (Log.IsDebugEnabled) { Log.Debug("Session startup failed", e); } else { Log.Warn($"Session startup failed: {e.Message}"); } throw new Exception("Session startup failed", e); } } public virtual void InitTransport() { if (Transport == null) { Transport = GetTransport(Parameters.Host, Parameters.Port.Value, Parameters); } } /// <inheritdoc /> public override void Disconnect(string reason) { IsActive = false; base.Disconnect(reason); } public override void Dispose() { IsActive = false; base.Dispose(); } private sealed class LogonResponseWaiter : IDisposable { private readonly InitiatorFixSession _session; private readonly int _timeout; private readonly CountdownEvent _countDownLatch; private readonly Thread _worker; private bool _canceled; public LogonResponseWaiter(InitiatorFixSession session, int logonWaitTimeout) { _session = session; _timeout = logonWaitTimeout; _countDownLatch = new CountdownEvent(1); _worker = new Thread(Run) { Name = nameof(LogonResponseWaiter) }; } public void Start() { _worker.Start(); } private void Run() { if (_session.Log.IsTraceEnabled) { _session.Log.Trace("Starting..."); } try { _countDownLatch.Wait(_timeout); if (!_canceled && _session.SessionState == SessionState.WaitingForLogon) { //non gracefully disconnect session _session.Disconnect(DisconnectReason.NoAnswer, $"Logon response wasn't received during {_timeout} ms", false, false); } } catch (ThreadInterruptedException) { _session.Log.Warn($"Logon response waiter was interrupted. Session {GetSessionName()}"); } } public string GetSessionName() { var @params = _session.Parameters; return @params.SessionId.ToString(); } public void Cancel() { _canceled = true; _countDownLatch.Signal(); } private void Dispose(bool disposing) { if (disposing) { _countDownLatch.Dispose(); } } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } } #region IScheduledFixSession implementation public void Schedule() { VerifySessionState(); if (Log.IsDebugEnabled) { Log.Debug($"Init session scheduler: {Parameters}"); } var startTimeExpr = ConfigAdapter.TradePeriodBegin; var stopTimeExpr = ConfigAdapter.TradePeriodEnd; var timeZone = ConfigAdapter.TradePeriodTimeZone; ValidateCronExpression(startTimeExpr, Config.TradePeriodBegin); ValidateCronExpression(stopTimeExpr, Config.TradePeriodEnd); if (startTimeExpr != null) { if (Log.IsTraceEnabled) { Log.Trace($"Add 'start' task {startTimeExpr} {timeZone.Id}: {Parameters.SessionId}"); } Scheduler.ScheduleCronTask<InitiatorSessionStartTask>(startTimeExpr, timeZone); } if (stopTimeExpr != null) { if (Log.IsTraceEnabled) { Log.Trace($"Add 'stop' task {stopTimeExpr} {timeZone.Id}: {Parameters.SessionId}"); } Scheduler.ScheduleCronTask<InitiatorSessionStopTask>(stopTimeExpr, timeZone); } if (startTimeExpr == null || stopTimeExpr == null) return; var schedule = new Schedule(startTimeExpr, stopTimeExpr, timeZone); if (!CanStartScheduledSession(schedule)) return; lock (SessionLock) { if (CanStartScheduledSession(schedule)) { Connect(); } } } private bool CanStartScheduledSession(Schedule schedule) { var isDisconnected = SessionState.IsDisconnected(SessionState); var isInsideInterval = schedule.IsInsideInterval(DateTimeOffset.UtcNow); return isDisconnected && isInsideInterval; } public void Deschedule() { if (Log.IsDebugEnabled) { Log.Debug($"Cancel session scheduler: {Parameters}"); } if (Scheduler == null) return; if (Scheduler.IsTaskScheduled<InitiatorSessionStartTask>()) { Log.Trace($"Cancel start session task: {Parameters.SessionId}"); Scheduler.DescheduleTask<InitiatorSessionStartTask>(); } if (Scheduler.IsTaskScheduled<InitiatorSessionStopTask>()) { Log.Trace($"Cancel stop session task: {Parameters.SessionId}"); Scheduler.DescheduleTask<InitiatorSessionStopTask>(); } } private void ValidateCronExpression(string timeExpr, string configParameterName) { if (timeExpr == null) return; if (MultipartCronExpression.IsValidCronExpression(timeExpr)) return; var message = $"{configParameterName} expression is invalid: {timeExpr}"; Log.Error(message); throw new ArgumentException(message); } #endregion } }