in FixAntenna/NetCore/FixEngine/Storage/File/SlicedIndexedMessageStorage.cs [78:166]
/// <summary>
/// Reads the stored messages with sequence numbers <paramref name="from"/>..<paramref name="to"/>
/// (both inclusive) and hands each one to <paramref name="listener"/>.
/// For every sequence number the fixed-size index entry is read first; it carries the chunk id,
/// the message position and the message length, which are then used to read the message itself
/// from the corresponding chunk file. Retrieval stops silently as soon as the index has no more data.
/// </summary>
/// <param name="from">First sequence number to retrieve; must be at least 1.</param>
/// <param name="to">Last sequence number to retrieve; must be at least 1.</param>
/// <param name="listener">Receiver of the retrieved messages (passed through to <c>NotifyListener</c>).</param>
/// <param name="blocking">Passed through unchanged to <c>NotifyListener</c>.</param>
/// <exception cref="ArgumentException"><paramref name="from"/> or <paramref name="to"/> is less than 1.</exception>
/// <exception cref="StorageClosedException">Retrieval failed and the storage is closed.</exception>
/// <exception cref="IOException">Retrieval failed for any other reason; the original failure is the inner exception.</exception>
protected override void RetrieveMessagesImplementation(long from, long to, IMessageStorageListener listener, bool blocking)
{
    if (from < 1)
    {
        throw new ArgumentException("From can't be <1");
    }
    if (to < 1)
    {
        throw new ArgumentException("To can't be <1");
    }

    FileStream readFile = null;
    var currentChunk = -1;
    try
    {
        for (var i = from; i <= to; i++)
        {
            if (IsClosed)
            {
                throw new InvalidOperationException("Storage is closed");
            }

            var indexBlock = new ByteBuffer(IndexLength);
            var len = 0;
            lock (_indexLock)
            {
                // Index entries are fixed-size, so entry i starts at (i - 1) * IndexLength.
                _index.Position = (i - 1) * IndexLength;
                var tempIndexData = new byte[IndexLength];

                // Stream.Read may return fewer bytes than requested even before the end of the
                // stream, so keep reading until the entry is complete or the stream is exhausted.
                while (len < tempIndexData.Length)
                {
                    var read = _index.Read(tempIndexData, len, tempIndexData.Length - len);
                    if (read <= 0)
                    {
                        break;
                    }
                    len += read;
                }
                indexBlock.Put(tempIndexData, 0, len);
            }
            if (len <= 0)
            {
                return; //no more data
            }

            // Decode the entry: chunk id (int), message position (long), message length (int), big-endian.
            indexBlock.Position = 0;
            var chunkId = indexBlock.GetIntBe();
            var messagePosition = indexBlock.GetLongBe();
            var messageLength = indexBlock.GetIntBe();

            if (currentChunk != chunkId && chunkId != 0)
            {
                // chunkId == 0 means there is a gap at this sequence number; in that case the
                // currently opened chunk file (if any) is kept as is.
                currentChunk = chunkId;
                if (readFile != null)
                {
                    readFile.Close();
                    readFile = null;
                }
                readFile = new FileStream(FileManager.GetFileName(currentChunk), FileMode.Open, FileAccess.Read,
                    FileShare.ReadWrite);
            }

            // NOTE(review): for a gap entry readFile may still be null here (e.g. when the very first
            // requested entry is a gap) - presumably ReadFromStorage copes with that; confirm.
            var message = ReadFromStorage(readFile, messagePosition, messageLength);
            NotifyListener(listener, blocking, message);
        }
    }
    catch (Exception e)
    {
        if (Log.IsWarnEnabled)
        {
            if (Log.IsDebugEnabled)
            {
                Log.Warn("Problem in retrieving messages from indexed message storage: " + e.ToString(), e);
            }
            else
            {
                Log.Warn("Problem in retrieving messages from indexed message storage: " + e.ToString());
            }
        }

        if (IsClosed)
        {
            throw new StorageClosedException("Storage is closed. Cause: " + e.Message);
        }

        // Keep the original failure as the inner exception so its type and stack trace are not lost.
        throw new IOException(e.Message, e);
    }
    finally
    {
        readFile?.Close();
    }
}