FixAntenna/NetCore/FixEngine/Acceptor/FixConnectionHandler.cs (173 lines of code) (raw):

// Copyright (c) 2021 EPAM Systems // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. using System; using System.Collections.Generic; using System.IO; using System.Threading; using Epam.FixAntenna.NetCore.Common; using Epam.FixAntenna.NetCore.Common.Logging; using Epam.FixAntenna.NetCore.Configuration; using Epam.FixAntenna.NetCore.FixEngine.Manager; using Epam.FixAntenna.NetCore.FixEngine.Session; using Epam.FixAntenna.NetCore.FixEngine.Transport; using Epam.FixAntenna.NetCore.Message; namespace Epam.FixAntenna.NetCore.FixEngine.Acceptor { internal class FixConnectionHandler : IConnectionHandler { private static readonly ILog Log = LogFactory.GetLog(typeof(FixConnectionHandler)); private readonly ISessionTransportFactory _transportFactory; private readonly LogonMessageParser _logonMessageParser; private int _logonWaitTimeout = 5000; // ms private readonly Config _configuration; private readonly SessionAcceptorStrategyHandler _sessionAcceptorStrategyHandler; private IConfiguredSessionRegister _configuredSessionRegister; public FixConnectionHandler(Config configuration, SessionAcceptorStrategyHandler acceptorStrategyHandler, ISessionTransportFactory transportFactory) { _configuration = configuration; _sessionAcceptorStrategyHandler = acceptorStrategyHandler; _logonMessageParser = new LogonMessageParser(); _transportFactory = transportFactory; } public virtual void SetFixServerListener(IFixServerListener listener) { _sessionAcceptorStrategyHandler.SetSessionListener(listener); } public virtual void SetTimeout(int timeout) { _logonWaitTimeout = timeout; } public virtual void SetConfiguredSessionRegister(IConfiguredSessionRegister configuredSessionRegister) { _configuredSessionRegister = configuredSessionRegister; } public virtual void RegisterAcceptorSession(SessionParameters sessionParameters) { _configuredSessionRegister.RegisterSession(sessionParameters); } public virtual void UnregisterAcceptorSession(SessionParameters sessionParameters) { _configuredSessionRegister.UnregisterSession(sessionParameters); } /// <inheritdoc /> public virtual void OnConnect(ITransport transport) { // // TODO the MAX_MESSAGE_SIZE and VALIDATE_CHECK_SUM should be taken from session parameters // int maxMessageSize = Configuration.getGlobalConfiguration().getPropertyAsInt(Configuration.MAX_MESSAGE_SIZE); // boolean validateCheckSum = Configuration.getGlobalConfiguration().getPropertyAsBoolean(Configuration.VALIDATE_CHECK_SUM, true); // final AcceptorFIXTransport fixTransport = new AcceptorFIXTransport(transport, maxMessageSize, validateCheckSum); var config = (Config)_configuration.Clone(); var fixTransport = _transportFactory.CreateAcceptorTransport(transport, config); try { var logon = new FixMessage(); var readLogonTimeTicks = WaitForLogon(fixTransport, logon); if (readLogonTimeTicks <= 0) { return; } if (!FixMessageUtil.IsLogon(logon)) { if (Log.IsInfoEnabled) { Log.Info( $"Initial message is not a logon: {logon.ToPrintableString()} received on local endpoint: {transport.LocalEndPoint.AsString()} from remote endpoint: {transport.RemoteEndPoint.AsString()}"); } if (Log.IsDebugEnabled) { Log.Debug($"logon variable:{logon.ToPrintableString()}"); } CloseTransport(fixTransport); return; } var parseResult = _logonMessageParser.ParseLogon(config, logon, transport.RemoteEndPoint.Address.AsString(), transport.LocalEndPoint.Port); var sessionParameters = parseResult.SessionParameters; sessionParameters.LogonReadTimeTicks = readLogonTimeTicks; if (Log.IsInfoEnabled) { Log.Info( $"Logon message received on local endpoint: {transport.LocalEndPoint.AsString()} from: {transport.RemoteEndPoint.AsString()} for sessionID: {sessionParameters.SessionId}"); } // check if secured connection required var isSslConfigured = sessionParameters.Configuration.GetPropertyAsBoolean(Config.RequireSsl); var isConnectionSecure = transport.IsSecured; if (isSslConfigured && !isConnectionSecure) { Log.Error($"Session {sessionParameters.SessionId} configured as secure, but connected on unsecured connection from remote endpoint {transport.RemoteEndPoint.AsString()}."); CloseTransport(fixTransport); return; } if (Log.IsDebugEnabled) { Log.Debug($"Parameters: {sessionParameters}"); Log.Debug($"App version: {sessionParameters.AppVersion}"); } _sessionAcceptorStrategyHandler.HandleIncomingConnection(sessionParameters, fixTransport); } catch (ThreadInterruptedException) { // do nothing Log.Trace("Thread interrupted."); } catch (DuplicateSessionException e) { if (Log.IsDebugEnabled) { Log.Warn("DuplicatedSessionException", e); } else { Log.Warn(e); } CloseTransport(fixTransport); } catch (Exception e) { Log.Error("Exception:", e); CloseTransport(fixTransport); } } private long WaitForLogon(IFixTransport fixTransport, FixMessage login) { var messageWaiter = new LogonMessageWaiter(fixTransport, _logonWaitTimeout, login); if (messageWaiter.IsLogonReceived()) { var readMessageTimeTicks = messageWaiter.ReadMessageTimeTicks; return readMessageTimeTicks > 0 ? readMessageTimeTicks : DateTimeHelper.CurrentTicks; } return -1; } private void CloseTransport(IFixTransport fixTransport) { try { fixTransport.Close(); } catch (IOException e) { if (Log.IsDebugEnabled) { Log.Warn($"Ignoring exception while closing transport: {e}", e); } else { Log.Warn($"Ignoring exception while closing transport: {e}"); } } } public virtual void Dispose() { try { _sessionAcceptorStrategyHandler.CloseAllRegisteredSessions(); } catch (IOException e) { if (Log.IsDebugEnabled) { Log.Warn($"Ignoring exception while closing session: {e}", e); } else { Log.Warn($"Ignoring exception while closing session: {e}"); } } } public virtual IList<SessionParameters> GetRegisterAcceptorSession() { return _sessionAcceptorStrategyHandler.ConfiguredSessionRegister.RegisteredSessions; } } }