FixAntenna/NetCore/FixEngine/iLinkFailover/MsgwFixSession.cs (233 lines of code) (raw):

// Copyright (c) 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Epam.FixAntenna.NetCore.Common.Utils;
using Epam.FixAntenna.NetCore.FixEngine.Session;
using Epam.FixAntenna.NetCore.FixEngine.Session.MessageHandler;
using Epam.FixAntenna.NetCore.FixEngine.Transport;
using Epam.FixAntenna.NetCore.Message;

namespace Epam.FixAntenna.NetCore.FixEngine.iLinkFailover
{
    /// <summary>
    /// An <see cref="AutoreconnectFixSession"/> whose transport is always handed out through a
    /// <see cref="TransportWrapper"/>, so the real transport underneath can be exchanged with the
    /// transport of another <see cref="MsgwFixSession"/> (primary/backup switch).
    /// </summary>
    internal class MsgwFixSession : AutoreconnectFixSession
    {
        // Created once in the constructor; only its Transport/NextMessage properties change afterwards.
        private readonly TransportWrapper _transportWrapper;

        // True only while the "-switcher" worker started by SwitchFromBackup is running its try block.
        private volatile bool _switching;

        /// <summary>
        /// Creates the session and the transport wrapper bound to it.
        /// </summary>
        /// <param name="factory">Forwarded to the base session.</param>
        /// <param name="sessionParameters">Forwarded to the base session.</param>
        /// <param name="fixSessionListener">Forwarded to the base session.</param>
        public MsgwFixSession(IFixMessageFactory factory, SessionParameters sessionParameters,
            HandlerChain fixSessionListener)
            : base(factory, sessionParameters, fixSessionListener)
        {
            _transportWrapper = new TransportWrapper(this);
        }

        /// <summary>
        /// Resets the FTI suffix of the SenderCompId to the undefined value (see
        /// <see cref="ResetToUndefinedFTI"/>) and then performs the base connect.
        /// </summary>
        public override void Connect()
        {
            ResetToUndefinedFTI();
            base.Connect();
        }

        /// <summary>
        /// Makes this session take over the transport of <paramref name="backupSession"/>.
        /// The two sessions exchange transports and the backup session is shut down on the calling
        /// thread; this session is then shut down and reconnected on a new thread named
        /// "&lt;SessionId&gt;-switcher".
        /// </summary>
        /// <param name="backupSession">Session whose transport becomes this session's transport.</param>
        /// <param name="message">
        /// Message that is cloned and queued as the wrapper's next message, so it is the first thing
        /// returned by <see cref="TransportWrapper.ReadMessage"/> after the switch.
        /// </param>
        public virtual void SwitchFromBackup(MsgwFixSession backupSession, FixMessage message)
        {
            Log.Debug("switching to new primary session(" + Parameters.SessionId + ")...");

            // Original author's note: the transports are exchanged on this (the calling) thread
            // for thread-safety reasons, before any worker thread exists.
            var queuedMessage = (FixMessage)message.Clone();
            var formerPrimaryTransport = GetTransport();
            var promotedTransport = backupSession.GetTransport();
            backupSession.SetTransport(formerPrimaryTransport);

            // Original author's note: the caller is the backup session's reader thread, so the backup
            // session is shut down here to make sure that thread does not read any further messages.
            backupSession.MarkShutdownAsGraceful();
            backupSession.Shutdown(DisconnectReason.ClosedByCounterparty, true);

            // Let the calling thread go; the rest of the switch runs on a dedicated thread.
            var switcher = new Thread(() => RunSwitchToNewPrimary(promotedTransport, queuedMessage))
            {
                Name = Parameters.SessionId + "-switcher"
            };
            switcher.Start();
        }

        /// <summary>
        /// Replaces this session's transport with the transport of <paramref name="backupSession"/>.
        /// The transports are exchanged on the calling thread; on a new thread named
        /// "&lt;SessionId&gt;-starter" this session is shut down, its outgoing sequence number is set
        /// to <c>RuntimeState.OutSeqNum - 1</c>, the new transport is installed and the session is
        /// connected again.
        /// </summary>
        /// <param name="backupSession">Session whose transport is taken over.</param>
        public virtual void SetTransportFrom(MsgwFixSession backupSession)
        {
            Log.Info("Changing transport of primary session " + Parameters.SessionId + " to: " +
                     backupSession.GetTransport().RemoteHost);

            var formerPrimaryTransport = GetTransport();
            var promotedTransport = backupSession.GetTransport();
            backupSession.SetTransport(formerPrimaryTransport);

            var starter = new Thread(() => RunRestartOnNewTransport(promotedTransport))
            {
                Name = Parameters.SessionId + "-starter"
            };
            starter.Start();
        }

