FixAntenna/NetCore/FixEngine/Session/AutoreconnectFixSession.cs (367 lines of code) (raw):

// Copyright (c) 2021 EPAM Systems // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. using System; using System.Collections.Generic; using System.IO; using System.Net; using System.Threading; using System.Threading.Tasks; using Epam.FixAntenna.NetCore.Configuration; using Epam.FixAntenna.NetCore.FixEngine.Manager; using Epam.FixAntenna.NetCore.FixEngine.Session.MessageHandler; using Epam.FixAntenna.NetCore.Message; namespace Epam.FixAntenna.NetCore.FixEngine.Session { internal class AutoreconnectFixSession : InitiatorFixSession, IBackupFixSession { private static readonly int NoAutoreconnect = int.Parse(Config.NoAutoreconnect); private readonly List<DnsEndPoint> _destinations; private volatile Thread _connectThread; private int _destinationIndex; private int _attempt; private readonly bool _enableAutoSwitchToBackupConnection; private readonly bool _cyclicSwitchBackupConnection; private readonly bool _resetBackup; private readonly bool _resetPrimary; private bool _requestedSwitch; private readonly int _maxAttempts; private readonly int _delay; private readonly object _sync = new object(); public AutoreconnectFixSession(IFixMessageFactory factory, SessionParameters sessionParameters, HandlerChain fixSessionListener) : base(factory, sessionParameters, fixSessionListener) { _destinations = new List<DnsEndPoint>(); if (sessionParameters.Host != null && sessionParameters.Port > 0) { _destinations.Add(new DnsEndPoint(sessionParameters.Host, sessionParameters.Port.Value)); } _destinations.AddRange(sessionParameters.Destinations); _maxAttempts = ConfigAdapter.GetAutoReconnectAttempts(0); _delay = ConfigAdapter.AutoReconnectDelay; _enableAutoSwitchToBackupConnection = ConfigAdapter.IsAutoSwitchToBackupConnectionEnabled; _cyclicSwitchBackupConnection = ConfigAdapter.IsCyclicSwitchBackupConnectionEnabled; _resetBackup = ConfigAdapter.IsResetOnSwitchToBackupEnabled; _resetPrimary = ConfigAdapter.IsResetOnSwitchToPrimaryEnabled; } /// <inheritdoc /> public override void InitTransport() { if (Transport == null) { if (_destinations.Count == 0) { throw new InvalidOperationException( "Endpoint configuration is wrong or missing. Check Port and Host settings."); } var initDestination = _destinations[0]; Transport = GetTransport(initDestination.Host, initDestination.Port, Parameters); } base.InitTransport(); } /// <inheritdoc /> public override void SetFixSessionListener(IFixSessionListener listener) { var clientExtendedListener = listener is IExtendedFixSessionListener sessionListener ? sessionListener : null; var sessionHandler = new ExtendedListenerImpl(this, listener, clientExtendedListener); base.SetFixSessionListener(sessionHandler); } private sealed class ExtendedListenerImpl : IExtendedFixSessionListener { private readonly AutoreconnectFixSession _session; private readonly IFixSessionListener _listener; private readonly IExtendedFixSessionListener _clientExtendedListener; private readonly object _sync = new object(); public ExtendedListenerImpl(AutoreconnectFixSession session, IFixSessionListener listener, IExtendedFixSessionListener clientExtendedListener) { _session = session; _listener = listener; _clientExtendedListener = clientExtendedListener; } /// <inheritdoc /> public void OnMessageReceived(MsgBuf msgBuf) { _clientExtendedListener?.OnMessageReceived(msgBuf); } /// <inheritdoc /> public void OnMessageSent(byte[] bytes, int offset, int length) { _clientExtendedListener?.OnMessageSent(bytes, offset, length); } public void OnSessionStateChange(SessionState sessionState) { _listener.OnSessionStateChange(sessionState); if (_session.IsActive && sessionState == SessionState.DisconnectedAbnormally) { if (_session._attempt < _session._maxAttempts || _session._maxAttempts == 0) { var previousSessionState = _session.SessionState; if (previousSessionState != SessionState.Dead) { //if user disposed session in previous callback lock (_sync) { _session.SessionState = SessionState.Reconnecting; _session.AutoChangeDestination(); _session._attempt++; _session.StartConnecting(); } } } else { if (_session._maxAttempts != NoAutoreconnect) { _session.Log.Warn( $"Unable to connect with attempt: {_session._attempt} while maxAttempts: {_session._maxAttempts}"); } } } else { if (sessionState == SessionState.Connected) { _session._attempt = 0; } } } /// <inheritdoc /> public void OnNewMessage(FixMessage message) { _listener.OnNewMessage(message); } } /// <inheritdoc /> public override void Disconnect(DisconnectReason reasonType, string reasonDescription, bool isGracefull, bool isForced, bool continueReading) { base.Disconnect(reasonType, reasonDescription, isGracefull, isForced, continueReading); _connectThread?.Interrupt(); if (SessionState == SessionState.Reconnecting) { //we need to reset this state SessionState = SessionState.Disconnected; } } /// <inheritdoc /> public override void StartSession() { if (_requestedSwitch) { if (_destinationIndex == 0) { if (_resetPrimary) { if (Log.IsDebugEnabled) { Log.Debug("Resetting sequences for primary connection"); } try { SequenceManager.ResetSeqNumForNextConnect(); } catch (IOException e) { Log.Error("Can't reset sequence numbers before connect to primary host", e); } } } else if (_destinationIndex > 0 && _resetBackup) { if (Log.IsDebugEnabled) { Log.Debug("Resetting sequences for backup connection"); } try { SequenceManager.ResetSeqNumForNextConnect(); } catch (IOException e) { Log.Error("Can't reset sequence numbers before connect to primary host", e); } } _requestedSwitch = false; } base.StartSession(); } /// <inheritdoc /> protected override void ConnectInternal() { try { base.ConnectInternal(); } catch (IOException e) { Log.Warn(e.Message); //not rethrowing exception as connection might still be in progress on other destinations } } protected override async Task ConnectInternalAsync() { try { await base.ConnectInternalAsync().ConfigureAwait(false); } catch (IOException e) { Log.Warn(e.Message); //not rethrowing exception as connection might still be in progress on other destinations } } private void StartConnecting() { _connectThread = new Thread(() => { Sleep(); if (IsActive) { ConnectInternal(); } }) { Name = "AutoreconnectThread" }; _connectThread.Start(); } private void Sleep() { if (_delay <= 0) { return; } try { Thread.Sleep(_delay); } catch (Exception) { // An exception can be thrown as the result of the invoking "interrupt" from another thread. // Ignore. } } private void AutoChangeDestination() { lock (_sync) { if (_destinations.Count == 1) { // return if single destination return; } if (_destinationIndex == 0 && !_enableAutoSwitchToBackupConnection) { //enabled primary transport but disabled automatic switch to backup return; } var newIndex = _destinationIndex + 1; if (newIndex > _destinations.Count - 1) { newIndex = 0; } if (newIndex == 0 && !_cyclicSwitchBackupConnection) { //next transport is again primary but disabled automatic switch to primary return; } ChangeDestination(newIndex); } } private void ChangeDestination(int newIndex) { lock (_sync) { if (_destinationIndex == newIndex) { Log.Warn("Can't change transport destination cause there is only one destination"); return; } if (newIndex >= _destinations.Count || newIndex < 0) { Log.Warn("Invalid destination index"); return; } _destinationIndex = newIndex; var currentDestination = _destinations[_destinationIndex]; Transport = GetTransport(currentDestination.Host, currentDestination.Port, Parameters); Log.Info($"Transport destination has changed to host {currentDestination.Host}"); _requestedSwitch = true; } } /// <inheritdoc /> public void SwitchToBackUp() { if (IsRunningOnSingleBackup) { return; } if (SessionState.IsNotDisconnected(SessionState)) { // close session and all session object (pumper,queue ect) // after this call, in our session listener, // we must handled received state(DISCONNECTED_ABNORMALLY) Disconnect(DisconnectReason.UserRequest, "Manual switch to backup connection"); Shutdown(DisconnectReason.UserRequest, true); } else { //reset initialization in case if session was disconnected ResetInitialization(); } if (_delay > 0) { if (Log.IsDebugEnabled) { Log.Debug($"Wait {_delay} ms before establish backup connection"); } try { Thread.Sleep(_delay); } catch (Exception) { // do nothing } } Log.Debug("Manual switch to backup"); _attempt = 0; var newIndex = _destinationIndex + 1; if (newIndex > _destinations.Count - 1) { newIndex = 1; } ChangeDestination(newIndex); ConnectInternal(); //FIXME: if session is disposed on disconnect - session will be removed from manager. RegisterSessionIfAbsent(); } private void RegisterSessionIfAbsent() { var fixSessionManager = FixSessionManager.Instance; if (fixSessionManager.Locate(Parameters.SessionId) == null) { fixSessionManager.RegisterFixSession(this); } } /// <inheritdoc /> public void SwitchToPrimary() { if (!IsRunningOnBackup) { return; } // close session and all session object (pumper,queue ect) // after this call, in our session listener, // we must handled received state(DISCONNECTED_ABNORMALLY) Disconnect(DisconnectReason.UserRequest, "Manual switch to primary connection"); Shutdown(DisconnectReason.UserRequest, true); if (_delay > 0) { if (Log.IsDebugEnabled) { Log.Debug($"Wait {_delay} ms before establish primary connection"); } try { Thread.Sleep(_delay); } catch (Exception) { // do nothing } } Log.Debug("Manual switch to primary"); _attempt = 0; ChangeDestination(0); ConnectInternal(); //FIXME: if session is disposed on disconnect - session will be removed from manager. RegisterSessionIfAbsent(); } /// <inheritdoc /> public bool IsRunningOnBackup { get { lock (_sync) { return _destinationIndex != 0; } } } private bool IsRunningOnSingleBackup { get { lock (_sync) { return (_destinationIndex != 0) && (_destinations.Count == 2); } } } } }