FixAntenna/NetCore/FixEngine/Session/StandardSessionSequenceManager.cs (975 lines of code) (raw):
// Copyright (c) 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.IO;
using System.Text;
using Epam.FixAntenna.Constants.Fixt11;
using Epam.FixAntenna.NetCore.Common;
using Epam.FixAntenna.NetCore.Common.Collections;
using Epam.FixAntenna.NetCore.Common.Logging;
using Epam.FixAntenna.NetCore.Configuration;
using Epam.FixAntenna.NetCore.FixEngine.Session.Util;
using Epam.FixAntenna.NetCore.FixEngine.Storage;
using Epam.FixAntenna.NetCore.Message;
namespace Epam.FixAntenna.NetCore.FixEngine.Session
{
internal sealed class StandardSessionSequenceManager : ISessionSequenceManager
{
// Logger shared by the sequence manager and its nested resend managers.
private static readonly ILog Log = LogFactory.GetLog(typeof(StandardSessionSequenceManager));
// Number of milliseconds in 24 hours; used by IsResetTimeMissed and AdjustTimestamp.
private const long MillisInDay = 24 * 60 * 60 * 1000;
// Trace flag captured once when the instance is created; later logger level changes are not seen through this field.
private readonly bool _logIsTraceEnabled = Log.IsTraceEnabled;
// The references below are (re)assigned together in Init from the owning session.
private AbstractFixSession _session;
private SessionParameters _sessionParameters;
private FixSessionRuntimeState _runtimeState;
private IStorageFactory _storageFactory;
private ConfigurationAdapter _configAdapter;
// Value of Config.IgnoreResetSeqNumFlagOnReset (default false), read in Init.
private bool _ignoreResetSeqNumFlagOnReset;
/// <summary>
/// Creates a sequence manager bound to the given session.
/// </summary>
/// <param name="session"> the session whose parameters, runtime state, storage factory and configuration are used </param>
public StandardSessionSequenceManager(AbstractFixSession session) => Init(session);
/// <inheritdoc />
// Re-binds this manager to the given session by running the same initialization as the constructor.
public void Reinit(AbstractFixSession session) => Init(session);
/// <summary>
/// Binds this manager to <paramref name="session"/>: caches the session collaborators,
/// creates the resend manager and reads the <c>IgnoreResetSeqNumFlagOnReset</c> option.
/// </summary>
private void Init(AbstractFixSession session)
{
    _session = session;
    _storageFactory = session.StorageFactory;
    _sessionParameters = session.Parameters;
    _runtimeState = session.RuntimeState;
    _configAdapter = session.ConfigAdapter;

    // The resend manager reads its settings through _configAdapter, so it is created after the adapter is assigned.
    SeqResendManager = CreateResendManagerForSession();

    var configuration = _configAdapter.Configuration;
    _ignoreResetSeqNumFlagOnReset = configuration.GetPropertyAsBoolean(Config.IgnoreResetSeqNumFlagOnReset, false);
}
/// <summary>
/// Gets the sequence resend manager created for the current session.
/// It is assigned in <c>Init</c>, i.e. on construction and on every <c>Reinit</c>.
/// </summary>
/// <value> SequenceResendManager </value>
public ISequenceResendManager SeqResendManager { get; private set; }
/// <summary>
/// Chooses the resend manager implementation from the <c>MaxRequestResendInBlock</c> option:
/// a positive value selects block-wise resend requests, anything else the free-range manager.
/// </summary>
private ISequenceResendManager CreateResendManagerForSession()
{
    long blockSize = _configAdapter.Configuration.GetPropertyAsInt(Config.MaxRequestResendInBlock, 0);
    if (blockSize <= 0)
    {
        return new FreeRangeSequenceResendManager(this);
    }

    return new BlockSequenceResendManager(this, blockSize);
}
/// <summary>
/// Gets the expected incoming sequence number.
/// <para>
/// When the last processed sequence number in the runtime state is non-zero, the result is that number plus one;
/// otherwise the incoming sequence number from the runtime state is returned.</para>
/// </summary>
public long GetExpectedIncomingSeqNumber()
{
    var incoming = _runtimeState.InSeqNum;
    var lastProcessed = _runtimeState.LastProcessedSeqNum;

    var useProcessed = lastProcessed != 0;
    var expected = useProcessed ? lastProcessed + 1 : incoming;

    if (_logIsTraceEnabled)
    {
        var prefix = useProcessed
            ? "Get expected seq num from processed seq num: "
            : "Get expected seq num from incoming seq num: ";
        Log.Trace(prefix + expected);
    }

    return expected;
}
/// <summary>
/// Returns true when <paramref name="seqNum"/> has a value and equals <paramref name="lastSeqId"/>.
/// </summary>
private bool IsExpectedSeqNum(long? lastSeqId, long? seqNum)
{
    // Lifted equality: a null lastSeqId never matches a non-null seqNum.
    return seqNum.HasValue && lastSeqId == seqNum;
}
/// <inheritdoc />
// Delegates to the runtime state currently exposed by the session.
public void DecrementIncomingSeqNumber() => _session.RuntimeState.DecrementInSeqNum();
/// <summary>
/// Stores the last processed sequence number.
/// <para>Nothing is stored when the last processed number is unset (0) or is exactly one less than the incoming sequence number.</para>
/// </summary>
public void SaveProcessedSeqNumberOnShutdown()
{
    // Ideal situation: last processed seq num is in step with the incoming seq num - nothing to persist.
    if (!IsProcessedSequenceInvalid())
    {
        return;
    }

    // Otherwise the engine should save the last processed value.
    SaveCurrentProcessedSequence();
}
/// <summary>
/// Returns true when a last processed sequence number is set (non-zero) and it differs from
/// the incoming sequence number minus one.
/// </summary>
private bool IsProcessedSequenceInvalid()
{
    var lastProcessed = _runtimeState.LastProcessedSeqNum;

    // Short-circuit keeps the incoming number unread when no processed number is set.
    return lastProcessed != 0 && lastProcessed != _runtimeState.InSeqNum - 1;
}
/// <summary>
/// Persists the last processed sequence number and marks the session with the
/// <c>DeleteLastProcessedSeqNumFromFile</c> attribute.
/// Any exception is reported to the session error handler as a warning and not rethrown.
/// </summary>
private void SaveCurrentProcessedSequence()
{
    try
    {
        SaveRestoredSequences();

        // The flag is set only after a successful save.
        _session.SetAttribute(ExtendedFixSessionAttribute.DeleteLastProcessedSeqNumFromFile, true);
    }
    catch (Exception saveError)
    {
        _session.ErrorHandler.OnWarn("Error on save processed seq id.", saveError);
    }
}
/// <summary>
/// Saves a copy of the session parameters together with a runtime-state snapshot through the storage factory.
/// The snapshot carries the incoming sequence number only when the session attribute
/// <see cref="ExtendedFixSessionAttribute.LastRrSeqNum"/> is set; the last processed sequence number is always included.
/// </summary>
/// <exception cref="IOException"> if error occurred </exception>
public void SaveSessionParameters()
{
    var parametersToSave = GetSessionParametersForSaving(_session.Parameters);
    var snapshot = new FixSessionRuntimeState();

    var lastRrSeqNum = _session.GetAttributeAsLong(ExtendedFixSessionAttribute.LastRrSeqNum);
    var resendInProgress = !Common.Constants.IsNull(lastRrSeqNum);
    if (resendInProgress)
    {
        snapshot.InSeqNum = _runtimeState.InSeqNum;
    }

    snapshot.LastProcessedSeqNum = _runtimeState.LastProcessedSeqNum;
    _storageFactory.SaveSessionParameters(parametersToSave, snapshot);
}
/// <summary>
/// Saves the session parameters with a runtime-state snapshot whose incoming and outgoing
/// sequence numbers are zero and whose last processed sequence number is the current one.
/// </summary>
/// <exception cref="IOException"> if error occurred </exception>
public void SaveRestoredSequences()
{
    var snapshot = new FixSessionRuntimeState
    {
        InSeqNum = 0,
        OutSeqNum = 0,
        LastProcessedSeqNum = _runtimeState.LastProcessedSeqNum
    };

    _storageFactory.SaveSessionParameters(_sessionParameters, snapshot);
}
/// <summary>
/// Builds a new <see cref="SessionParameters"/> instance holding the subset of values that is persisted:
/// comp ids, qualifier, endpoint, FIX/application versions, last reset timestamp, the custom session id
/// (when one is set) and the configured incoming/outgoing sequence numbers.
/// </summary>
/// <param name="sessionParameters"> the parameters to copy from </param>
/// <returns> the new instance; the source is not modified </returns>
public SessionParameters GetSessionParametersForSaving(SessionParameters sessionParameters)
{
    var copy = new SessionParameters
    {
        SenderCompId = sessionParameters.SenderCompId,
        TargetCompId = sessionParameters.TargetCompId,
        SessionQualifier = sessionParameters.SessionQualifier,
        Port = sessionParameters.Port,
        Host = sessionParameters.Host,
        FixVersionContainer = sessionParameters.FixVersionContainer,
        AppVersionContainer = sessionParameters.AppVersionContainer,
        LastSeqNumResetTimestamp = sessionParameters.LastSeqNumResetTimestamp
    };

    // A custom session id is carried over explicitly.
    if (sessionParameters.IsCustomSessionId)
    {
        copy.SetSessionId(sessionParameters.SessionId.ToString());
    }

    copy.IncomingSequenceNumber = sessionParameters.IncomingSequenceNumber;
    copy.OutgoingSequenceNumber = sessionParameters.OutgoingSequenceNumber;
    return copy;
}
/// <inheritdoc />
// Implemented as a plain call to SaveRestoredSequences.
public void RestoreSessionParameters() => SaveRestoredSequences();
/// <summary>
/// Backs up the session storages, stamps the session parameters with the current time as the
/// last sequence reset time and saves the restored sequences.
/// </summary>
private void BackupSessionStorage()
{
    _session.BackupStorages();

    var now = DateTimeHelper.CurrentMilliseconds;
    _sessionParameters.LastSeqNumResetTimestamp = now;

    // Same operation that RestoreSessionParameters performs.
    SaveRestoredSequences();
}
/// <summary>
/// Loads persisted session parameters and runtime state from the session storage factory.
/// When loading succeeds, the stored incoming sequence number is processed and the stored
/// parameters are applied to the session parameters as overrides.
/// </summary>
public void LoadStoredParameters()
{
    var storedParameters = CreateSessionParameters();
    SetBasicSessionParameters(storedParameters);

    var persistedState = new FixSessionRuntimeState();
    var loaded = _session.StorageFactory.LoadSessionParameters(storedParameters, persistedState);
    if (!loaded)
    {
        return;
    }

    ProcessInSeqNum(persistedState.InSeqNum);
    SetSessionParametersOverrides(_session.Parameters, storedParameters);
}
/// <summary>
/// Copies the sender/target comp ids, and the session id when it is a custom one,
/// from the current session parameters into <paramref name="parameters"/>.
/// </summary>
private void SetBasicSessionParameters(SessionParameters parameters)
{
    var current = _session.Parameters;

    parameters.SenderCompId = current.SenderCompId;
    parameters.TargetCompId = current.TargetCompId;

    if (!current.IsCustomSessionId)
    {
        return;
    }

    parameters.SetSessionId(current.SessionId.ToString());
}
/// <summary>
/// Applies a stored incoming sequence number: when it is positive, the <c>LastRrSeqNum</c>
/// session attribute is set and the runtime incoming sequence number is replaced by the stored one.
/// </summary>
private void ProcessInSeqNum(long storedInSeqNum)
{
    if (storedInSeqNum <= 0)
    {
        return;
    }

    // NOTE(review): the attribute receives the runtime InSeqNum as it was before the overwrite below,
    // while the debug message reports storedInSeqNum - confirm which value is intended.
    _session.SetAttribute(ExtendedFixSessionAttribute.LastRrSeqNum, _runtimeState.InSeqNum);

    if (Log.IsDebugEnabled)
    {
        Log.Debug("Attribute '" + ExtendedFixSessionAttribute.LastRrSeqNum.Name + "' is set, value - " + storedInSeqNum);
    }

    _runtimeState.InSeqNum = storedInSeqNum;
}
/// <summary>
/// Copies persisted values from <paramref name="inParameters"/> into <paramref name="sessionParameters"/>.
/// The last reset timestamp is always copied; each sequence number is copied only when the matching
/// "set on next connect" flag of <paramref name="sessionParameters"/> is not set.
/// </summary>
/// <param name="sessionParameters"> the parameters to update </param>
/// <param name="inParameters"> the parameters to read from </param>
public void SetSessionParametersOverrides(SessionParameters sessionParameters, SessionParameters inParameters)
{
sessionParameters.LastSeqNumResetTimestamp = inParameters.LastSeqNumResetTimestamp;
// Keep the incoming number already held by sessionParameters when it is flagged for the next connect.
if (!sessionParameters.IsSetInSeqNumsOnNextConnect)
{
sessionParameters.IncomingSequenceNumber = inParameters.IncomingSequenceNumber;
}
// Same rule for the outgoing number.
if (!sessionParameters.SetOutSeqNumsOnNextConnect)
{
sessionParameters.OutgoingSequenceNumber = inParameters.OutgoingSequenceNumber;
}
}
/// <summary>
/// Creates an empty <see cref="SessionParameters"/> instance.
/// </summary>
public SessionParameters CreateSessionParameters() => new SessionParameters();
/// <summary>
/// Gets the RR sequence from the <c>LastRrSeqNum</c> session attribute.
/// </summary>
/// <returns> the attribute value as returned by <c>GetAttributeAsLong</c>; callers in this class test it with <c>Common.Constants.IsNull</c> </returns>
public long GetRrSequenceFromSession() =>
    _session.GetAttributeAsLong(ExtendedFixSessionAttribute.LastRrSeqNum);
/// <summary>
/// Saves the RR sequence: sets the <c>LastRrSeqNum</c> session attribute and persists a runtime-state
/// snapshot whose incoming sequence number is <paramref name="lastRrSeq"/>.
/// Failures are reported to the session error handler as a warning and are not rethrown.
/// </summary>
/// <param name="lastRrSeq"> the resend request sequence </param>
public void SaveRrSequence(long lastRrSeq)
{
    try
    {
        _session.SetAttribute(ExtendedFixSessionAttribute.LastRrSeqNum, lastRrSeq);

        var stateToStore = new FixSessionRuntimeState
        {
            OutSeqNum = 0,
            InSeqNum = lastRrSeq,
            LastProcessedSeqNum = _runtimeState.LastProcessedSeqNum
        };
        _storageFactory.SaveSessionParameters(_sessionParameters, stateToStore);

        if (Log.IsDebugEnabled)
        {
            Log.Debug("Attribute '" + ExtendedFixSessionAttribute.LastRrSeqNum.Name + "' was stored, value: " + lastRrSeq);
        }
    }
    catch (Exception e)
    {
        // This method stores the attribute; the previous text ("remove attribute") described the opposite operation.
        _session.ErrorHandler.OnWarn("Error on store attribute '" + ExtendedFixSessionAttribute.LastRrSeqNum.Name + "'.", e);
    }
}
/// <summary>
/// Removes the <c>LastRrSeqNum</c> attribute from the session and saves the restored sequences.
/// An <see cref="IOException"/> raised while saving is reported to the session error handler as a warning.
/// </summary>
/// <param name="lastSeqId"> the value being removed; used for logging only and may be null </param>
public void RemoveRrSequenceFromSession(long? lastSeqId)
{
    _session.RemoveLongAttribute(ExtendedFixSessionAttribute.LastRrSeqNum);

    if (Log.IsDebugEnabled)
    {
        // Do not touch .Value on an empty nullable: a log statement must not throw InvalidOperationException.
        var removedValue = lastSeqId.HasValue ? lastSeqId.Value.ToString() : "null";
        Log.Debug("Attribute '" + ExtendedFixSessionAttribute.LastRrSeqNum.Name + "' was removed, value: " + removedValue);
    }

    try
    {
        SaveRestoredSequences();
    }
    catch (IOException e)
    {
        _session.ErrorHandler.OnWarn("Error on restore session parameters.", e);
    }
}
/// <summary>
/// Stores the start and end of the resend request range as session attributes.
/// </summary>
private void SaveRequestSequenceRange(long startOfRrRange, long endOfRrRange)
{
    var startAttribute = ExtendedFixSessionAttribute.StartOfRrRange;
    var endAttribute = ExtendedFixSessionAttribute.EndOfRrRange;

    if (Log.IsDebugEnabled)
    {
        var startText = startAttribute.Name + ": " + startOfRrRange;
        var endText = endAttribute.Name + ": " + endOfRrRange;
        Log.Debug("Save RR range " + startText + ", " + endText);
    }

    _session.SetAttribute(startAttribute, startOfRrRange);
    _session.SetAttribute(endAttribute, endOfRrRange);
}
/// <inheritdoc />
// Clears all resend-request range attributes from the session and notifies the
// out-of-sync listener that the gap is closed, passing the end of range read before removal.
public void RemoveRangeOfRrSequence()
{
    var rangeEnd = _session.GetAttributeAsLong(ExtendedFixSessionAttribute.EndOfRrRange);

    if (Log.IsDebugEnabled)
    {
        var rangeStart = _session.GetAttributeAsLong(ExtendedFixSessionAttribute.StartOfRrRange);
        string details;
        if (Common.Constants.IsNull(rangeEnd) || Common.Constants.IsNull(rangeStart))
        {
            details = "session attributes";
        }
        else
        {
            details = ExtendedFixSessionAttribute.StartOfRrRange.Name + ": " + rangeStart
                + ", " + ExtendedFixSessionAttribute.EndOfRrRange.Name + ": " + rangeEnd;
        }

        Log.Debug("Remove RR range " + details);
    }

    _session.RemoveLongAttribute(ExtendedFixSessionAttribute.StartOfRrRange);
    _session.RemoveLongAttribute(ExtendedFixSessionAttribute.EndOfRrRange);
    _session.RemoveLongAttribute(ExtendedFixSessionAttribute.RequestedEndRrRange);
    _session.RemoveLongAttribute(ExtendedFixSessionAttribute.LastRrRange);
    // SimilarRrCounter is deliberately NOT removed: it is required for the case of
    // repetitive RR (autoBreakInfiniteRRLoop.xml).
    _session.RemoveLongAttribute(ExtendedFixSessionAttribute.LastRrSeqNum);

    _session.FixSessionOutOfSyncListener.OnGapClosed(rangeEnd);
}
/// <summary>
/// Gets the start of the RR range from the session attributes.
/// </summary>
/// <returns> start of range, or -1 when the attribute value is a null marker </returns>
public long GetStartRangeOfRrSequence()
{
    var rangeStart = _session.GetAttributeAsLong(ExtendedFixSessionAttribute.StartOfRrRange);
    return Common.Constants.IsNull(rangeStart) ? -1 : rangeStart;
}
/// <summary>
/// Gets the end of the RR range from the session attributes.
/// </summary>
/// <returns> end of range, or -1 when the attribute value is a null marker </returns>
public long GetEndRangeOfRrSequence()
{
    var rangeEnd = _session.GetAttributeAsLong(ExtendedFixSessionAttribute.EndOfRrRange);
    return Common.Constants.IsNull(rangeEnd) ? -1 : rangeEnd;
}
/// <summary>
/// Returns true if <c>seqNum</c> is not below the start of the RR range and is either
/// not above its end or the end is 0.
/// </summary>
public bool IsSequenceInRange(long seqNum)
{
    var rangeStart = GetStartRangeOfRrSequence();
    var rangeEnd = GetEndRangeOfRrSequence();

    if (seqNum < rangeStart)
    {
        return false;
    }

    // An end of 0 is accepted for any seqNum at or after the start.
    return rangeEnd == 0 || seqNum <= rangeEnd;
}
/// <summary>
/// Returns true if both the end and the start of the RR range are present in the session attributes.
/// </summary>
public bool IsRRangeExists()
{
    var endIsSet = !Common.Constants.IsNull(_session.GetAttributeAsLong(ExtendedFixSessionAttribute.EndOfRrRange));

    // The start attribute is consulted only when the end is present.
    return endIsSet
        && !Common.Constants.IsNull(_session.GetAttributeAsLong(ExtendedFixSessionAttribute.StartOfRrRange));
}
/// <inheritdoc />
//TODO: improve method name
// True when the LastRrSeqNum session attribute currently holds a value.
public bool IsRrSequenceActive() => !Common.Constants.IsNull(GetRrSequenceFromSession());
/// <inheritdoc />
// Extends the stored RR range so that it ends at incomingSeqNum; the range is never shrunk.
public void UpdateEndOfRrRange(long incomingSeqNum)
{
    var currentEnd = GetEndRangeOfRrSequence();
    if (currentEnd >= incomingSeqNum)
    {
        return;
    }

    SaveRequestSequenceRange(GetStartRangeOfRrSequence(), incomingSeqNum);
}
/// <inheritdoc />
public void UpdateLastRrSequence(long? msgSeqNum)
{
// Read the current RR sequence and end of range before anything below can change the attributes.
var lastRrSeq = GetRrSequenceFromSession();
var endOfRr = GetEndRangeOfRrSequence();
// NOTE(review): the LastRrSeqNum attribute is overwritten only when trace logging is enabled,
// so session state depends on the log level. Confirm whether SetAttribute belongs outside this
// guard or should not be here at all.
if (Log.IsTraceEnabled)
{
_session.SetAttribute(ExtendedFixSessionAttribute.LastRrSeqNum, msgSeqNum);
Log.Trace("Update RR seq num: " + msgSeqNum);
}
var enhancedResendLogic = _session.Parameters.Configuration
.GetPropertyAsBoolean(Config.EnhancedCmeResendLogic);
bool canRemoveRrSequence;
if (enhancedResendLogic)
{
// Enhanced logic additionally requires the message to be the last one of the RR range.
canRemoveRrSequence = IsExpectedSeqNum(lastRrSeq, msgSeqNum) && msgSeqNum == endOfRr;
}
else
{
canRemoveRrSequence = IsExpectedSeqNum(lastRrSeq, msgSeqNum);
}
if (canRemoveRrSequence)
{
RemoveRrSequenceFromSession(lastRrSeq);
}
}
/// <summary>
/// Resets the stored sequences and aligns the runtime state with the requested sequence number.
/// </summary>
/// <param name="msgSeqNum"> the requested sequence number; the runtime incoming sequence number becomes <c>msgSeqNum - 1</c> </param>
public void ResetSequencesOnRequest(long msgSeqNum)
{
ResetSequences();
var endOfSeqNumRange = GetEndRangeOfRrSequence();
// The requested number reaches or passes the end of the RR range (the end is -1 when no range is stored):
// drop the range and leave out-of-turn mode.
if (msgSeqNum >= endOfSeqNumRange)
{
RemoveRangeOfRrSequence();
_session.SetOutOfTurnMode(false);
}
var inSeqNum = msgSeqNum - 1; // -1 because number will be incremented after this message
_runtimeState.InSeqNum = inSeqNum;
if (Log.IsTraceEnabled)
{
Log.Trace("Save processed seq num to session:" + inSeqNum);
}
// Last processed stays one behind the incoming number and is never negative.
_runtimeState.LastProcessedSeqNum = inSeqNum > 0 ? inSeqNum - 1 : 0;
// Buffered messages with a sequence number below the new expected one are discarded.
SeqResendManager.SkipMessagesFromBufferTillSeqNum(_runtimeState.InSeqNum + 1);
}
/// <inheritdoc />
public void ResetSequencesOnLogon()
{
// The counterparty has reset its sequences, so our incoming sequence is reset as well.
_runtimeState.InSeqNum = 1;
_runtimeState.LastProcessedSeqNum = 0;
if (_session.GetAttribute(ExtendedFixSessionAttribute.IntradayLogonIsSent.Name) != null)
{
// response received, we were initiator of intraday logon
_session.RemoveAttribute(ExtendedFixSessionAttribute.IntradayLogonIsSent.Name);
}
else
{
// we are acceptor of Logon with seqnum reset
if (SessionState.IsConnected(_session.SessionState))
{
_session.ResetSequenceNumbers(false);
// NOTE(review): presumably ResetSequenceNumbers sets IntradayLogonIsSent and it is cleared again here - confirm.
_session.RemoveAttribute(ExtendedFixSessionAttribute.IntradayLogonIsSent.Name);
}
}
}
/// <summary>
/// Removes the <c>DeleteLastProcessedSeqNumFromFile</c> and <c>LastRrSeqNum</c> session attributes
/// and saves the restored sequences. An <see cref="IOException"/> is reported to the session
/// error handler as a warning.
/// </summary>
private void ResetSequences()
{
    try
    {
        _session.RemoveAttribute(ExtendedFixSessionAttribute.DeleteLastProcessedSeqNumFromFile);
        if (_logIsTraceEnabled)
        {
            var lastProcessed = _session.RuntimeState.LastProcessedSeqNum;
            Log.Trace("Attribute '" + ExtendedFixSessionAttribute.DeleteLastProcessedSeqNumFromFile + "' was removed, value: " + lastProcessed);
        }

        // Read the value before removal so that it can still be logged.
        var previousRrSeqNum = _session.GetAttributeAsLong(ExtendedFixSessionAttribute.LastRrSeqNum);
        _session.RemoveAttribute(ExtendedFixSessionAttribute.LastRrSeqNum);
        if (Log.IsDebugEnabled)
        {
            var outcome = previousRrSeqNum == 0
                ? "was removed from session"
                : ("was removed from session, value: " + previousRrSeqNum);
            Log.Debug("Attribute '" + ExtendedFixSessionAttribute.LastRrSeqNum.Name + "' " + outcome);
        }

        SaveRestoredSequences();
    }
    catch (IOException ioError)
    {
        _session.ErrorHandler.OnWarn("Error on restore session parameters.", ioError);
    }
}
/// <inheritdoc />
// Returns false (and persists the processed sequence) when both sequence numbers are set and the
// incoming number is not exactly one ahead of the last processed one; otherwise advances the
// processed sequence and returns true.
public bool DoAfterMessageProcessActions()
{
    // The values come from the runtime state, not from session attributes.
    var incoming = _runtimeState.InSeqNum;
    var lastProcessed = _runtimeState.LastProcessedSeqNum;

    var bothSet = incoming != 0 && lastProcessed != 0;
    if (bothSet && incoming != lastProcessed + 1)
    {
        // Don't increment the last processed seq num.
        SaveCurrentProcessedSequence();
        return false; // do not increment incoming sequence
    }

    IncrementProcessedSequence();
    return true;
}
/// <summary>
/// Sets the last processed sequence number to the current incoming sequence number and
/// clears the "delete last processed seq num from file" flag if it is set.
/// </summary>
private void IncrementProcessedSequence()
{
    var current = _runtimeState.InSeqNum;

    if (_logIsTraceEnabled)
    {
        Log.Trace("Save processed seq num to session: " + current);
    }

    _runtimeState.LastProcessedSeqNum = current;
    CleanDeleteLastProcessedSeqNumFromFileFlag();
}
/// <summary>
/// When the <c>DeleteLastProcessedSeqNumFromFile</c> session attribute is set, saves the restored
/// sequences and then removes the attribute. Failures are reported to the session error handler
/// as a warning instead of being silently swallowed.
/// </summary>
private void CleanDeleteLastProcessedSeqNumFromFileFlag()
{
    if (!_session.GetAttributeAsBool(ExtendedFixSessionAttribute.DeleteLastProcessedSeqNumFromFile))
    {
        return;
    }

    try
    {
        if (_logIsTraceEnabled)
        {
            Log.Trace("Remove processed seq num from file: " + _runtimeState.LastProcessedSeqNum);
        }

        SaveRestoredSequences();
        _session.RemoveAttribute(ExtendedFixSessionAttribute.DeleteLastProcessedSeqNumFromFile);
    }
    catch (Exception e)
    {
        // Still best effort: the attribute stays set when saving fails, so the clean-up is attempted
        // again on the next call. The failure is now surfaced the same way other save errors are.
        _session.ErrorHandler.OnWarn("Error on remove processed seq num from file.", e);
    }
}
/// <summary>
/// Saves <paramref name="startRange"/> as the RR sequence, records the requested end of range
/// and sends the resend request unless the session carries the <c>RejectSession</c> attribute.
/// </summary>
private void RequestNextMessagePart(long startRange, long end, bool posDup)
{
    SaveRrSequence(startRange);

    _session.SetAttribute(ExtendedFixSessionAttribute.RequestedEndRrRange, end);
    if (Log.IsDebugEnabled)
    {
        Log.Debug("Storing RequestedEndRrRange attribute:" + end);
    }

    var sessionRejected = _session.GetAttributeAsBool(ExtendedFixSessionAttribute.RejectSession);
    if (sessionRejected)
    {
        return;
    }

    SendRequestNextMessagePart(startRange, end, posDup);
}
/// <summary>
/// Builds a message with BeginSeqNo/EndSeqNo (plus PossDupFlag and OrigSendingTime when
/// <paramref name="posDup"/> is true), sends it out of turn with message type "2" and notifies
/// the out-of-sync listener.
/// </summary>
private void SendRequestNextMessagePart(long startRange, long end, bool posDup)
{
    var resendRequest = new FixMessage();
    var factory = _session.MessageFactory;

    if (posDup)
    {
        resendRequest.AddTag(Tags.PossDupFlag, true);
        resendRequest.AddTag(Tags.OrigSendingTime, factory.GetCurrentSendingTime());
    }

    // Sequence numbers are written padded to the factory's minimal field length.
    resendRequest.SetPaddedLongTag(Tags.BeginSeqNo, startRange, factory.MinSeqNumFieldsLength);
    resendRequest.SetPaddedLongTag(Tags.EndSeqNo, end, factory.MinSeqNumFieldsLength);

    _session.SendMessageOutOfTurn("2", resendRequest);
    _session.FixSessionOutOfSyncListener.OnResendRequestSent(startRange, end);
}
/// <inheritdoc />
// Returns the SimilarRrCounter attribute when the given range equals the range stored in the
// LastRrRange attribute; otherwise 0.
public long GetCountOfSentRequests(long startRange, long endRange)
{
    var requestedRange = BuildLastRrRangeVal(startRange, endRange);
    var storedRange = (string)_session.GetAttribute(ExtendedFixSessionAttribute.LastRrRange);

    // String equality; a missing (null) stored range never matches.
    if (requestedRange == storedRange)
    {
        return _session.GetAttributeAsLong(ExtendedFixSessionAttribute.SimilarRrCounter);
    }

    return 0;
}
/// <summary>
/// Tracks repeated resend requests for the same range: the counter is incremented when the range
/// equals the one stored in <c>LastRrRange</c>, otherwise the range is stored and the counter restarts at 1.
/// </summary>
private void IncreaseRrCounter(long startRange, long endRange)
{
    var requestedRange = BuildLastRrRangeVal(startRange, endRange);
    var storedRange = (string)_session.GetAttribute(ExtendedFixSessionAttribute.LastRrRange);

    if (requestedRange == storedRange)
    {
        // Same range requested again.
        var repeats = _session.GetAttributeAsLong(ExtendedFixSessionAttribute.SimilarRrCounter);
        _session.SetAttribute(ExtendedFixSessionAttribute.SimilarRrCounter, repeats + 1);
        return;
    }

    // First request for this range (or no range stored yet).
    _session.SetAttribute(ExtendedFixSessionAttribute.LastRrRange, requestedRange);
    _session.SetAttribute(ExtendedFixSessionAttribute.SimilarRrCounter, 1);
}
/// <summary>
/// Formats a range as <c>start-end</c>, the form stored in the <c>LastRrRange</c> attribute.
/// </summary>
private string BuildLastRrRangeVal(long startRange, long end)
{
    return startRange + "-" + end;
}
/// <inheritdoc />
public void RequestLostMessages(long expectedSeqNum, long incomingSeqNum, bool posDup)
{
// Count this request first so that repeated requests for the same range can be detected.
IncreaseRrCounter(expectedSeqNum, incomingSeqNum);
// The configured resend manager decides how the range is actually requested.
SeqResendManager.RequestLostMessages(expectedSeqNum, incomingSeqNum, posDup);
}
/// <inheritdoc />
// Decides how the sequence numbers are initialized. The checks run in this order and the first
// one that applies ends the method:
//   1. initiator: forced sequence reset; acceptor: incoming logon carries the reset flag;
//   2. outgoing logon carries the reset flag;
//   3. intraday reset;
//   4. reset by time;
//   5. reset requested for the next connect;
// otherwise the numbers are taken from the session parameters or from the given storage values
// and the session parameters are saved.
public void InitSeqNums(long inStorageSeqNum, long nextOutStorageSeqNum)
{
if (_session is InitiatorFixSession)
{
if (ApplyForceSeqNumReset())
{
return;
}
}
else
{
//acceptor
if (InLogonHasResetFlag())
{
ResetSeqNumAndOptionallySetResetSeqNumFlag();
if (Log.IsTraceEnabled)
{
Log.Trace("Reset sequences by incoming LOGON ResetSeqNum(141)");
}
return;
}
}
if (OutLogonHasResetFlag())
{
ResetSeqNumAndSetResetSeqNumFlag();
if (Log.IsTraceEnabled)
{
Log.Trace("Reset sequences by outgoing LOGON ResetSeqNum(141)");
}
return;
}
if (ApplyIntraDayReset())
{
if (Log.IsTraceEnabled)
{
Log.Trace("Apply intra day reset");
}
return;
}
if (ApplyResetByTime())
{
if (Log.IsTraceEnabled)
{
Log.Trace("Apply daily reset");
}
return;
}
if (ApplyResetOnNextConnect())
{
if (Log.IsTraceEnabled)
{
Log.Trace("Apply reset sequences for next connect (by session parameters)");
}
return;
}
// No reset applied: take the numbers from configured values or from storage.
InitInSeqNumFromProperties(inStorageSeqNum);
InitOutSeqNumFromProperties(nextOutStorageSeqNum);
//FIXME_NOW: may be we need more clear save method
SaveSessionParameters();
}
/// <summary>
/// Applies the <c>ForceSeqNumReset</c> session parameter.
/// </summary>
/// <returns> true when the sequences were reset; false otherwise </returns>
private bool ApplyForceSeqNumReset()
{
switch (_sessionParameters.ForceSeqNumReset)
{
case ForceSeqNumReset.Always:
ResetSeqNumAndOptionallySetResetSeqNumFlag();
if (Log.IsTraceEnabled)
{
Log.Trace("Apply force seq num reset - ALWAYS");
}
return true;
case ForceSeqNumReset.OneTime:
// The IsSendResetSeqNum session attribute records that the one-time reset was already applied.
if (!IsSendLogonWithSeqResetNum())
{
ResetSeqNumAndOptionallySetResetSeqNumFlag();
_session.SetAttribute(ExtendedFixSessionAttribute.IsSendResetSeqNum.Name, "Y");
if (Log.IsTraceEnabled)
{
Log.Trace("Apply force seq num reset - ONETIME");
}
return true;
}
else
{
// Already applied once: the outgoing logon is replaced by the one from the session parameters.
_runtimeState.OutgoingLogon = _sessionParameters.OutgoingLoginMessage;
}
break;
case ForceSeqNumReset.Never:
break;
}
return false;
}
/// <summary>
/// Resets the runtime sequences and unconditionally sets the ResetSeqNumFlag tag in the outgoing logon.
/// </summary>
private void ResetSeqNumAndSetResetSeqNumFlag()
{
ResetRuntimeSequences();
SetResetSeqNumFlagIntoOutgoingLogon();
}
/// <summary>
/// Resets the runtime sequences and sets the ResetSeqNumFlag tag in the outgoing logon unless the
/// <c>IgnoreResetSeqNumFlagOnReset</c> option is on or the session is neither an initiator nor an
/// acceptor whose incoming logon has the reset flag.
/// </summary>
private void ResetSeqNumAndOptionallySetResetSeqNumFlag()
{
    ResetRuntimeSequences();

    if (_ignoreResetSeqNumFlagOnReset)
    {
        return;
    }

    if (IsInitiatorOrAcceptorWithResetFlag())
    {
        SetResetSeqNumFlagIntoOutgoingLogon();
    }
}
/// <summary>
/// Returns true for an initiator session, or for an acceptor session whose incoming logon has the reset flag.
/// </summary>
private bool IsInitiatorOrAcceptorWithResetFlag()
{
    if (_session is InitiatorFixSession)
    {
        return true;
    }

    return _session is AcceptorFixSession && InLogonHasResetFlag();
}
/// <inheritdoc />
public void SetResetSeqNumFlagIntoOutgoingLogon()
{
// Writes ResetSeqNumFlag = 'Y' into the outgoing logon, adding the tag when it is not present yet.
_runtimeState.OutgoingLogon.UpdateValue(Tags.ResetSeqNumFlag, ResetSeqNumFlagValue, IndexedStorage.MissingTagHandling.AddIfNotExists);
}
/// <summary>
/// Resets both runtime sequence numbers to 1 after preparing the state for a reset,
/// disables the "set on next connect" flags and saves the session parameters.
/// </summary>
private void ResetRuntimeSequences()
{
PrepareStateForReset();
ApplyOutSeqnum(1);
// Also sets the last processed sequence number to 0 (see ApplyInSeqNum).
ApplyInSeqNum(1);
ResetSequencesForNextConnect();
SaveSessionParameters();
}
/// <summary>
/// Prepares for a sequence reset: backs up the session storages, records the current time as the
/// last reset timestamp, removes the RR range attributes and saves the restored sequences.
/// </summary>
private void PrepareStateForReset()
{
_session.BackupStorages();
_sessionParameters.LastSeqNumResetTimestamp = DateTimeHelper.CurrentMilliseconds;
RemoveRangeOfRrSequence();
SaveRestoredSequences();
}
/// <summary>
/// Returns true when the incoming logon message from the session parameters has ResetSeqNumFlag set.
/// </summary>
private bool InLogonHasResetFlag() => IsPresetResetFlag(_sessionParameters.IncomingLoginMessage);
/// <summary>
/// Returns true when the outgoing logon message from the runtime state has ResetSeqNumFlag set.
/// </summary>
private bool OutLogonHasResetFlag() => IsPresetResetFlag(_runtimeState.OutgoingLogon);
/// <summary>
/// Returns true when <paramref name="msg"/> contains the ResetSeqNumFlag tag and its value reads as true.
/// </summary>
private bool IsPresetResetFlag(FixMessage msg)
{
    var flagIndex = msg.GetTagIndex(Tags.ResetSeqNumFlag);
    if (flagIndex == FixMessage.NotFound)
    {
        return false;
    }

    return msg.GetTagValueAsBoolAtIndex(flagIndex);
}
/// <summary>
/// Resets the runtime sequences when the intraday sequence reset option is enabled.
/// </summary>
/// <returns> true when the reset was performed </returns>
private bool ApplyIntraDayReset()
{
    if (!_configAdapter.IsIntraDeySeqNumResetEnabled)
    {
        return false;
    }

    ResetRuntimeSequences();
    return true;
}
/// <summary>
/// Resets the sequences when the reset-by-time option is enabled and the reset time was missed
/// according to the last reset timestamp.
/// </summary>
/// <returns> true when the reset was performed </returns>
private bool ApplyResetByTime()
{
    // The timestamp is read only when the option is enabled.
    var resetDue = _configAdapter.IsResetSeqNumTimeEnabled
        && IsResetTimeMissed(_sessionParameters.LastSeqNumResetTimestamp);
    if (!resetDue)
    {
        return false;
    }

    ResetSeqNumAndOptionallySetResetSeqNumFlag();
    return true;
}
/// <summary>
/// Resets the sequences when both configured sequence numbers in the session parameters equal 1.
/// </summary>
/// <returns> true when the reset was performed </returns>
private bool ApplyResetOnNextConnect()
{
    var configuredIn = _sessionParameters.IncomingSequenceNumber;
    var configuredOut = _sessionParameters.OutgoingSequenceNumber;

    var resetRequested = configuredIn == 1 && configuredOut == 1;
    if (!resetRequested)
    {
        return false;
    }

    ResetSeqNumAndOptionallySetResetSeqNumFlag();
    //FIXME_NOW: may be we need more clear save method
    SaveSessionParameters();
    return true;
}
/// <summary>
/// Initializes the runtime incoming sequence number: from the configured value when it is flagged
/// for the next connect, otherwise from <paramref name="inStorageSeqNum"/> when the runtime value
/// is still the initial one.
/// </summary>
private void InitInSeqNumFromProperties(long inStorageSeqNum)
{
    if (_sessionParameters.IsSetInSeqNumsOnNextConnect)
    {
        if (Log.IsTraceEnabled)
        {
            Log.Trace("Init in sequence with configured value: " + _sessionParameters.IncomingSequenceNumber);
        }

        _runtimeState.InSeqNum = _sessionParameters.IncomingSequenceNumber;
        // Reset last processed seq num when the in seq num is reset.
        _runtimeState.LastProcessedSeqNum = 0;
        _sessionParameters.DisableInSeqNumsOnNextConnect();
        //todo - save session parameters
        return;
    }

    if (_runtimeState.InSeqNum != FixSessionRuntimeState.InitSeqNum)
    {
        return;
    }

    if (Log.IsTraceEnabled)
    {
        Log.Trace("Init in sequence from storage sequence: " + inStorageSeqNum);
    }

    _runtimeState.InSeqNum = inStorageSeqNum;
}
/// <summary>
/// Initializes the runtime outgoing sequence number: from the configured value when it is flagged
/// for the next connect, otherwise from <paramref name="nextOutStorageSeqNum"/> when the runtime
/// value is still the initial one.
/// </summary>
private void InitOutSeqNumFromProperties(long nextOutStorageSeqNum)
{
    if (_sessionParameters.SetOutSeqNumsOnNextConnect)
    {
        if (Log.IsTraceEnabled)
        {
            Log.Trace("Init out sequence with configured value: " + _sessionParameters.OutgoingSequenceNumber);
        }

        _runtimeState.OutSeqNum = _sessionParameters.OutgoingSequenceNumber;
        _sessionParameters.DisableOutSeqNumsOnNextConnect();
        //todo - save session parameters
        return;
    }

    if (_runtimeState.OutSeqNum != FixSessionRuntimeState.InitSeqNum)
    {
        return;
    }

    if (Log.IsTraceEnabled)
    {
        Log.Trace("Init out sequence from storage sequence: " + nextOutStorageSeqNum);
    }

    _runtimeState.OutSeqNum = nextOutStorageSeqNum;
}
/// <summary>
/// Disables both "set sequence number on next connect" flags in the session parameters.
/// </summary>
private void ResetSequencesForNextConnect()
{
_sessionParameters.DisableInSeqNumsOnNextConnect();
_sessionParameters.DisableOutSeqNumsOnNextConnect();
}
// Raw tag value "Y" written into ResetSeqNumFlag by SetResetSeqNumFlagIntoOutgoingLogon.
private static readonly byte[] ResetSeqNumFlagValue = { (byte)'Y' };
/// <summary>
/// Returns true when the session carries the <c>IsSendResetSeqNum</c> attribute.
/// </summary>
private bool IsSendLogonWithSeqResetNum() =>
    _session.GetAttribute(ExtendedFixSessionAttribute.IsSendResetSeqNum.Name) != null;
/// <inheritdoc />
public void ConfigureStateBeforeReset()
{
// Persist the restored sequences before the storages are backed up.
RestoreSessionParameters();
// Backup session storages; this also stamps the reset time and saves the restored sequences again.
BackupSessionStorage();
// Reset the RR attributes as well.
RemoveRangeOfRrSequence();
_sessionParameters.LastSeqNumResetTimestamp = DateTimeHelper.CurrentMilliseconds;
}
/// <inheritdoc />
// A zero lastResetTime is treated as "never reset" and reports false. Otherwise the reset is
// considered missed when the adjusted configured reset time is at least one day after lastResetTime.
public bool IsResetTimeMissed(long lastResetTime)
{
    if (lastResetTime == 0)
    {
        return false;
    }

    var nextReset = AdjustTimestamp(_configAdapter.ResetSequenceTimeInUserTimestamp);
    return nextReset - lastResetTime >= MillisInDay;
}
/// <summary>
/// Returns <paramref name="timestamp"/> moved one day forward when it is already in the past
/// relative to the current time; otherwise returns it unchanged.
/// </summary>
private long AdjustTimestamp(long timestamp)
{
    var alreadyPassed = DateTimeHelper.CurrentMilliseconds > timestamp;
    return alreadyPassed ? timestamp + MillisInDay : timestamp;
}
/// <inheritdoc />
// When reset-by-time is enabled and the session has no reset timestamp and both configured
// sequence numbers are at most 1, records the current time as the last reset time and saves it.
public void InitLastSeqNumResetTimestampOnNewSession()
{
    if (!_configAdapter.IsResetSeqNumTimeEnabled)
    {
        return;
    }

    var neverReset = _sessionParameters.LastSeqNumResetTimestamp == 0;
    var untouchedSequences = _sessionParameters.IncomingSequenceNumber <= 1
        && _sessionParameters.OutgoingSequenceNumber <= 1;
    if (!(neverReset && untouchedSequences))
    {
        return;
    }

    _sessionParameters.LastSeqNumResetTimestamp = DateTimeHelper.CurrentMilliseconds;
    SaveSessionParameters();
}
/// <inheritdoc />
public void ResetSeqNumForNextConnect()
{
ConfigureStateBeforeReset();
// Forget that a one-time reset logon was sent (see ApplyForceSeqNumReset).
_session.RemoveAttribute(ExtendedFixSessionAttribute.IsSendResetSeqNum.Name);
// Both configured numbers equal to 1 is exactly the condition ApplyResetOnNextConnect checks.
_sessionParameters.IncomingSequenceNumber = 1;
_sessionParameters.OutgoingSequenceNumber = 1;
ApplyResetOnNextConnect();
}
/// <inheritdoc />
// Overwrites the runtime outgoing sequence number.
public void ApplyOutSeqnum(long outSeqNum) => _runtimeState.OutSeqNum = outSeqNum;
/// <inheritdoc />
public void ApplyInSeqNum(long inSeqNum)
{
_runtimeState.InSeqNum = inSeqNum;
// Keep the last processed number one behind the incoming number.
_runtimeState.LastProcessedSeqNum = inSeqNum - 1;
}
/// <inheritdoc />
// Increments the runtime outgoing sequence number. When the outgoing number was flagged to be set
// on the next connect, the flag is cleared and the restored sequences are saved.
public void IncrementOutSeqNum()
{
    _runtimeState.IncrementOutSeqNum();

    if (!_sessionParameters.SetOutSeqNumsOnNextConnect)
    {
        return;
    }

    _sessionParameters.DisableOutSeqNumsOnNextConnect();
    SaveRestoredSequences();
}
/// <summary>
/// Base class for sequence resend managers. It owns the bounded buffer that holds messages while a
/// resend is in progress; how resend requests are issued is left to the derived classes.
/// </summary>
internal abstract class AbstractSequenceResendManager : ISequenceResendManager
{
    /// <inheritdoc />
    public abstract void SendRequestForResend(long seqNum, bool posDup);