        /// <summary>
        /// Body of the "-switcher" thread: shuts this session down, installs the new transport
        /// together with the queued message and connects while <see cref="IsSwitching"/> is true.
        /// An <see cref="IOException"/> is logged as a warning; the switching flag is always cleared.
        /// </summary>
        private void RunSwitchToNewPrimary(IFixTransport promotedTransport, FixMessage queuedMessage)
        {
            try
            {
                _switching = true;
                MarkShutdownAsGraceful();
                Shutdown(DisconnectReason.ClosedByCounterparty, true);
                SetTransport(promotedTransport, queuedMessage);
                Connect();
                SessionState = SessionState.Connected;
            }
            catch (IOException e)
            {
                WarnOnIoFailure(e.Message, e);
            }
            finally
            {
                _switching = false;
            }

            // Logged whether or not an IOException was caught above.
            Log.Debug("switched");
        }

        /// <summary>
        /// Body of the "-starter" thread: shuts this session down, steps the outgoing sequence
        /// number back by one, installs the new transport and connects.
        /// An <see cref="IOException"/> is logged as a warning.
        /// </summary>
        private void RunRestartOnNewTransport(IFixTransport promotedTransport)
        {
            try
            {
                Log.Debug("shutdown primary session");
                MarkShutdownAsGraceful();
                Shutdown(DisconnectReason.ClosedByCounterparty, true);

                var seqNumOut = RuntimeState.OutSeqNum - 1;
                Log.Debug("revert SeqNum to " + seqNumOut);
                SequenceManager.ApplyOutSeqnum(seqNumOut);

                Log.Debug("connect primary session with new transport");
                SetTransport(promotedTransport);
                Connect();
                Log.Debug("primary session has been connected");
            }
            catch (IOException e)
            {
                WarnOnIoFailure(
                    "Exception while switching transport or connection primary session: " + e.Message, e);
            }
        }

        /// <summary>
        /// Logs <paramref name="text"/> at warn level; the exception itself is attached only when
        /// debug logging is enabled.
        /// </summary>
        private void WarnOnIoFailure(string text, IOException error)
        {
            if (Log.IsDebugEnabled)
            {
                Log.Warn(text, error);
                return;
            }

            Log.Warn(text);
        }

        /// <summary>
        /// When the SenderCompId ends with the backup or the primary FTI value, replaces its last
        /// character with the undefined FTI value and passes the updated parameters to the message
        /// factory. Otherwise does nothing.
        /// </summary>
        private void ResetToUndefinedFTI()
        {
            var parameters = Parameters;
            var compId = parameters.SenderCompId;

            var endsWithKnownFti =
                compId.EndsWith(Fti.Backup.GetValue(), StringComparison.Ordinal) ||
                compId.EndsWith(Fti.Primary.GetValue(), StringComparison.Ordinal);
            if (!endsWithKnownFti)
            {
                return;
            }

            // Exactly one trailing character is dropped - assumes FTI values are one character long (TODO confirm).
            parameters.SenderCompId = compId.Substring(0, compId.Length - 1) + Fti.Undefined.GetValue();
            MessageFactory.SetSessionParameters(parameters);
        }

        /// <summary>
        /// True while the "-switcher" worker started by <see cref="SwitchFromBackup"/> is in progress.
        /// </summary>
        public virtual bool IsSwitching => _switching;

        /// <summary>
        /// Obtains the transport from the base implementation, stores it inside the wrapper and
        /// returns the wrapper instead of the real transport.
        /// </summary>
        protected override IFixTransport GetTransport(string host, int port, SessionParameters sessionParameters)
        {
            _transportWrapper.Transport = base.GetTransport(host, port, sessionParameters);
            return _transportWrapper;
        }

        /// <summary>
        /// Calls <c>InitTransport()</c> and returns the real transport currently held by the wrapper
        /// (not the wrapper itself).
        /// </summary>
        public virtual IFixTransport GetTransport()
        {
            InitTransport();
            return _transportWrapper.Transport;
        }

        /// <summary>
        /// While a switch is in progress only <c>Init()</c> is called (original author's note: no
        /// logon message is sent in that case); otherwise the base preparation runs.
        /// </summary>
        public override void PrepareForConnect()
        {
            if (!IsSwitching)
            {
                base.PrepareForConnect();
                return;
            }

            Init();
        }

