in FixAntenna/NetCore/FixEngine/Storage/File/MmfIndexedMessageStorage.cs [211:289]
protected override void RetrieveMessagesImplementation(long from, long to, IMessageStorageListener listener, bool blocking)
{
if (from < 1)
{
throw new ArgumentException("From can't be <1");
}
if (to < 1)
{
throw new ArgumentException("To can't be <1");
}
FileStream readFile = null;
try
{
readFile = new FileStream(FileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
for (var i = from; i <= to; i++)
{
if (IsClosed)
{
throw new InvalidOperationException("Storage is closed");
}
// read index block
long messagePosition;
int messageLength;
try
{
_indexMappedBufferLock.EnterReadLock();
var readPosition = (int)(i - 1) * IndexLength;
if (_indexMappedSize - IndexLength >= readPosition)
{
messagePosition = _indexMessagesPositionsBuffer.ReadLongBe(readPosition);
messageLength = _indexMessagesPositionsBuffer.ReadIntBe(readPosition + 8);
}
else
{
//no more data
throw new IOException("Message with seqNum " + i + " is not available.");
}
}
finally
{
_indexMappedBufferLock.ExitReadLock();
}
var message = ReadFromStorage(readFile, messagePosition, messageLength);
NotifyListener(listener, blocking, message);
}
}
catch (IOException e)
{
if (_log.IsWarnEnabled)
{
if (_log.IsDebugEnabled)
{
_log.Warn("Problem in retrieving messages from indexed message storage: " + e.ToString(), e);
}
else
{
_log.Warn("Problem in retrieving messages from indexed message storage: " + e.ToString());
}
}
if (IsClosed)
{
throw new StorageClosedException("Storage is closed. Cause: " + e.Message);
}
else
{
throw;
}
}
finally
{
readFile?.Close();
}
}