    /// <inheritdoc />
    public abstract bool IsBlockResendSupported(long seqNum);

    /// <inheritdoc />
    public abstract void RequestLostMessages(long expectedSeqNum, long incomingSeqNum, bool posDup);

    private readonly StandardSessionSequenceManager _sequenceManager;

    // Bounded queue of buffered messages; its capacity is fixed in the constructor.
    internal IBoundedQueue<FixMessage> BufferedMessages;

    /// <summary>
    /// Creates the manager and its message buffer. The buffer capacity comes from the
    /// <c>SequenceResendManagerMessageBufferSize</c> option (default 32).
    /// </summary>
    /// <param name="sequenceManager"> the owning sequence manager </param>
    // Protected: an abstract type can only be constructed through a derived class.
    protected AbstractSequenceResendManager(StandardSessionSequenceManager sequenceManager)
    {
        _sequenceManager = sequenceManager;
        var bufferSize = _sequenceManager._configAdapter.Configuration.GetPropertyAsInt(Config.SequenceResendManagerMessageBufferSize, 32);
        BufferedMessages = new SimpleBoundedQueue<FixMessage>(bufferSize);
    }

    /// <inheritdoc />
    public virtual bool IsBufferEmpty
    {
        get { return BufferedMessages.IsEmpty; }
    }