        /// <summary>
        /// Installs <paramref name="transport"/> in the wrapper and clears any queued next message.
        /// </summary>
        public virtual void SetTransport(IFixTransport transport) => SetTransport(transport, null);

        /// <summary>
        /// Stores <paramref name="nextMessage"/> as the wrapper's queued message and then installs
        /// <paramref name="transport"/> as the wrapped transport (in that order).
        /// </summary>
        public virtual void SetTransport(IFixTransport transport, FixMessage nextMessage)
        {
            _transportWrapper.NextMessage = nextMessage;
            _transportWrapper.Transport = transport;
        }

        /// <summary>
        /// Transport decorator that forwards to a replaceable inner <see cref="Transport"/>, can
        /// return one queued message from <see cref="ReadMessage"/> instead of reading, and skips
        /// opening the transport while the owning session is switching.
        /// </summary>
        internal class TransportWrapper : IFixTransport, IOutgoingFixTransport
        {
            private readonly MsgwFixSession _session;

            /// <param name="session">Owning session; consulted for <c>IsSwitching</c> and logging.</param>
            public TransportWrapper(MsgwFixSession session)
            {
                _session = session;
            }

            /// <summary>Message to return from the next <see cref="ReadMessage"/> call, or null.</summary>
            internal FixMessage NextMessage { get; set; }

            /// <summary>The real transport all calls are forwarded to.</summary>
            public virtual IFixTransport Transport { get; set; }

            public virtual bool IsBlockingSocket => Transport.IsBlockingSocket;

            /// <summary>
            /// Fills <paramref name="buf"/> with the queued <see cref="NextMessage"/> when there is
            /// one; otherwise reads from the wrapped transport. If an <see cref="IOException"/> is
            /// thrown and a message is queued at that point, the exception is dropped and the queued
            /// message is returned instead; with nothing queued the exception is rethrown.
            /// </summary>
            public virtual void ReadMessage(MsgBuf buf)
            {
                try
                {
                    if (NextMessage != null)
                    {
                        SetMessageToBuffer(buf);
                    }
                    else
                    {
                        Transport.ReadMessage(buf);
                    }
                }
                catch (IOException)
                {
                    if (NextMessage == null)
                    {
                        throw;
                    }

                    // A message is queued: the read failure is deliberately ignored.
                    SetMessageToBuffer(buf);
                }
            }

            /// <summary>
            /// Points <paramref name="buf"/> at the bytes of <see cref="NextMessage"/> (offset 0,
            /// full length), adds the message to <c>buf.FixMessage</c> and clears the queue.
            /// <see cref="NextMessage"/> is dereferenced without a null check.
            /// </summary>
            public virtual void SetMessageToBuffer(MsgBuf buf)
            {
                var payload = NextMessage.AsByteArray();
                buf.Buffer = payload;
                buf.Length = payload.Length;
                buf.Offset = 0;
                buf.FixMessage.Add(NextMessage);
                NextMessage = null;
            }

            public virtual void Write(byte[] message) => Transport.Write(message);

            public virtual int Write(byte[] message, int offset, int length) =>
                Transport.Write(message, offset, length);

            public virtual int Write(ByteBuffer buf, int offset, int length) =>
                Transport.Write(buf, offset, length);

            public virtual void WaitUntilReadyToWrite() => Transport.WaitUntilReadyToWrite();

            public virtual void Close() => Transport.Close();

            public int OptimalBufferSize => Transport.OptimalBufferSize;

            public string RemoteHost => Transport.RemoteHost;

            /// <summary>
            /// Opens the wrapped transport (cast to <see cref="IOutgoingFixTransport"/>) unless the
            /// owning session is switching, in which case the call is only logged.
            /// </summary>
            public virtual void Open()
            {
                if (_session.IsSwitching)
                {
                    LogOpenSkipped();
                    return;
                }

                ((IOutgoingFixTransport)Transport).Open();
            }

            /// <summary>
            /// Asynchronous counterpart of <see cref="Open"/> with the same switching check.
            /// </summary>
            public async Task OpenAsync()
            {
                if (_session.IsSwitching)
                {
                    LogOpenSkipped();
                    return;
                }

                await ((IOutgoingFixTransport)Transport).OpenAsync().ConfigureAwait(false);
            }

            // Shared debug message for Open/OpenAsync when the open request is skipped.
            private void LogOpenSkipped()
            {
                _session.Log.Debug("ignore the opening of transport - switching in progress");
            }
        }
    }
}