FixAntenna/Tools/Tester/Transport/FIXMessageReader.cs (177 lines of code) (raw):

// Copyright (c) 2021 EPAM Systems // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. using System; using System.IO; namespace Epam.FixAntenna.Tester.Transport { public class FIXMessageReader { /// <summary> /// Length of the check sum footer. /// Always constant: tag num - 2 bytes, equal sign - 1 byte, check sum value - 3 bytes, SOH - 1 byte /// </summary> private const int CHKSUM_LEN = 7; /// <summary> /// Optimal buffer length. /// </summary> private const int OPTIMAL_BUF_LEN = 16384; /// <summary> /// Buffer for incoming bytes. /// </summary> private byte[] _buf; /// <summary> /// Writing offset. /// </summary> private int _woff = 0; /// <summary> /// Message reading offset. /// </summary> private int _mroff = 0; /// <summary> /// Source input stream. /// </summary> private Stream @in; /// <summary> /// Max allowed size of messages. /// </summary> private int _maxMsgSize; private readonly object _sync = new object(); public FIXMessageReader(Stream @in, int maxMsgSize) { this.@in = @in; this._maxMsgSize = maxMsgSize; _buf = new byte[OPTIMAL_BUF_LEN]; } public virtual byte[] ReadMessage() { lock (_sync) { // State of reader FSM int state = 0; // Reading offset int roff = _mroff; // Reading offset limit int maxroff = _maxMsgSize + roff; // Expected body length int bodylen = 0; for (; ;) { while (roff < _woff) { switch (state) { case 0: if (_buf[roff] != (byte) '8') { state = 0; } else if (roff > _mroff) { // Before analise new message flush the buffered data // which will recognized as a garbled message goto loopBreak; } else { state = 1; } break; case 1: if (_buf[roff] == (byte) '=') { state = 2; } else { state = 0; } break; case 2: if (_buf[roff] == (byte) '\x0001') { state = 3; } break; case 3: if (_buf[roff] == (byte) '9') { state = 4; } else { state = 0; } break; case 4: if (_buf[roff] == (byte) '=') { state = 5; // Clear an accumulator of expected body length bodylen = 0; } else { state = 0; } break; case 5: byte b = _buf[roff]; if (b == (byte) '\x0001') { goto loopBreak; } else if (b >= (byte) '0' && b <= (byte) '9') { bodylen = bodylen * 10 + b - (byte) '0'; } else { state = 0; } break; } roff++; // Check message size if (roff > maxroff) { throw new IOException("Too long message. Check <maxMessageSize> in configuration file."); } } // Try to read some bytes from a stream int n = @in.Read(_buf, _woff, _buf.Length - _woff); if (n <= 0) { goto loopBreak; } else { _woff += n; if (_woff == _buf.Length) { byte[] nbuf = new byte[_woff * 2]; Array.Copy(_buf, 0, nbuf, 0, _woff); _buf = nbuf; } } } loopBreak: if (state == 5) { // Calculate an expected message size roff += bodylen + CHKSUM_LEN + 1; if (roff > maxroff) { throw new IOException("Too long message. Check <maxMessageSize> in configuration file."); } if (roff > _buf.Length) { byte[] nbuf = new byte[roff]; Array.Copy(_buf, 0, nbuf, 0, _woff); _buf = nbuf; } // Read a rest of the message while (_woff < roff) { int n = @in.Read(_buf, _woff, _buf.Length - _woff); if (n <= 0) { break; } _woff += n; } } int msglen = roff - _mroff; if (msglen == 0) { throw new IOException("End of File read."); } // Prepare output message. byte[] msg = new byte[msglen]; Array.Copy(_buf, _mroff, msg, 0, msglen); // If buffer is empty... if (roff == _woff) { if (_woff <= OPTIMAL_BUF_LEN && _buf.Length > OPTIMAL_BUF_LEN) { _buf = new byte[OPTIMAL_BUF_LEN]; } _woff = 0; _mroff = 0; } else { _mroff = roff; } return msg; } } public virtual void close() { @in.Close(); } } }