FixAntenna/NetCore/FixEngine/Session/AcceptorFixSession.cs (236 lines of code) (raw):
// Copyright (c) 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Epam.FixAntenna.Constants.Fixt11;
using Epam.FixAntenna.NetCore.Common;
using Epam.FixAntenna.NetCore.FixEngine.Acceptor;
using Epam.FixAntenna.NetCore.FixEngine.Scheduler.Tasks;
using Epam.FixAntenna.NetCore.FixEngine.Session.MessageHandler;
using Epam.FixAntenna.NetCore.FixEngine.Transport;
using Epam.FixAntenna.NetCore.Helpers;
using Epam.FixAntenna.NetCore.Message;
namespace Epam.FixAntenna.NetCore.FixEngine.Session
{
internal sealed class AcceptorFixSession : AbstractFixSession
{
private static readonly byte[] TrueValue = "Y".AsByteArray();
public AcceptorFixSession(IFixMessageFactory messageFactory, SessionParameters sessionParameters,
HandlerChain fixSessionListener, IFixTransport transport) : base(messageFactory, sessionParameters,
fixSessionListener)
{
this.Transport = transport;
}
public void ReinitSession(SessionParameters inSessionParameters, IFixTransport transport)
{
lock (SessionLock)
{
if (SessionState.IsConnected(SessionState))
{
throw new DuplicateSessionException("Session " + Parameters.SessionId +
" is still alive. Duplicate connection from " + transport.RemoteHost);
}
//reset init flag
ResetInitialization();
var oldDetails = Parameters;
CheckNewSessionParameters(oldDetails, inSessionParameters);
UpdateSessionParameters(inSessionParameters, oldDetails);
this.Parameters =
inSessionParameters; //new SessionParameterProxy(inSessionParameters, new SessionParametersProxyAdaptorListener());
this.Transport = transport;
inSessionParameters.PrintConfiguration();
}
}
public void UpdateSessionParameters(SessionParameters newParams, SessionParameters oldParams)
{
newParams.OutgoingLoginMessage = oldParams.OutgoingLoginMessage;
newParams.LastSeqNumResetTimestamp = oldParams.LastSeqNumResetTimestamp;
newParams.FixVersionContainer = oldParams.FixVersionContainer;
newParams.AppVersionContainer = oldParams.AppVersionContainer;
newParams.Configuration = oldParams.Configuration;
newParams.IncomingSequenceNumber = oldParams.IncomingSequenceNumber;
newParams.OutgoingSequenceNumber = oldParams.OutgoingSequenceNumber;
}
private void CheckNewSessionParameters(SessionParameters oldParams, SessionParameters newParams)
{
var errors = new List<string>();
if (!oldParams.IsSimilar(newParams, errors))
{
throw new DuplicateSessionException("Duplicate acceptor session attempt: " + newParams.SessionId +
". Session details not similar to exist one: " + string.Join(Environment.NewLine, errors));
}
}
/// <inheritdoc />
public override void Connect()
{
lock (SessionLock)
{
if (SessionState.IsNotDisconnected(SessionState))
{
throw new InvalidOperationException("Cannot connect while:" + SessionState);
}
SessionState = SessionState.WaitingForLogon;
try
{
PrepareForConnect();
try
{
// TBD!
if (Log.IsDebugEnabled)
{
Log.Debug(
"IncomingLoginFixMessage: " + Parameters.IncomingLoginMessage.ToPrintableString());
}
var message = new MsgBuf(Parameters.IncomingLoginMessage.AsByteArray());
//FIXME: it needs to transfer somehow Logon reading time for registered sessions
message.MessageReadTimeInTicks = Parameters is ParsedSessionParameters parameters
? parameters.LogonReadTimeTicks
: DateTimeHelper.CurrentTicks;
MessageHandlers.OnMessage(message);
}
catch (Exception e)
{
if (Log.IsDebugEnabled)
{
Log.Warn(
"Invalid login message received:" + Parameters.IncomingLoginMessage.ToPrintableString(),
e);
}
else
{
Log.Warn("Invalid login message received:" +
Parameters.IncomingLoginMessage.ToPrintableString() + ". Reason: " + e.Message);
}
var message = Queue.Poll();
//in case of problems Logout should be sent back to notify other side about the reason
if (message != null && !MsgType.Logout.Equals(message.MessageType, StringComparison.Ordinal))
{
Pumper.RejectFirstQueueMessage();
}
}
StartSession();
}
catch (IOException e)
{
Shutdown(DisconnectReason.InitConnectionProblem, true);
if (Log.IsDebugEnabled)
{
Log.Warn("Init session failed. Cause: " + e.Message, e);
}
else
{
Log.Warn("Init session failed. Cause: " + e.Message);
}
}
}
}
public override async Task ConnectAsync()
{
await Task.Run(() => Connect()).ConfigureAwait(false);
}
/// <inheritdoc />
public override void Reject(string reason)
{
if (Log.IsDebugEnabled)
{
Log.Debug("Session " + Parameters.SessionId + " rejecting...");
}
lock (SessionLock)
{
if (SessionState.IsNotDisconnected(SessionState))
{
throw new InvalidOperationException("Cannot connect while:" + SessionState);
}
SessionState = SessionState.WaitingForLogon;
try
{
InitSessionInternal();
if (Log.IsDebugEnabled)
{
Log.Debug("Starting MessagePumper thread. Outgoing seq number:" + RuntimeState.OutSeqNum);
}
Pumper.Start();
if (Log.IsDebugEnabled)
{
Log.Debug("Pumper started");
}
SetAttribute(ExtendedFixSessionAttribute.RejectSession, true);
try
{
//TBD! reuse buffer
var buff = new MsgBuf(Parameters.IncomingLoginMessage.AsByteArray());
MessageHandlers.OnMessage(buff);
}
catch (Exception e)
{
if (Log.IsDebugEnabled)
{
Log.Warn("Invalid login message received:" + Parameters.IncomingLoginMessage.ToString(),
e);
}
else
{
Log.Warn("Invalid login message received:" + Parameters.IncomingLoginMessage.ToString() +
". Reason: " + e.Message);
}
}
//close incoming message storage manually - reader thread not started
IncomingStorage.Dispose();
var sessionState = SessionState;
if (sessionState != SessionState.WaitingForLogoff
&& sessionState != SessionState.WaitingForForcedLogoff
&& sessionState != SessionState.WaitingForForcedDisconnect
&& sessionState != SessionState.Disconnected
&& sessionState != SessionState.DisconnectedAbnormally
&& sessionState != SessionState.Dead)
{
Disconnect(DisconnectReason.Reject, reason);
}
}
catch (IOException e)
{
if (Log.IsDebugEnabled)
{
Log.Warn("Init session failed. Cause: " + e.Message, e);
}
else
{
Log.Warn("Init session failed. Cause: " + e.Message);
}
}
finally
{
RemoveAttribute(ExtendedFixSessionAttribute.RejectSession);
}
}
}
/// <inheritdoc />
public override bool IsResetSeqNumFlagRequiredForInitLogon
{
get
{
var logonMessage = Parameters.IncomingLoginMessage;
var outgoingFields = Parameters.OutgoingLoginMessage;
//TBD: ResetSeqNumFlag sending login is separated from resetting sequences
// we get ResetSeqNumFlag in incoming logon
if (logonMessage != null && ResetSeqNumFlagIsEnabled(logonMessage) &&
!(outgoingFields != null && ResetSeqNumFlagIsEnabled(outgoingFields)))
{
return true;
}
return false;
}
}
/// <inheritdoc />
private bool ResetSeqNumFlagIsEnabled(FixMessage logonMessage)
{
return FixMessageUtil.IsTagValueEquals(logonMessage, Tags.ResetSeqNumFlag, TrueValue);
}
/// <inheritdoc />
public override object GetAndRemoveAttribute(string key)
{
var result = GetAttribute(key);
if (result != null)
{
RemoveAttribute(key);
}
return result;
}
internal void ScheduleDisconnect(string cronExpression, TimeZoneInfo timeZone)
{
Scheduler.ScheduleCronTask<AcceptorSessionStopTask>(cronExpression, timeZone);
}
internal bool IsDisconnectScheduled()
{
return Scheduler.IsTaskScheduled<AcceptorSessionStopTask>();
}
}
}