FixAntenna/NetCore/FixEngine/Session/MessageHandler/HandlerChain.cs (194 lines of code) (raw):
// Copyright (c) 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.Collections.Generic;
using Epam.FixAntenna.NetCore.Common;
using Epam.FixAntenna.NetCore.Configuration;
using Epam.FixAntenna.NetCore.FixEngine.Session.MessageHandler.Pre;
using Epam.FixAntenna.NetCore.Message;
using Epam.FixAntenna.NetCore.Message.SpecialTags;
namespace Epam.FixAntenna.NetCore.FixEngine.Session.MessageHandler
{
/// <summary>
/// The <c>HandlerChain</c> provides the <tt>chain of responsibility</tt> pattern.
/// </summary>
internal sealed class HandlerChain : AbstractSessionMessageHandler, ICompositeMessageHandlerListener, IDisposable
{
private const string DefaultRawTags = "96, 91, 213, 349, 351, 353, 355, 357, 359, 361, 363, 365, 446, 619, 622";
private readonly CompositeMessageHandler _compositeListener = new CompositeMessageHandler();
private readonly CompositeUserMessageHandler _compositeUserListener = new CompositeUserMessageHandler();
private readonly List<ISessionMessageHandler> _messageHandlers = new List<ISessionMessageHandler>();
private readonly List<PreProcessMessageHandler> _preProcessMessageHandlers = new List<PreProcessMessageHandler>();
private readonly List<IPostProcessMessageHandler> _postProcessMessageHandlers = new List<IPostProcessMessageHandler>();
private RawFixUtil.IRawTags RawTags;
private IMaskedTags _maskedTags;
private volatile bool _disposed;
/// <summary>
/// Creates the <c>HandlerChain</c> with composite message handler.
/// </summary>
public HandlerChain()
{
_messageHandlers.Add(_compositeListener);
_messageHandlers.Add(_compositeUserListener);
_compositeUserListener.NextHandler = _compositeListener;
RawTags = RawFixUtil.CreateRawTags(DefaultRawTags);
_maskedTags = DefaultMaskedTags.Instance;
}
/// <inheritdoc />
public override IExtendedFixSession Session
{
set
{
base.Session = value;
_compositeListener.Session = value;
_compositeUserListener.Session = value;
RawTags = RawFixUtil.CreateRawTags(
Session.Parameters.Configuration
.GetProperty(Config.RawTags, DefaultRawTags));
_maskedTags = CustomMaskedTags.Create(
Session.Parameters.Configuration
.GetProperty(Config.MaskedTags));
}
}
/// <summary>
/// Adds the specific message handle.
/// </summary>
/// <param name="msgType"> the message type </param>
/// <param name="messageHandler"> the message handler </param>
public void AddSessionMessageHandler(string msgType, ISessionMessageHandler messageHandler)
{
_compositeListener.AddSessionMessageHandler(msgType, messageHandler);
}
/// <summary>
/// Adds the global message handler.
/// The handler adds to the end of handlers list.
/// </summary>
/// <param name="globalMessageHandler"> the global message handler </param>
public void AddGlobalMessageHandler(AbstractGlobalMessageHandler globalMessageHandler)
{
globalMessageHandler.Session = Session;
globalMessageHandler.NextHandler = _messageHandlers[_messageHandlers.Count - 1];
_messageHandlers.Add(globalMessageHandler);
}
public void AddUserGlobalMessageHandler(AbstractUserGlobalMessageHandler userMessageHandler)
{
_compositeUserListener.AddUserMessageHandler(userMessageHandler);
}
/// <summary>
/// Adds the global message handler.
/// The handler adds to the stert of handlers list.
/// </summary>
/// <param name="globalMessageHandler"> the global message handler </param>
public void AddGlobalPostProcessMessageHandler(AbstractGlobalPostProcessSessionMessageHandler globalMessageHandler)
{
globalMessageHandler.Session = Session;
if (_postProcessMessageHandlers.Count > 0)
{
globalMessageHandler.SetNext(_postProcessMessageHandlers[_postProcessMessageHandlers.Count - 1]);
}
_postProcessMessageHandlers.Add(globalMessageHandler);
}
public void AddGlobalPreProcessMessageHandler(PreProcessMessageHandler globalMessageHandler)
{
globalMessageHandler.Session = Session;
if (_preProcessMessageHandlers.Count > 0)
{
globalMessageHandler.SetNext(_preProcessMessageHandlers[_preProcessMessageHandlers.Count - 1]);
}
_preProcessMessageHandlers.Add(globalMessageHandler);
}
/// <inheritdoc />
public override void OnNewMessage(FixMessage message)
{
if (Session.SessionState != SessionState.WaitingForForcedDisconnect)
{
_messageHandlers[_messageHandlers.Count - 1].OnNewMessage(message);
}
else
{
throw new ArgumentException("Session is in forced logoff state. Incoming message " + message.ToPrintableString() + " was ignored.");
}
}
internal FixMessage DefMessageForParse = FixMessageFactory.NewInstanceFromPoolForEngineParse();
/// <inheritdoc />
public void OnMessage(MsgBuf messageBuf)
{
var message = messageBuf.FixMessage;
if (message == null)
{
message = RawFixUtil.GetFixMessage(DefMessageForParse, messageBuf, RawTags, false); // throw GarbledMessageException
message.SetFixVersion(GetFixVersionForMessage());
}
try
{
// TBD!
if (IsNeedPreProcess())
{
Log.Trace("Preprocess messages");
message = PreProcessMessage(message);
// TBD!
//message = message.AsByteArray();
}
OnNewMessage(message); // ProcessMessageException, IllegalArgumentException, NullPointerException, etc
}
catch (InvalidMessageException e)
{
if (e.IsCritical())
{
LogErrorToSession("Invalid message received: " + messageBuf, e);
}
else
{
LogWarnToSession("Invalid message received: " + messageBuf, e);
}
}
finally
{
DefMessageForParse.Clear();
}
OnPostProcessMessage(messageBuf);
}
private FixVersionContainer GetFixVersionForMessage()
{
var fixVersion = Session.Parameters.FixVersionContainer;
if (fixVersion.FixVersion == FixVersion.Fixt11)
{
fixVersion = Session.Parameters.AppVersionContainer;
}
return fixVersion;
}
private FixMessage PreProcessMessage(FixMessage message)
{
if (_preProcessMessageHandlers.Count > 0)
{
return _preProcessMessageHandlers[_preProcessMessageHandlers.Count - 1].PreProcessMessage(message);
}
else
{
return message;
}
}
public void OnPostProcessMessage(MsgBuf message)
{
_postProcessMessageHandlers[_postProcessMessageHandlers.Count - 1].OnPostProcessMessage(message);
}
/// <inheritdoc />
public void OnSessionStateChange(SessionState sessionState)
{
_compositeListener.OnSessionStateChange(sessionState);
}
/// <summary>
/// Sets the user message compositeListener.
/// </summary>
/// <param name="listener"> the session compositeListener </param>
public void SetUserListener(IFixSessionListener listener)
{
_compositeListener.SetUserListener(listener);
}
/// <summary>
/// Sets listener to receive session level incoming messages.
/// </summary>
/// <param name="listener"> the user listener </param>
public void AddInSessionMessageListener(IFixMessageListener listener)
{
_compositeListener.AddUserSessionMessageListener(listener);
}
public bool IsNeedPreProcess()
{
return _preProcessMessageHandlers.Count > 0;
}
public void Dispose()
{
Dispose(true);
}
private void Dispose(bool disposing)
{
if (_disposed)
{
return;
}
if (disposing)
{
try
{
DefMessageForParse.ReleaseInstance();
}
catch (Exception t)
{
if (Log.IsDebugEnabled)
{
Log.Warn("Can't release instance: " + t.Message, t);
}
else
{
Log.Warn("Can't release instance: " + t.Message);
}
}
}
_disposed = true;
}
}
}