FixAntenna/NetCore/FixEngine/Transport/NewMessageChopper.cs (360 lines of code) (raw):
// Copyright (c) 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.IO;
using Epam.FixAntenna.NetCore.Common;
using Epam.FixAntenna.NetCore.Common.Logging;
using Epam.FixAntenna.NetCore.Common.Utils;
using Epam.FixAntenna.NetCore.Helpers;
using Epam.FixAntenna.NetCore.Message;
namespace Epam.FixAntenna.NetCore.FixEngine.Transport
{
/// <summary>
/// Slices and parses FIX messages read from <c>Transport</c> implementations.
/// </summary>
internal sealed class NewMessageChopper : IMessageChopper
{
// Logger shared by all chopper instances; it is never reassigned, hence readonly.
private static readonly ILog _log = LogFactory.GetLog(typeof(NewMessageChopper));
// Transport the raw bytes are read from.
private readonly ITransport _transport;
// True when a positive message size limit was passed to the constructor.
private readonly bool _checkMessageSize;
// Message size limit as passed to the constructor.
private readonly int _maxMessageSize;
// Index in _buffer just past the last byte received from the transport.
private int _readOffset;
// Index in _buffer where the message currently being parsed starts.
private int _messageStartOffset;
// Index in _buffer of the next byte to be examined by the parser.
private int _parsedOffset;
// Error of the last read message; within this class it is only ever assigned null (in Reset).
private GarbledMessageError _error;
// Byte array obtained from _bufferObj; refreshed whenever _bufferObj is grown or reset.
private byte[] _buffer;
// Growable buffer handed to the transport for reading.
private ByteBuffer _bufferObj;
// Scratch value reused when looking up tags 35 and 9 of the message being parsed.
private readonly TagValue _tempValue = new TagValue();
// When true, the tick count is captured after every transport read.
private readonly bool _markReadingTime;
// Size of one transport read; also the initial size of the internal buffer.
public const int SocketReadSize = SocketTransport.SocketReadSize;
// TBD! retrieve it from Session object
private IFixParserListener _parserListener;
// True while a non-null parser listener is attached (see SetUserParserListener).
private bool _isAdvancedParseControl = false;
/// <summary>
/// Creates a <c>NewMessageChopper</c> with the specified message size limit and no user parser listener.
/// </summary>
/// <param name="transport"> the transport implementation to read from. </param>
/// <param name="maxMessageSize"> the message size limit. </param>
/// <param name="optimalBufferLength"> the optimal length of the internal buffer; forwarded unchanged to the five-argument constructor. </param>
/// <param name="markInMessageTime"> whether the time of incoming reads should be recorded. </param>
public NewMessageChopper(ITransport transport, int maxMessageSize, int optimalBufferLength, bool markInMessageTime)
    : this(transport, maxMessageSize, optimalBufferLength, markInMessageTime, parserListener: null)
{
}
/// <summary>
/// Creates a <c>NewMessageChopper</c> with the specified message size limit and an optional parser listener.
/// </summary>
/// <param name="transport"> the transport implementation to read from. </param>
/// <param name="maxMessageSize"> the message size limit; size checking is flagged as enabled only for positive values. </param>
/// <param name="optimalBufferLength"> the optimal length of the internal buffer. NOTE(review): not used by this constructor; the buffer is always created with <c>SocketReadSize</c> bytes. </param>
/// <param name="markInMessageTime"> whether the time of incoming reads should be recorded. </param>
/// <param name="parserListener"> listener consulted for every parsed tag; may be null to disable advanced parse control. </param>
public NewMessageChopper(ITransport transport, int maxMessageSize, int optimalBufferLength, bool markInMessageTime, IFixParserListener parserListener)
{
    _transport = transport;
    _markReadingTime = markInMessageTime;

    // TBD! add check of message size. The rest must be defined as garbled.
    _maxMessageSize = maxMessageSize;
    _checkMessageSize = maxMessageSize > 0;

    // The working byte array is always the one currently owned by the ByteBuffer.
    _bufferObj = new ByteBuffer(SocketReadSize);
    _buffer = _bufferObj.GetByteArray();

    SetUserParserListener(parserListener);

    if (!_log.IsDebugEnabled)
    {
        return;
    }

    var timeMarking = markInMessageTime ? "with" : "without";
    _log.Debug("Initialize new message chopper " + timeMarking + " marking incoming message time");
}
/// <inheritdoc />
// Initialized to a DefaultRawTags instance; ReadMessage asks it (IsWithinRawTags) whether
// the tag just parsed carries a raw, length-prefixed value.
public RawFixUtil.IRawTags RawTags { get; set; } = new DefaultRawTags();
/// <summary>
/// Tells whether an error is recorded for the last read message.
/// </summary>
/// <value> true if an error is recorded (the message is garbled); false otherwise. </value>
/// <remarks>
/// NOTE(review): inside this class the backing field is only ever assigned null (in <c>Reset</c>),
/// so this currently evaluates to false; garbled input is reported via exceptions instead.
/// </remarks>
public bool IsMessageGarbled => _error != null;
/// <summary>
/// Exposes the byte array the chopper currently reads into and parses from.
/// </summary>
/// <returns> the current internal byte array (the same instance, not a copy). </returns>
public byte[] GetBuffer() => _buffer;
/// <summary>
/// Gets the error recorded for the last read message, or null when none is recorded.
/// </summary>
/// <value> the recorded error instance, or null. </value>
public GarbledMessageError Error => _error;
/// <summary>
/// Gets the error position of the last read message.
/// </summary>
/// <value> always -1: this implementation does not track an error position. </value>
public int ErrorPosition => -1;
/// <summary>
/// Reads the next FIX message from the transport, indexes its fields into
/// <c>buf.FixMessage</c> (created on demand) and stores the raw location of the
/// message (buffer, offset, length, read time) in <paramref name="buf"/>.
/// </summary>
/// <param name="buf"> the holder that receives the parsed message and its raw location. </param>
/// <exception cref="GarbledMessageException">
/// if a tag contains a character other than a digit before '=', or if a size limit is
/// configured and the declared length of a raw value exceeds it.
/// </exception>
/// <exception cref="IOException"> if the transport reports EOF or the parsed length is not positive. </exception>
public void ReadMessage(MsgBuf buf)
{
    var message = buf.FixMessage;
    if (message == null)
    {
        message = new FixMessage();
        buf.FixMessage = message;
    }

    if (_parsedOffset == _readOffset)
    {
        _parsedOffset = _readOffset = 0; // no data left in buffer, roll back to the beginning
    }
    else
    {
        if (_readOffset > SocketReadSize / 2)
        {
            var availData = _readOffset - _parsedOffset;
            var tail = SocketReadSize - _readOffset;
            if (availData < 256 || tail < 1024) // preventive data move ?
            {
                // Move the unparsed remainder to the head of the buffer to free tail space.
                Array.Copy(_buffer, _parsedOffset, _buffer, 0, availData);
                _readOffset -= _parsedOffset;
                _parsedOffset = 0;
            }
        }
    }

    _messageStartOffset = _parsedOffset;

    // Make sure at least 20 bytes are buffered past the message start before parsing begins.
    while (_parsedOffset + 20 > _readOffset)
    {
        ReadAvailableBytesToBuffer(buf);
    }

    message.SetBuffer(_buffer, _messageStartOffset, _readOffset);

    var valueStartIndex = 0;     // index of the first byte of the current field value
    var tag = 0;                 // tag number accumulated digit by digit
    var isTagParsing = true;     // true while reading a tag number, false while reading a value
    var readSomeBytes = false;   // set when the buffer was refilled during this iteration
    var userStopParse = false;   // set once the listener answered StopParse
    var isHeaderTag = true;      // used after StopParse: still inside the header tags
    var isAdminMsg = false;      // used after StopParse: result of ParseRequiredTags.IsAdminMsg for tag 35
    var stopParse = false;       // set once the parser has jumped ahead using tag 9

    OnMessageStart();

    while (true)
    {
        while (_parsedOffset >= _readOffset)
        {
            ReadAvailableBytesToBuffer(buf);
            readSomeBytes = true;
        }

        if (readSomeBytes)
        {
            readSomeBytes = false;
            if (_readOffset <= SocketReadSize || _messageStartOffset == 0)
            {
                message.SetBuffer(_buffer, _messageStartOffset, _readOffset);
            }
            else
            {
                // end of buffer reached, wrap around now and move remaining data to buffer's head
                Array.Copy(_buffer, _messageStartOffset, _buffer, 0, _readOffset - _messageStartOffset);
                _readOffset -= _messageStartOffset;
                _parsedOffset -= _messageStartOffset;
                valueStartIndex -= _messageStartOffset;
                _messageStartOffset = 0;
                message.ShiftBuffer(_buffer, _messageStartOffset, _readOffset);
            }
        }

        var b = _buffer[_parsedOffset];
        if (isTagParsing)
        {
            if (b >= (byte)'0' && b <= (byte)'9')
            {
                tag = tag * 10 + (b - '0');
            }
            else if (b == (byte)'=')
            {
                if (RawTags.IsWithinRawTags(tag))
                {
                    // A raw value may contain the field delimiter, so it is skipped by its declared length.
                    var rawLength = 0;
                    if (message.IsMessageIncomplete)
                    {
                        // Some fields were not stored in the message, so the length is re-read
                        // from the digits of the previous field's value in the buffer.
                        byte bv;
                        for (var i = valueStartIndex; i < _parsedOffset; i++)
                        {
                            bv = _buffer[i];
                            if (bv >= (byte)'0' && bv <= (byte)'9')
                            {
                                rawLength = rawLength * 10 + (bv - '0');
                            }
                            else
                            {
                                break;
                            }
                        }
                    }
                    else
                    {
                        rawLength = RawFixUtil.GetRawTagLengthFromPreviousField(message);
                    }

                    // Enforce the limit only when one is configured. Without this gate a
                    // non-positive maxMessageSize rejected every raw value of positive length.
                    // NOTE(review): treats non-positive maxMessageSize as "no limit", as
                    // _checkMessageSize suggests - confirm against the engine configuration docs.
                    if (_checkMessageSize && rawLength > _maxMessageSize)
                    {
                        throw new GarbledMessageException(MessageChopperFields.RawDataLengthIsTooBigError + " (" + rawLength + ")");
                    }

                    valueStartIndex = _parsedOffset + 1;
                    _parsedOffset += rawLength;
                }
                else
                {
                    valueStartIndex = _parsedOffset + 1;
                }

                isTagParsing = false;
            }
            else
            {
                // Publish what has been consumed so far, then report the malformed tag.
                GetMessage(buf);
                throw new GarbledMessageException("Invalid tag number");
            }
        }
        else
        {
            if (b == (byte)'\x0001')
            {
                // End of the field value reached.
                if (_isAdvancedParseControl)
                {
                    if (userStopParse)
                    {
                        if (isHeaderTag)
                        {
                            if (ParseRequiredTags.IsHeader(tag))
                            {
                                message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
                            }
                            else
                            {
                                // First non-header tag: decide from tag 35 whether the rest is kept.
                                message.LoadTagValue(35, _tempValue);
                                isAdminMsg = ParseRequiredTags.IsAdminMsg(_tempValue);
                                isHeaderTag = false;
                                if (isAdminMsg)
                                {
                                    message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
                                }
                            }
                        }
                        else if (isAdminMsg)
                        {
                            message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
                        }
                        else if (!stopParse)
                        {
                            // can be stop
                            message.LoadTagValue(9, _tempValue);
                            var msgLength = (int) _tempValue.LongValue;
                            // | length considered from this point | length |
                            _parsedOffset = _tempValue.Offset + _tempValue.Length + 1 + msgLength - 1;
                            stopParse = true;
                        }
                        else if (tag == 10)
                        {
                            message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
                        }
                    }
                    else
                    {
                        var control = _parserListener.OnTag(tag, _buffer, valueStartIndex, _parsedOffset - valueStartIndex);
                        if (control == FixParserListenerParseControl.Continue)
                        {
                            message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
                        }
                        else
                        {
                            // IGNORE or STOP
                            if (ParseRequiredTags.IsRequired(tag))
                            {
                                message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
                            }
                            else
                            {
                                message.IsMessageIncomplete = true;
                            }

                            if (control == FixParserListenerParseControl.StopParse)
                            {
                                userStopParse = true;
                            }
                        }
                    }
                }
                else
                {
                    message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
                }

                if (tag == 10)
                {
                    // Tag 10 ends the message: step past its delimiter and fix the final extent.
                    _parsedOffset++;
                    message.SetBuffer(_buffer, _messageStartOffset, _parsedOffset - _messageStartOffset);
                    break;
                }

                tag = 0;
                isTagParsing = true;
            }
        }

        _parsedOffset++;
    }

    OnMessageEnd();
    GetMessage(buf);
}
/// <summary>
/// Attaches (or, when null is passed, detaches) the user parser listener.
/// </summary>
/// <param name="parserListener"> the listener to notify while parsing; null turns advanced parse control off. </param>
public void SetUserParserListener(IFixParserListener parserListener)
{
    _parserListener = parserListener;
    _isAdvancedParseControl = parserListener != null;
}
/// <summary>
/// Notifies the attached parser listener, if any, that parsing of a message begins.
/// </summary>
public void OnMessageStart()
{
    if (!_isAdvancedParseControl)
    {
        return;
    }

    _parserListener.OnMessageStart();
}
/// <summary>
/// Notifies the attached parser listener, if any, that parsing of a message has finished.
/// </summary>
public void OnMessageEnd()
{
    if (!_isAdvancedParseControl)
    {
        return;
    }

    _parserListener.OnMessageEnd();
}
/// <summary>
/// Publishes the location of the bytes consumed for the current message into <paramref name="buf"/>.
/// </summary>
/// <param name="buf"> receives the buffer, start offset, length and read time of the message. </param>
/// <exception cref="IOException"> if no bytes were consumed (EOF error) or the computed length is negative (read error). </exception>
private void GetMessage(MsgBuf buf)
{
    var consumed = _parsedOffset - _messageStartOffset;
    if (consumed > 0)
    {
        buf.Buffer = _buffer;
        buf.Offset = _messageStartOffset;
        buf.Length = consumed;
        buf.MessageReadTimeInTicks = MessageReadTimeInTicks;
        return;
    }

    if (consumed == 0)
    {
        throw new IOException(MessageChopperFields.EofReadError);
    }

    throw new IOException(MessageChopperFields.ReadError);
}
/// <summary>
/// Tells whether the parser has advanced past the start of the current message.
/// </summary>
/// <returns> true if the parsed offset differs from the message start offset. </returns>
private bool AreThereParsedDataInBuffer() => _parsedOffset != _messageStartOffset;
/// <summary>
/// Reads the next chunk of bytes from the transport into the internal buffer,
/// growing the buffer when the free tail is smaller than one read.
/// </summary>
/// <param name="buf"> receives the location of the partially read message before a "message is too long" error is raised. </param>
/// <returns> true if the method read some bytes; it throws instead of returning false. </returns>
/// <exception cref="IOException"> if the transport returns 0 or a negative value, both treated as EOF. </exception>
/// <exception cref="GarbledMessageException"> if a size limit is configured and the buffered message data exceed it. </exception>
private bool ReadAvailableBytesToBuffer(MsgBuf buf)
{
    int length;
    if (_readOffset < SocketReadSize)
    {
        length = SocketReadSize - _readOffset;
    }
    else
    {
        length = _buffer.Length - _readOffset;
        if (length >= SocketReadSize)
        {
            length = SocketReadSize;
        }
        else
        {
            // The tail cannot hold a full read. Reject the message only when a limit is
            // configured; without this gate a non-positive maxMessageSize rejected any
            // message that filled the buffer instead of letting the buffer grow.
            // NOTE(review): treats non-positive maxMessageSize as "no limit", as
            // _checkMessageSize suggests - confirm against the engine configuration docs.
            if (_checkMessageSize && _readOffset - _messageStartOffset > _maxMessageSize)
            {
                //TODO: mask here
                var message = StringHelper.NewString(_buffer, _messageStartOffset, _parsedOffset - _messageStartOffset);
                GetMessage(buf);
                throw new GarbledMessageException(MessageChopperFields.MessageIsTooLongError + " (maxMessageSize=" + _maxMessageSize + "): " + message);
            }

            // Grow the buffer by its current length and pick up the (possibly new) backing array.
            _bufferObj.Offset = _readOffset;
            _bufferObj.IncreaseBuffer(_bufferObj.Length);
            _buffer = _bufferObj.GetByteArray();
            length = SocketReadSize;
        }
    }

    var n = _transport.Read(_bufferObj, _readOffset, length);
    SetReadingStartTime();

    // A negative count must never reach the offset arithmetic below: it would move
    // _readOffset backwards. Handle it like 0, i.e. as end of stream.
    if (n <= 0)
    {
        throw new IOException(MessageChopperFields.EofReadError);
    }

    _readOffset += n;
    return true;
}
/// <summary>
/// Stores the current tick count as the message read time when reading-time marking is enabled.
/// </summary>
private void SetReadingStartTime()
{
    if (!_markReadingTime)
    {
        return;
    }

    MessageReadTimeInTicks = DateTimeHelper.CurrentTicks;
}
/// <summary>
/// Tick count captured right after the most recent transport read when reading-time
/// marking is enabled; -1 initially and after <c>Reset</c>.
/// </summary>
public long MessageReadTimeInTicks { get; private set; } = -1;
/// <summary>
/// Returns the chopper to its initial state: all offsets are zeroed, the internal buffer
/// is reset, and the recorded error and read time are cleared.
/// </summary>
public void Reset()
{
    _messageStartOffset = _readOffset = _parsedOffset = 0;

    // Re-fetch the array after the reset so _buffer always mirrors _bufferObj.
    _bufferObj.ResetBuffer();
    _buffer = _bufferObj.GetByteArray();

    MessageReadTimeInTicks = -1;
    _error = null;
}
}
}