    /// <inheritdoc />
    public virtual bool PutMessageIntoBuffer(FixMessage message)
    {
        if (BufferedMessages.Offer(message))
        {
            if (Log.IsInfoEnabled)
            {
                // Sequence numbers are handled as long throughout this file, so the value is read
                // through MsgSeqNumber (as SkipMessagesFromBufferTillSeqNum does) rather than as an int.
                Log.Info("Message with sequence number \"" + message.MsgSeqNumber + "\" put into buffer. Buffer size is " + BufferedMessages.Size);
            }

            return true;
        }

        if (Log.IsInfoEnabled)
        {
            Log.Info("Buffer is full. Buffer size is " + BufferedMessages.Size);
        }

        return false;
    }

    /// <inheritdoc />
    public virtual FixMessage TakeMessageFromBuffer()
    {
        if (Log.IsDebugEnabled)
        {
            Log.Debug("SequenceResendBuffer size is " + BufferedMessages.Size);
        }

        var message = BufferedMessages.Poll();
        if (Log.IsInfoEnabled)
        {
            // Poll returns null when the buffer has nothing to hand out.
            Log.Info(message != null
                ? "Message " + message.ToPrintableString() + " taken from buffer."
                : "SequenceResendBuffer is empty");
        }

        return message;
    }

