FixAntenna/NetCore/FixEngine/Transport/FixMessageChopper.cs (663 lines of code) (raw):
// Copyright (c) 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.IO;
using System.Net;
using System.Net.Security;
using Epam.FixAntenna.NetCore.Common;
using Epam.FixAntenna.NetCore.Common.Logging;
using Epam.FixAntenna.NetCore.Common.Utils;
using Epam.FixAntenna.NetCore.Message;
namespace Epam.FixAntenna.NetCore.FixEngine.Transport
{
/// <summary>
/// Slices FIX messages from the <c>InputStream</c> or <c>Transport</c> implementations.
/// </summary>
internal sealed class FixMessageChopper : IMessageChopper
{
private static ILog _log = LogFactory.GetLog(typeof(FixMessageChopper));
private ITransport _transport;
private readonly bool _checkMessageSize;
private readonly bool _validateCheckSum;
private readonly int _maxMessageSize;
private int _readOffset;
private int _messageStartOffset;
private int _resetOffset;
private int _parsedOffset;
private int _stateChangeOffset;
private readonly int _optimalBufferLength;
private int _state;
private GarbledMessageError _error;
private byte[] _buffer;
private ByteBuffer _optimalBufferObj;
private ByteBuffer _bufferObj;
private TagValue _tempValue = new TagValue();
private readonly bool _markReadingTime;
private bool _isGarbled = false;
// TBD! retrieve it from Session object
private IFixParserListener _parserListener;
private bool _isAdvancedParseControl = false;
/// <summary>
/// Creates <c>FIXMessageChopper</c> with specified message size limit.
/// </summary>
/// <param name="inputStream"> the input stream to read from. </param>
/// <param name="maxMessageSize"> the message size limit. </param>
/// <param name="optimalBufferLength"> the optimal length of internal buffer </param>
public FixMessageChopper(Stream inputStream, int maxMessageSize, int optimalBufferLength) : this(new ReadOnlyTransport(inputStream), maxMessageSize, optimalBufferLength, true, false)
{
}
/// <summary>
/// Creates <c>FIXMessageChopper</c> with specified message size limit.
/// </summary>
/// <param name="inputStream"> the input stream to read from. </param>
/// <param name="maxMessageSize"> the message size limit. </param>
/// <param name="optimalBufferLength"> the optimal length of internal buffer </param>
/// <param name="validateCheckSum"> do not validate CheckSum(10) if this flag is set to false </param>
public FixMessageChopper(Stream inputStream, int maxMessageSize, int optimalBufferLength, bool validateCheckSum) : this(new ReadOnlyTransport(inputStream), maxMessageSize, optimalBufferLength, validateCheckSum, false)
{
}
/// <summary>
/// Creates <c>FIXMessageChopper</c> with specified message size limit.
/// </summary>
/// <param name="transport"> the transport implementation to read from. </param>
/// <param name="maxMessageSize"> the message size limit. </param>
/// <param name="optimalBufferLength"> the optimal length of internal buffer </param>
public FixMessageChopper(ITransport transport, int maxMessageSize, int optimalBufferLength) : this(transport, maxMessageSize, optimalBufferLength, true, false)
{
}
/// <summary>
/// Creates <c>FIXMessageChopper</c> with specified message size limit.
/// </summary>
/// <param name="transport"> the transport implementation to read from. </param>
/// <param name="maxMessageSize"> the message size limit. </param>
/// <param name="optimalBufferLength"> the optimal length of internal buffer </param>
/// <param name="validateCheckSum"> do not validate CheckSum(10) if this flag is set to false </param>
public FixMessageChopper(ITransport transport, int maxMessageSize, int optimalBufferLength, bool validateCheckSum, bool markInMessageTime)
{
_transport = transport;
_maxMessageSize = maxMessageSize;
_checkMessageSize = maxMessageSize > 0;
_validateCheckSum = validateCheckSum;
_optimalBufferLength = optimalBufferLength;
_optimalBufferObj = new ByteBuffer(optimalBufferLength);
_bufferObj = _optimalBufferObj;
_buffer = _bufferObj.GetByteArray();
if (_log.IsDebugEnabled)
{
_log.Debug("Initialize old message chopper " + (markInMessageTime ? "with" : "without") + " marking incoming message time");
}
_markReadingTime = markInMessageTime;
}
/// <inheritdoc />
public RawFixUtil.IRawTags RawTags { get; set; } = new DefaultRawTags();
/// <summary>
/// Returns true if last read message is garbled.
/// </summary>
/// <value> true if last read message is garbled. </value>
public bool IsMessageGarbled
{
get { return _error != null; }
}
/// <summary>
/// Gets buffer
/// </summary>
/// <returns> buffer </returns>
public byte[] GetBuffer()
{
return _buffer;
}
/// <summary>
/// Returns error of last read message if message is garbled or null otherwise.
/// </summary>
/// <value> the instance of error enum. </value>
public GarbledMessageError Error
{
get { return _error; }
}
/// <summary>
/// Returns error position of last read message if message is garbled or -1 otherwise.
/// </summary>
/// <value> the error message string. </value>
public int ErrorPosition
{
get
{
if (_error == null)
{
return -1;
}
if (GarbledMessageError.Field10InvalidChecksum == _error)
{
return _stateChangeOffset - _messageStartOffset - 4;
}
if (GarbledMessageError.Field8TagExpected == _error)
{
return _stateChangeOffset - _messageStartOffset;
}
return _stateChangeOffset - _messageStartOffset + 1;
}
}
/// <summary>
/// Read next message (garbled or non garbled) from
/// the <c>InputStream</c> or <c>Transport</c> implementations.
/// </summary>
/// <returns> the byte representation of FIX message. </returns>
/// <exception cref="IOException"> if some I/O error occurs, or end of file read, or message is too long. </exception>
public void ReadMessage(MsgBuf buf)
{
var message = buf.FixMessage;
if (message == null)
{
message = new FixMessage();
buf.FixMessage = message;
}
if (_parsedOffset == _readOffset)
{
if (_readOffset <= _optimalBufferObj.Length)
{
_buffer = _optimalBufferObj.GetByteArray();
_bufferObj = _optimalBufferObj;
}
_parsedOffset = _readOffset = 0;
}
_messageStartOffset = _parsedOffset;
var offsetLimit = _messageStartOffset + _maxMessageSize;
var parsedBodyLength = 0;
var checksum = 0;
var parsedChecksum = 0;
var parsedChecksumBytes = 0;
_isGarbled = false;
var valueStartIndex = 0;
var valueOffset = 0;
var tag = 0;
var isTagParsing = true;
var userStopParse = false;
var isHeaderTag = true;
var isAdminMsg = false;
message.SetBuffer(_buffer, _messageStartOffset, _readOffset);
MoveToNextStepAndSetResetOffset(GarbledMessageError.Field8TagExpected);
ResetState();
OnMessageStart();
do
{
if (_messageStartOffset != _parsedOffset)
{ // if we read something from transport
if (_readOffset <= _optimalBufferLength || _messageStartOffset == 0)
{
message.SetBuffer(_buffer, _messageStartOffset, _readOffset);
}
else
{
// end of buffer reached, wrap around now and move remaining data to buffer's head
Array.Copy(_buffer, _messageStartOffset, _buffer, 0, _readOffset - _messageStartOffset);
_readOffset -= _messageStartOffset;
_parsedOffset -= _messageStartOffset;
valueStartIndex -= _messageStartOffset;
valueOffset -= _messageStartOffset;
_messageStartOffset = 0;
message.ShiftBuffer(_buffer, _messageStartOffset, _readOffset);
}
offsetLimit = _messageStartOffset + _maxMessageSize;
}
while (_parsedOffset < _readOffset)
{
// Check message size
if (_checkMessageSize && _parsedOffset >= offsetLimit)
{
GetMessage(buf);
throw new GarbledMessageException(MessageChopperFields.MessageIsTooLongError + " (maxMessageSize=" + _maxMessageSize + ")");
}
var ch = (char) _buffer[_parsedOffset];
if (!_isGarbled)
{
if (isTagParsing)
{
if (ch >= '0' && ch <= '9')
{
tag = tag * 10 + (ch - '0');
}
else if (ch == '=')
{
if (RawTags.IsWithinRawTags(tag))
{
var rawLength = 0;
if (message.IsMessageIncomplete)
{
byte bv;
for (var i = valueStartIndex; i < _parsedOffset; i++)
{
bv = _buffer[i];
if (bv >= '0' && bv <= '9')
{
rawLength = rawLength * 10 + (bv - '0');
}
else if (bv == '\u0001')
{
break;
}
}
}
else
{
rawLength = RawFixUtil.GetRawTagLengthFromPreviousField(message);
}
if (rawLength > offsetLimit)
{
throw new GarbledMessageException(MessageChopperFields.RawDataLengthIsTooBigError + " (" + rawLength + ")");
}
valueOffset = _parsedOffset + rawLength;
}
valueStartIndex = _parsedOffset + 1;
isTagParsing = false;
}
else
{
ResetStateAndGetResetOffset();
}
}
else
{
if (ch == '\x0001')
{
// check if not in raw data
if (_parsedOffset > valueOffset)
{
if (_isAdvancedParseControl)
{
if (userStopParse)
{
if (isHeaderTag)
{
if (ParseRequiredTags.IsHeader(tag))
{
message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
}
else
{
message.LoadTagValue(35, _tempValue);
isAdminMsg = ParseRequiredTags.IsAdminMsg(_tempValue);
isHeaderTag = false;
if (isAdminMsg)
{
message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
}
}
}
else if (isAdminMsg)
{
message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
}
else if (ParseRequiredTags.IsTrailer(tag))
{
message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
}
}
else
{
var control = _parserListener.OnTag(tag, _buffer, valueStartIndex, _parsedOffset - valueStartIndex);
if (control == FixParserListenerParseControl.Continue)
{
message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
}
else
{
// IGNORE or STOP
if (ParseRequiredTags.IsRequired(tag))
{
message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
}
else
{
message.IsMessageIncomplete = true;
}
if (control == FixParserListenerParseControl.StopParse)
{
userStopParse = true;
}
}
}
}
else
{
message.Add(tag, valueStartIndex, _parsedOffset - valueStartIndex);
}
tag = 0;
isTagParsing = true;
}
}
}
}
switch (_state)
{
case 0:
checksum += ch;
if (ch == '8')
{
if (AreThereParsedDataInBuffer())
{
// Before analise new message flush the buffered data (garbled message)
goto loopBreak;
}
else
{
MoveToNextStepAndSetResetOffset(GarbledMessageError.Field8TagValueDelimiterExpected);
}
}
else
{
_isGarbled = true;
ResetState();
}
break;
case 1:
checksum += ch;
if (ch == '=')
{
MoveToNextStepAndSetResetOffset(GarbledMessageError.Field8FieldDelimiterExpected);
}
else
{
ResetStateAndGetResetOffset();
}
break;
case 2:
checksum += ch;
if (ch == '\x0001')
{
MoveToNextStepAndSetResetOffset(GarbledMessageError.Field9TagExpected);
}
else if (ch == '8')
{
ResetStateAndGetResetOffset();
}
break;
case 3:
checksum += ch;
if (ch == '9')
{
MoveToNextStepAndSetResetOffset(GarbledMessageError.Field9TagValueDelimiterExpected);
}
else
{
ResetStateAndGetResetOffset();
}
break;
case 4:
checksum += ch;
if (ch == '=')
{
MoveToNextStepAndSetResetOffset(GarbledMessageError.Field9DecimalValueExpected);
}
else
{
ResetStateAndGetResetOffset();
}
break;
case 5:
checksum += ch;
if (ch >= '0' && ch <= '9')
{
parsedBodyLength = parsedBodyLength * 10 + (ch - '0');
}
else if (ch == '\x0001' && parsedBodyLength != 0)
{
MoveToNextStepAndSetResetOffset(GarbledMessageError.Field35TagExpected);
}
else
{
ResetStateAndGetResetOffset();
}
break;
case 6:
checksum += ch;
--parsedBodyLength;
if (ch == '3')
{
MoveToNextState(GarbledMessageError.Field35TagExpected);
}
else
{
ResetStateAndGetResetOffset();
}
break;
case 7:
checksum += ch;
--parsedBodyLength;
if (ch == '5')
{
MoveToNextState(GarbledMessageError.Field35TagValueDelimiterExpected);
}
else
{
ResetStateAndGetResetOffset();
}
break;
case 8:
checksum += ch;
--parsedBodyLength;
if (ch == '=')
{
MoveToNextState(GarbledMessageError.Field35FieldDelimiterExpected);
}
else
{
ResetStateAndGetResetOffset();
}
break;
case 9:
checksum += ch;
--parsedBodyLength;
if (ch == '\x0001')
{
MoveToNextState(GarbledMessageError.InvalidTagNumber);
if (parsedBodyLength <= 0)
{
MoveToNextState(GarbledMessageError.Field10TagExpected);
}
}
break;
case 10:
checksum += ch;
--parsedBodyLength;
if (parsedBodyLength <= 0)
{
MoveToNextState(GarbledMessageError.Field10TagExpected);
}
break;
case 11:
if (ch == '1')
{
checksum &= 255;
MoveToNextState(GarbledMessageError.Field10TagExpected);
}
else
{
ResetStateAndGetResetOffset();
}
break;
case 12:
if (ch == '0')
{
MoveToNextState(GarbledMessageError.Field10TagValueDelimiterExpected);
}
else
{
ResetStateAndGetResetOffset();
}
break;
case 13:
if (ch == '=')
{
MoveToNextState(GarbledMessageError.Field10DecimalValueExpected);
}
else
{
ResetStateAndGetResetOffset();
}
break;
case 14:
if (ch >= '0' && ch <= '9')
{
parsedChecksum = parsedChecksum * 10 + (ch - '0');
if (++parsedChecksumBytes == 3)
{
MoveToNextState(GarbledMessageError.Field10FieldDelimiterExpected);
}
}
else
{
ResetStateAndGetResetOffset();
}
break;
case 15:
if (ch == '\x0001')
{
_parsedOffset++;
MoveToNextState(GarbledMessageError.Field10InvalidChecksum);
if (!_validateCheckSum || checksum == parsedChecksum)
{
MoveToNextState(null);
}
else
{
_isGarbled = true;
ResetState();
}
goto loopBreak;
}
else
{
ResetStateAndGetResetOffset();
}
break;
}
_parsedOffset++;
}
} while (ReadAvailableBytesToBuffer());
loopBreak:
if (_isGarbled && !message.IsMessageIncomplete)
{
message.IsMessageIncomplete = true;
}
OnMessageEnd();
GetMessage(buf);
}
/// <inheritdoc />
public void SetUserParserListener(IFixParserListener parserListener)
{
_isAdvancedParseControl = parserListener != null;
_parserListener = parserListener;
}
public void OnMessageStart()
{
if (_isAdvancedParseControl)
{
_parserListener.OnMessageStart();
}
}
public void OnMessageEnd()
{
if (_isAdvancedParseControl)
{
_parserListener.OnMessageEnd();
}
}
private void GetMessage(MsgBuf buf)
{
var messageLength = _parsedOffset - _messageStartOffset;
if (messageLength == 0)
{
throw new IOException(MessageChopperFields.EofReadError);
}
if (messageLength < 0)
{
throw new IOException(MessageChopperFields.ReadError);
}
//byte[] message = new byte[messageLength];
//byte[] message = getByteArrayFromPool(messageLength);
//System.arraycopy(buffer, messageStartOffset, message, 0, messageLength);
//return message;
buf.Buffer = _buffer;
buf.Offset = _messageStartOffset;
buf.Length = messageLength;
buf.MessageReadTimeInTicks = MessageReadTimeInTicks;
var fixMessage = buf.FixMessage;
if (fixMessage != null)
{
fixMessage.SetBuffer(_buffer, _messageStartOffset, messageLength);
}
}
private bool AreThereParsedDataInBuffer()
{
return _parsedOffset != _messageStartOffset;
}
/// <summary>
/// Reads array of bytes from transport.
/// </summary>
/// <returns> true if method reads some byets </returns>
/// <exception cref="IOException"> if transport returns 0 (EOF) </exception>
private bool ReadAvailableBytesToBuffer()
{
int length;
if (_readOffset < _optimalBufferLength)
{
length = _optimalBufferLength - _readOffset;
}
else
{
length = _buffer.Length - _readOffset;
if (length >= _optimalBufferLength)
{
length = _optimalBufferLength;
}
else
{
_bufferObj.Offset = _readOffset;
_bufferObj.IncreaseBuffer(_bufferObj.Length);
_buffer = _bufferObj.GetByteArray();
length = _optimalBufferLength;
}
}
var n = _transport.Read(_bufferObj, _readOffset, length);
SetReadingStartTime();
if (n == 0)
{
throw new IOException(MessageChopperFields.EofReadError);
}
_readOffset += n;
return n > 0;
}
private void ResetStateAndGetResetOffset()
{
ResetState();
_isGarbled = true;
_parsedOffset = _resetOffset;
}
private void MoveToNextStepAndSetResetOffset(GarbledMessageError errorMessage)
{
_resetOffset = _parsedOffset;
MoveToNextState(errorMessage);
}
private void MoveToNextState(GarbledMessageError errorMessage)
{
_stateChangeOffset = _parsedOffset;
_state++;
_error = errorMessage;
}
private void ResetState()
{
SetReadingStartTime();
_state = 0;
}
private void SetReadingStartTime()
{
if (_markReadingTime)
{
MessageReadTimeInTicks = DateTimeHelper.CurrentTicks;
}
}
internal class ReadOnlyTransport : ITransport
{
internal Stream InputStream;
/// <inheritdoc />
public virtual bool IsBlockingSocket => true;
/// <inheritdoc />
public virtual bool IsSecured => InputStream is SslStream sslStream && sslStream.IsAuthenticated;
/// <inheritdoc />
public ReadOnlyTransport(Stream inputStream)
{
InputStream = inputStream;
}
/// <inheritdoc />
public virtual int Write(byte[] message, int offset, int length)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public virtual void Write(byte[] message)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public virtual int Read(byte[] buffer, int offset, int length)
{
return InputStream.Read(buffer, offset, length);
}
/// <inheritdoc />
public virtual int Read(byte[] buffer)
{
return InputStream.Read(buffer, 0, buffer.Length);
}
/// <inheritdoc />
public virtual int Write(ByteBuffer buffer)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public virtual int Write(ByteBuffer buffer, int offset, int length)
{
throw new NotSupportedException();
}
/// <inheritdoc />
public virtual int Read(ByteBuffer buffer, int offset, int length)
{
return Read(buffer.GetByteArray(), offset, length);
}
/// <inheritdoc />
public virtual int Read(ByteBuffer buffer)
{
return Read(buffer.GetByteArray());
}
/// <inheritdoc />
public virtual void WaitUntilReadyToWrite()
{
}
/// <inheritdoc />
public IPEndPoint LocalEndPoint => throw new NotSupportedException();
/// <inheritdoc />
public IPEndPoint RemoteEndPoint => throw new NotSupportedException();
/// <inheritdoc />
public virtual void Open()
{
throw new NotSupportedException();
}
/// <inheritdoc />
public virtual void Close()
{
throw new NotSupportedException();
}
/// <inheritdoc />
public virtual bool IsOpen => throw new NotSupportedException();
}
/// <inheritdoc />
public long MessageReadTimeInTicks { get; private set; } = -1;
/// <inheritdoc />
public void Reset()
{
_optimalBufferObj.ResetBuffer();
_bufferObj.ResetBuffer();
_buffer = _bufferObj.GetByteArray();
_readOffset = 0;
_parsedOffset = 0;
_resetOffset = 0;
_messageStartOffset = 0;
_stateChangeOffset = 0;
_state = 0;
_error = null;
MessageReadTimeInTicks = -1;
_isGarbled = false;
}
}
}