FixAntenna/Tools/TestUtils/Transport/FIXMessageReader.cs (174 lines of code) (raw):
// Copyright (c) 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.IO;
namespace Epam.FixAntenna.TestUtils.Transport
{
internal class FixMessageReader
{
/// <summary>
/// Length of the check sum footer.
/// Always constant: tag num - 2 bytes, equal sign - 1 byte, check sum value - 3 bytes, SOH - 1 byte
/// </summary>
private const int CheckSumLen = 7;
/// <summary>
/// Optimal buffer length.
/// </summary>
private const int OptimalBufLen = 16384;
/// <summary>
/// Source input stream.
/// </summary>
private readonly Stream _inputStream;
/// <summary>
/// Max allowed size of messages.
/// </summary>
private readonly int _maxMsgSize;
/// <summary>
/// Buffer for incoming bytes.
/// </summary>
private byte[] _buf;
/// <summary>
/// Message reading offset.
/// </summary>
private int _messageReadingOffset;
/// <summary>
/// Writing offset.
/// </summary>
private int _writingOffset;
private readonly object _sync = new object();
public FixMessageReader(Stream inputStream, int maxMsgSize)
{
_inputStream = inputStream;
_maxMsgSize = maxMsgSize;
_buf = new byte[OptimalBufLen];
}
public virtual byte[] ReadMessage()
{
lock (_sync)
{
// State of reader FSM
var state = 0;
// Reading offset
var readingOffset = _messageReadingOffset;
// Reading offset limit
var maxReadingOffset = _maxMsgSize + readingOffset;
// Expected body length
var bodyLength = 0;
for (;;)
{
while (readingOffset < _writingOffset)
{
switch (state)
{
case 0:
if (_buf[readingOffset] != (byte)'8')
{
state = 0;
}
else if (readingOffset > _messageReadingOffset)
{
// Before analyze new message flush the buffered data
// which will recognized as a garbled message
goto loopBreak;
}
else
{
state = 1;
}
break;
case 1:
if (_buf[readingOffset] == (byte)'=')
{
state = 2;
}
else
{
state = 0;
}
break;
case 2:
if (_buf[readingOffset] == (byte)'\x0001')
{
state = 3;
}
break;
case 3:
if (_buf[readingOffset] == (byte)'9')
{
state = 4;
}
else
{
state = 0;
}
break;
case 4:
if (_buf[readingOffset] == (byte)'=')
{
state = 5;
// Clear an accumulator of expected body length
bodyLength = 0;
}
else
{
state = 0;
}
break;
case 5:
var b = _buf[readingOffset];
if (b == (byte)'\x0001')
{
goto loopBreak;
}
else if (b >= (byte)'0' && b <= (byte)'9')
{
bodyLength = bodyLength * 10 + b - (byte)'0';
}
else
{
state = 0;
}
break;
}
readingOffset++;
// Check message size
if (readingOffset > maxReadingOffset)
{
throw new IOException("Too long message. Check <maxMessageSize> in configuration file.");
}
}
// Try to read some bytes from a stream
var n = _inputStream.Read(_buf, _writingOffset, _buf.Length - _writingOffset);
if (n <= 0)
{
goto loopBreak;
}
_writingOffset += n;
if (_writingOffset == _buf.Length)
{
var nbuf = new byte[_writingOffset * 2];
Array.Copy(_buf, 0, nbuf, 0, _writingOffset);
_buf = nbuf;
}
}
loopBreak:
if (state == 5)
{
// Calculate an expected message size
readingOffset += bodyLength + CheckSumLen + 1;
if (readingOffset > maxReadingOffset)
{
throw new IOException("Too long message. Check <maxMessageSize> in configuration file.");
}
if (readingOffset > _buf.Length)
{
var nbuf = new byte[readingOffset];
Array.Copy(_buf, 0, nbuf, 0, _writingOffset);
_buf = nbuf;
}
// Read a rest of the message
while (_writingOffset < readingOffset)
{
var n = _inputStream.Read(_buf, _writingOffset, _buf.Length - _writingOffset);
if (n <= 0)
{
break;
}
_writingOffset += n;
}
}
var msgLen = readingOffset - _messageReadingOffset;
if (msgLen == 0)
{
throw new IOException("End of File read.");
}
// Prepare output message.
var msg = new byte[msgLen];
Array.Copy(_buf, _messageReadingOffset, msg, 0, msgLen);
// If buffer is empty...
if (readingOffset == _writingOffset)
{
if (_writingOffset <= OptimalBufLen && _buf.Length > OptimalBufLen)
{
_buf = new byte[OptimalBufLen];
}
_writingOffset = 0;
_messageReadingOffset = 0;
}
else
{
_messageReadingOffset = readingOffset;
}
return msg;
}
}
public virtual void Close()
{
_inputStream.Close();
}
}
}