    /// <inheritdoc />
    public bool IsMessageProcessingFromBufferStarted { get; set; }

    /// <inheritdoc />
    public virtual bool IsRrRangeActive
    {
        get
        {
            // Active only when both range attributes are present on the session.
            return !Common.Constants.IsNull(
                       _sequenceManager._session.GetAttributeAsLong(ExtendedFixSessionAttribute.StartOfRrRange)) &&
                   !Common.Constants.IsNull(
                       _sequenceManager._session.GetAttributeAsLong(ExtendedFixSessionAttribute.EndOfRrRange));
        }
    }

    /// <inheritdoc />
    public virtual void SkipMessagesFromBufferTillSeqNum(long seqNum)
    {
        // Drop buffered messages from the head while their sequence number is below seqNum.
        while ((!BufferedMessages.IsEmpty) && BufferedMessages.Peek().MsgSeqNumber < seqNum)
        {
            var message = BufferedMessages.Remove();
            if (Log.IsDebugEnabled)
            {
                Log.Debug("Skipped from buffer: " + message);
            }
        }
    }
}
/// <summary>
/// Resend manager that requests lost messages in blocks of at most <c>MaxRequestResendBlock</c> messages.
/// </summary>
internal class BlockSequenceResendManager : AbstractSequenceResendManager
{
    private readonly StandardSessionSequenceManager _sequenceManager;

