FixAntenna/NetCore/FixEngine/FixServer.cs (307 lines of code) (raw):
// Copyright (c) 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Epam.FixAntenna.NetCore.Common.Logging;
using Epam.FixAntenna.NetCore.Configuration;
using Epam.FixAntenna.NetCore.FixEngine.Acceptor;
using Epam.FixAntenna.NetCore.FixEngine.Manager;
using Epam.FixAntenna.NetCore.FixEngine.Session.Util;
using Epam.FixAntenna.NetCore.FixEngine.Transport.Server;
using Epam.FixAntenna.NetCore.FixEngine.Transport.Server.Tcp;
namespace Epam.FixAntenna.NetCore.FixEngine
{
/// <summary>
/// Generic FIXServer implementation.
/// </summary>
public class FixServer
{
private static readonly ILog Log = LogFactory.GetLog(typeof(FixServer));

/// <summary>
/// Handler that receives the server listener, the logon timeout and the acceptor session
/// registrations; it is also used as the incoming connection listener of every server.
/// </summary>
private readonly IConnectionHandler _connectionHandler;

/// <summary>
/// Type name of the acceptor strategy, read from the configuration adapter in the constructor
/// and resolved with <see cref="Type.GetType(string)"/>.
/// </summary>
private readonly string _acceptorStrategy;

/// <summary>
/// Listeners notified when the server has been started or stopped.
/// </summary>
private readonly IList<IFixServerStatusListener> _statusListeners = new List<IFixServerStatusListener>();

/// <summary>
/// Optional validator; when set, incoming connections go through a validating listener.
/// </summary>
private IConnectionValidator _connectionValidator;

/// <summary>
/// Adapter around the configuration passed to the constructor.
/// </summary>
private protected readonly ConfigurationAdapter ConfigAdapter;

/// <summary>
/// Register shared between the connection handler and the acceptor strategy.
/// Assigned once in the constructor, hence read-only.
/// </summary>
private readonly IConfiguredSessionRegister ConfiguredSessionRegister;

/// <summary>
/// Servers bound to ports, keyed by port number.
/// </summary>
private Dictionary<int, IServer> _servers = new Dictionary<int, IServer>();
/// <summary>
/// Creates the FIX server using the global configuration.
/// </summary>
public FixServer() : this(Config.GlobalConfiguration)
{
}

/// <summary>
/// Creates the FIX server using the supplied configuration.
/// </summary>
/// <param name="conf"> configuration wrapped by the configuration adapter and passed to the connection handler </param>
public FixServer(Config conf)
{
    // The register and the strategy name must be in place before the connection
    // handler is built, because building the handler reads both of them.
    ConfiguredSessionRegister = new ConfiguredSessionRegisterImpl();
    ConfigAdapter = new ConfigurationAdapter(conf);
    FixSessionManager.Init();
    _acceptorStrategy = ConfigAdapter.ServerAcceptorStrategy;

    _connectionHandler = GetConnectionHandler(conf);
    _connectionHandler.SetTimeout((int)ConfigAdapter.LogonWaitTimeout);
}
/// <summary>
/// Gets or sets the NIC (address) to listen to.
/// </summary>
/// <remarks>
/// The value is passed to each <c>TcpServer</c> at the moment the server instances are created
/// from the configured ports, so it should be set before <see cref="Ports"/> or
/// <see cref="SslPorts"/> are assigned and before <see cref="Start"/> is called.
/// </remarks>
public virtual string Nic { get; set; }
/// <summary>
/// Gets or sets the set of non-secured ports to listen to.
/// </summary>
/// <remarks>
/// The getter reports the ports of the currently configured servers that are not SSL ports.
/// The setter stores the ports in the configuration and rebuilds the server set; while the
/// server is started the assignment is ignored and an error is logged.
/// </remarks>
public int[] Ports
{
    get
    {
        return _servers.Keys
            .Where(port => !ConfigAdapter.IsSslPort(port))
            .ToArray();
    }
    set
    {
        if (IsStarted)
        {
            Log.Error("Cannot set Ports while server(s) is running.");
            return;
        }

        var portList = string.Join(",", value);
        ConfigAdapter.Configuration.SetProperty(Config.Port, portList);
        LoadPortsFromConfig();
    }
}
/// <summary>
/// Gets or sets the set of secured ports to listen to.
/// </summary>
/// <remarks>
/// The getter reports the ports of the currently configured servers that are SSL ports.
/// The setter stores the ports in the configuration and rebuilds the server set; while the
/// server is started the assignment is ignored and an error is logged.
/// </remarks>
public int[] SslPorts
{
    get
    {
        return _servers.Keys
            .Where(port => ConfigAdapter.IsSslPort(port))
            .ToArray();
    }
    set
    {
        if (IsStarted)
        {
            Log.Error("Cannot set SslPorts while server(s) is running.");
            return;
        }

        var portList = string.Join(",", value);
        ConfigAdapter.Configuration.SetProperty(Config.SslPort, portList);
        LoadPortsFromConfig();
    }
}
/// <summary>
/// Builds the connection handler from the acceptor strategy and the transport factory,
/// and attaches the configured session register to it.
/// </summary>
/// <param name="configuration"> configuration handed to the handler and its collaborators </param>
/// <returns> the ready-to-use connection handler </returns>
private FixConnectionHandler GetConnectionHandler(Config configuration)
{
    var acceptorStrategy = GetAcceptorStrategy(configuration);
    var transportFactory = GetTransportFactory(configuration);

    var handler = new FixConnectionHandler(configuration, acceptorStrategy, transportFactory);
    handler.SetConfiguredSessionRegister(ConfiguredSessionRegister);
    return handler;
}
/// <summary>
/// Creates the session transport factory. The configuration argument is currently not used.
/// </summary>
/// <param name="configuration"> server configuration (ignored) </param>
/// <returns> a new default session transport factory </returns>
private ISessionTransportFactory GetTransportFactory(Config configuration) =>
    new DefaultSessionTransportFactory();
/// <summary>
/// Instantiates and initializes the acceptor strategy whose type name was read from the configuration.
/// </summary>
/// <param name="configuration"> configuration passed to the strategy's <c>Init</c> </param>
/// <returns> the initialized acceptor strategy handler </returns>
/// <exception cref="InvalidOperationException">
/// if the strategy type cannot be resolved, created or initialized; the original failure is the inner exception.
/// </exception>
private SessionAcceptorStrategyHandler GetAcceptorStrategy(Config configuration)
{
    try
    {
        // Type.GetType returns null for an unknown name; name the offending type in the
        // exception so the log and the wrapper below say what could not be resolved.
        var type = Type.GetType(_acceptorStrategy)
                   ?? throw new InvalidOperationException($"Type '{_acceptorStrategy}' could not be resolved.");
        var acceptorStrategyHandler = (SessionAcceptorStrategyHandler)Activator.CreateInstance(type);
        acceptorStrategyHandler.Init(configuration, ConfiguredSessionRegister);
        return acceptorStrategyHandler;
    }
    catch (Exception e)
    {
        if (Log.IsErrorEnabled)
        {
            Log.Error("Can't initialize SessionAcceptorStrategyHandler. Maybe " + ConfigAdapter.ServerAcceptorStrategy + " parameter is wrong. " + e.Message);
        }
        throw new InvalidOperationException("Can't initialize SessionAcceptorStrategyHandler." + e, e);
    }
}
/// <summary>
/// Replaces all configured servers with a single custom server implementation.
/// The custom server is stored under port key <c>0</c>.
/// </summary>
/// <param name="server"> implementation specified by user </param>
/// <seealso cref="IServer"> </seealso>
private void SetServer(IServer server)
{
    var customServers = new Dictionary<int, IServer>();
    customServers.Add(0, server);
    _servers = customServers;
}
/// <summary>
/// Adds a server for the given port to the set of configured servers.
/// </summary>
/// <param name="port"> port used as the key of the server </param>
/// <param name="server"> server implementation to add </param>
/// <exception cref="ArgumentException"> if a server is already configured for <paramref name="port"/> </exception>
internal void AddServer(int port, IServer server) => _servers.Add(port, server);
/// <summary>
/// Sets the FIX server listener by forwarding it to the connection handler.
/// </summary>
/// <param name="listener"> user specified listener </param>
public virtual void SetListener(IFixServerListener listener) =>
    _connectionHandler.SetFixServerListener(listener);
/// <summary>
/// Sets the validator applied to incoming connections.
/// </summary>
/// <remarks>
/// The validator is read when the connection listener is built while a server is being
/// started, so it has to be set before <see cref="Start"/> to take effect.
/// </remarks>
/// <param name="connectionValidator"> validator to use, or <c>null</c> for no validation </param>
public virtual void SetConnectionValidator(IConnectionValidator connectionValidator) =>
    _connectionValidator = connectionValidator;
/// <summary>
/// Sets the login timeout by forwarding it to the connection handler.
/// </summary>
/// <param name="loginWaitTimeout"> the login timeout in milliseconds </param>
public virtual void SetLoginWaitTimeout(int loginWaitTimeout) =>
    _connectionHandler.SetTimeout(loginWaitTimeout);
/// <summary>
/// Discards the current servers and recreates them from the plain and SSL ports in the configuration.
/// </summary>
/// <returns> the number of servers configured after loading </returns>
private int LoadPortsFromConfig()
{
    var rebuiltServers = new Dictionary<int, IServer>();
    _servers = rebuiltServers;

    // Plain ports first, then SSL ports; a port listed in both is replaced by the later entry.
    AddPorts(ConfigAdapter.Ports);
    AddPorts(ConfigAdapter.SslPorts);

    return rebuiltServers.Count;
}
/// <summary>
/// Creates a <c>TcpServer</c> for every given port and stores it in the server set.
/// A port that is already configured is replaced, and a warning is logged.
/// </summary>
/// <param name="ports"> ports to create servers for </param>
private void AddPorts(IEnumerable<int> ports)
{
    foreach (var port in ports)
    {
        var alreadyConfigured = _servers.ContainsKey(port);
        if (alreadyConfigured)
        {
            Log.Warn($"Server on port {port} has been configured already. Configuration will be overriden.");
        }

        // The indexer both inserts a new entry and replaces an existing one.
        _servers[port] = new TcpServer(Nic, port, ConfigAdapter);
    }
}
/// <summary>
/// Sets the only non-secured port to listen on; shortcut for assigning a single-element array to <see cref="Ports"/>.
/// </summary>
/// <param name="port"> port to listen on </param>
public virtual void SetPort(int port) => Ports = new[] { port };
/// <summary>
/// Actually starts the servers: loads ports from the configuration when none are set,
/// registers the configured sessions and starts every configured server.
/// </summary>
/// <returns>
/// true if all servers started successfully;
/// false otherwise (a WARN message describing each problem is written to the log).
/// </returns>
/// <exception cref="IOException">
/// if none of the servers could be started. In this case the first received exception is thrown.
/// </exception>
/// <exception cref="InvalidOperationException">
/// if server port(s) were not set by configuration or by one of
/// <see cref="Ports"/>, <see cref="SslPorts"/>, <see cref="SetPort"/> or <see cref="SetServer"/>.
/// </exception>
public virtual bool Start()
{
    if ((_servers == null || !_servers.Any()) && LoadPortsFromConfig() == 0)
    {
        throw new InvalidOperationException("Cannot start FixServer. Port(s) not configured.");
    }

    RegisterConfiguredSessions();

    var anyStarted = false;
    IOException firstFailure = null;
    foreach (var server in _servers)
    {
        try
        {
            StartServer(server.Value, server.Key);
            anyStarted = true;
        }
        catch (IOException e)
        {
            // Remember only the first failure: that is the one promised to the caller.
            if (firstFailure == null)
            {
                firstFailure = e;
            }

            if (Log.IsDebugEnabled)
            {
                Log.Warn("Unable start server on port " + server.Key, e);
            }
            else
            {
                Log.Warn("Unable start server on port " + server.Key + ". " + e.Message);
            }
        }
    }

    if (!anyStarted && firstFailure != null)
    {
        // Rethrow without losing the stack trace of the original failure
        // (a plain "throw firstFailure;" would reset it to this line).
        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(firstFailure).Throw();
    }

    // NOTE(review): on a partial start (some servers up, some failed) the status listeners are
    // not notified and IsStarted stays false although servers are listening - confirm intended.
    var startedSuccessfully = firstFailure == null;
    if (startedSuccessfully)
    {
        NotifyStarted();
    }

    return startedSuccessfully;
}
/// <summary>
/// Attaches the connection listener to the server, starts it and logs the port it was started on.
/// </summary>
/// <param name="server"> server to start </param>
/// <param name="port"> port the server is configured for; used for logging only </param>
private void StartServer(IServer server, int port)
{
    var connectionListener = GetConnectionListener();
    server.SetIncomingConnectionListener(connectionListener);
    server.Start();

    if (!Log.IsInfoEnabled)
    {
        return;
    }

    var secureSuffix = ConfigAdapter.IsSslPort(port) ? " (secure)" : string.Empty;
    Log.Info("Server started on port " + port + secureSuffix);
}
/// <summary>
/// Returns the listener for incoming connections: the connection handler itself, or the
/// handler wrapped in a validating listener when a connection validator has been set.
/// </summary>
/// <returns> the listener to attach to a server </returns>
private IConnectionListener GetConnectionListener()
{
    if (_connectionValidator == null)
    {
        return _connectionHandler;
    }

    return new ConnectionValidatorListener(_connectionHandler, _connectionValidator);
}
/// <summary>
/// Builds the acceptor session parameters with <c>SessionParametersBuilder</c> and registers each of them.
/// The builder is given <see cref="ConfigPath"/> when it is set; otherwise its parameterless overload is used.
/// </summary>
public virtual void RegisterConfiguredSessions()
{
    var configuredSessions = string.IsNullOrEmpty(ConfigPath)
        ? SessionParametersBuilder.BuildAcceptorSessionParametersList()
        : SessionParametersBuilder.BuildAcceptorSessionParametersList(ConfigPath);

    foreach (var sessionParameters in configuredSessions.Values)
    {
        RegisterAcceptorSession(sessionParameters);
    }
}
/// <summary>
/// Stops every configured server, disposes the connection handler and notifies the status listeners.
/// </summary>
/// <remarks>
/// An <see cref="IOException"/> raised while stopping an individual server is logged as a warning
/// and does not interrupt stopping the remaining servers.
/// </remarks>
/// <exception cref="InvalidOperationException">
/// if no servers were set by configuration or by one of
/// <see cref="Ports"/>, <see cref="SslPorts"/>, <see cref="SetPort"/> or <see cref="SetServer"/>.
/// </exception>
public virtual void Stop()
{
    if (_servers == null || _servers.Count == 0)
    {
        throw new InvalidOperationException("Server is not set");
    }

    foreach (var entry in _servers)
    {
        var port = entry.Key;
        try
        {
            entry.Value.Stop();
        }
        catch (IOException e)
        {
            var problem = "Problem with stopping server on port " + port;
            if (Log.IsDebugEnabled)
            {
                Log.Warn(problem, e);
            }
            else
            {
                Log.Warn(problem + ". " + e.Message);
            }
        }
    }

    _connectionHandler.Dispose();
    NotifyStopped();
}
/// <summary>
/// Registers the session parameters of an acceptor session.
/// A copy of the parameters is registered, not the passed instance.
/// </summary>
/// <param name="sessionParameters"> the session parameters to register </param>
/// <exception cref="ArgumentException">
/// if the parameters specify a port for which no server is configured
/// </exception>
public virtual void RegisterAcceptorSession(SessionParameters sessionParameters)
{
    var portNotServed = sessionParameters.HasPort && !ValidateAcceptorSessionPort(sessionParameters);
    if (portNotServed)
    {
        throw new ArgumentException("Server does not listen port '" + sessionParameters.Port +
                                    "' which configured in FIX Session '" + sessionParameters.SessionId +
                                    "'");
    }

    var registeredCopy = (SessionParameters)sessionParameters.Clone();
    _connectionHandler.RegisterAcceptorSession(registeredCopy);
}
/// <summary>
/// Checks whether a server is configured for the port of the given session parameters.
/// </summary>
/// <param name="sessionParameters"> session parameters whose port is checked </param>
/// <returns> true if one of the configured server ports equals the session port </returns>
private bool ValidateAcceptorSessionPort(SessionParameters sessionParameters)
{
    foreach (var configuredPort in _servers.Keys)
    {
        if (configuredPort == sessionParameters.Port)
        {
            return true;
        }
    }

    return false;
}
/// <summary>
/// Removes a registered acceptor session by forwarding the request to the connection handler.
/// Note: to remove a registered acceptor session it is currently enough to pass SessionParameters
/// with the right SenderCompId and TargetCompId.
/// </summary>
/// <param name="sessionParameters"> parameters identifying the session to remove </param>
public virtual void UnregisterAcceptorSession(SessionParameters sessionParameters) =>
    _connectionHandler.UnregisterAcceptorSession(sessionParameters);
/// <summary>
/// Returns the registered acceptor sessions as reported by the connection handler.
/// </summary>
/// <returns> the list of registered acceptor session parameters </returns>
public virtual IList<SessionParameters> GetRegisterAcceptorSession() =>
    _connectionHandler.GetRegisterAcceptorSession();
/// <summary>
/// Path to the config file from which the configured acceptor sessions are built.
/// When null or empty, <see cref="RegisterConfiguredSessions"/> builds them without a path.
/// </summary>
/// <value> path to config file </value>
public virtual string ConfigPath { get; set; }
/// <summary>
/// Indicates whether the server is started: set to true when the status listeners are notified
/// of a start and to false when they are notified of a stop.
/// </summary>
protected virtual bool IsStarted { get; private set; }
/// <summary>
/// Adds a listener that is notified when the server has been started or stopped.
/// </summary>
/// <param name="statusListener"> listener to add </param>
public virtual void AddServerStatusListener(IFixServerStatusListener statusListener) =>
    _statusListeners.Add(statusListener);
/// <summary>
/// Removes a previously added server status listener; does nothing if it was not added.
/// </summary>
/// <param name="statusListener"> listener to remove </param>
public virtual void RemoveServerStatusListener(IFixServerStatusListener statusListener) =>
    _statusListeners.Remove(statusListener);
/// <summary>
/// Marks the server as started and tells every status listener about it.
/// </summary>
private void NotifyStarted()
{
    IsStarted = true;
    NotifyStatusListeners(statusListener => statusListener.ServerStarted());
}

/// <summary>
/// Marks the server as stopped and tells every status listener about it.
/// </summary>
private void NotifyStopped()
{
    IsStarted = false;
    NotifyStatusListeners(statusListener => statusListener.ServerStopped());
}

/// <summary>
/// Invokes the given notification on every registered status listener.
/// An exception thrown by one listener is logged at debug level and does not prevent
/// the remaining listeners from being notified.
/// </summary>
/// <param name="notify"> notification to deliver to each listener </param>
private void NotifyStatusListeners(Action<IFixServerStatusListener> notify)
{
    // Iterate over a snapshot: a listener may add or remove listeners from within its
    // callback, which would otherwise invalidate the enumerator of the live list and
    // throw outside of the per-listener try/catch.
    foreach (var statusListener in _statusListeners.ToArray())
    {
        try
        {
            notify(statusListener);
        }
        catch (Exception e)
        {
            Log.Debug(e.Message, e);
        }
    }
}
}
}