    // Maximum number of messages per resend request; 0 switches block handling off.
    internal long MaxRequestResendBlock;

    /// <summary>
    /// Creates the manager for the given sequence manager and block size.
    /// </summary>
    public BlockSequenceResendManager(StandardSessionSequenceManager sequenceManager, long maxRequestResendBlock) : base(sequenceManager)
    {
        _sequenceManager = sequenceManager;
        MaxRequestResendBlock = maxRequestResendBlock;
    }

    /// <summary>
    /// Returns true when the distance between the stored end and start of the RR range exceeds
    /// the block size and block handling is on.
    /// </summary>
    public virtual bool IsNeedPartResendForMessage()
    {
        var rangeStart = _sequenceManager.GetStartRangeOfRrSequence();
        var rangeEnd = _sequenceManager.GetEndRangeOfRrSequence();
        var distance = rangeEnd - rangeStart;
        return distance > MaxRequestResendBlock && MaxRequestResendBlock != 0;
    }

    /// <inheritdoc />
    // True when seqNum is before the end of the RR range and closes a full block counted from the range start.
    public override bool IsBlockResendSupported(long seqNum)
    {
        if (MaxRequestResendBlock == 0)
        {
            return false;
        }

        var rangeStart = _sequenceManager.GetStartRangeOfRrSequence();
        var rangeEnd = _sequenceManager.GetEndRangeOfRrSequence();
        var positionInRange = seqNum - rangeStart + 1;
        return seqNum < rangeEnd && positionInRange % MaxRequestResendBlock == 0;
    }

    /// <inheritdoc />
    // Saves the RR sequence and requests the block that starts at startRange.
    public override void SendRequestForResend(long startRange, bool posDup)
    {
        _sequenceManager.SaveRrSequence(startRange);
        _sequenceManager.RequestNextMessagePart(startRange, GetEndOfRange(startRange), posDup);
    }

    /// <summary>
    /// Gets the end of the block that starts at <paramref name="startRange"/>: the stored end of the
    /// RR range, capped at <c>startRange + MaxRequestResendBlock - 1</c> when the remaining distance
    /// is at least one block and block handling is on.
    /// </summary>
    public virtual long GetEndOfRange(long startRange)
    {
        var rangeEnd = _sequenceManager.GetEndRangeOfRrSequence();
        var needsCap = rangeEnd - startRange >= MaxRequestResendBlock && MaxRequestResendBlock != 0;
        if (!needsCap)
        {
            return rangeEnd;
        }

        return Math.Min(rangeEnd, startRange + MaxRequestResendBlock - 1);
    }

    /// <inheritdoc />
    // Stores the requested range, then either starts block-wise requests or asks for the whole range at once.
    public override void RequestLostMessages(long expectedSeqNum, long incomingSeqNum, bool posDup)
    {
        var session = _sequenceManager._session;
        var version = session.Parameters.FixVersion;

        // For FIX 4.0 / 4.1 the end sequence number comes from the message factory; otherwise 0 is used.
        long endOfSequence = 0;
        if (version == FixVersion.Fix40 || version == FixVersion.Fix41)
        {
            endOfSequence = session.MessageFactory.GetEndSequenceNumber();
        }

        _sequenceManager.SaveRequestSequenceRange(expectedSeqNum, incomingSeqNum);

        if (!IsNeedPartResendForMessage())
        {
            _sequenceManager.RequestNextMessagePart(expectedSeqNum, endOfSequence, posDup);
            return;
        }

        SendRequestForResend(expectedSeqNum, posDup);
    }
}
/// <summary>
/// Resend manager that requests the whole missing range with a single resend request
/// and does not support block-wise resend.
/// </summary>
internal class FreeRangeSequenceResendManager : AbstractSequenceResendManager
{
    private readonly StandardSessionSequenceManager _sequenceManager;

    /// <summary>
    /// Creates the manager for the given sequence manager.
    /// </summary>
    public FreeRangeSequenceResendManager(StandardSessionSequenceManager sequenceManager) : base(sequenceManager)
    {
        _sequenceManager = sequenceManager;
    }

    /// <inheritdoc />
    // Block resend is never available for this manager.
    public override bool IsBlockResendSupported(long seqNum) => false;

    /// <inheritdoc />
    /// <exception cref="NotSupportedException"> always </exception>
    public override void SendRequestForResend(long seqNum, bool posDup)
    {
        throw new NotSupportedException();
    }

    /// <inheritdoc />
    // Stores the requested range and sends one resend request starting at expectedSeqNum.
    public override void RequestLostMessages(long expectedSeqNum, long incomingSeqNum, bool posDup)
    {
        var session = _sequenceManager._session;
        var version = session.Parameters.FixVersion;

        // For FIX 4.0 / 4.1 the end sequence number comes from the message factory; otherwise 0 is used.
        long endOfSequence = 0;
        if (version == FixVersion.Fix40 || version == FixVersion.Fix41)
        {
            endOfSequence = session.MessageFactory.GetEndSequenceNumber();
        }

        _sequenceManager.SaveRequestSequenceRange(expectedSeqNum, incomingSeqNum);
        _sequenceManager.RequestNextMessagePart(expectedSeqNum, endOfSequence, posDup);
    }
}
}
}