protected override void RetrieveMessagesImplementation()

in FixAntenna/NetCore/FixEngine/Storage/File/MmfIndexedMessageStorage.cs [211:289]


		/// <summary>
		/// Reads the messages with sequence numbers <paramref name="from"/> through
		/// <paramref name="to"/> (inclusive) and passes each one to <c>NotifyListener</c>.
		/// For every sequence number the message position and length are looked up in the
		/// index buffer under the index read lock, then the message body is read from the
		/// storage file via <c>ReadFromStorage</c>.
		/// </summary>
		/// <param name="from">First sequence number to retrieve; must be at least 1.</param>
		/// <param name="to">Last sequence number to retrieve; must be at least 1.</param>
		/// <param name="listener">Listener forwarded to <c>NotifyListener</c> for each message.</param>
		/// <param name="blocking">Flag forwarded unchanged to <c>NotifyListener</c>.</param>
		/// <exception cref="ArgumentException">
		/// <paramref name="from"/> or <paramref name="to"/> is less than 1.
		/// </exception>
		/// <exception cref="InvalidOperationException">
		/// <c>IsClosed</c> is true when the next message is about to be read.
		/// </exception>
		/// <exception cref="IOException">
		/// The index has no entry for a requested sequence number, or an I/O error occurred
		/// while <c>IsClosed</c> is false.
		/// </exception>
		/// <exception cref="StorageClosedException">
		/// An I/O error occurred and <c>IsClosed</c> is true.
		/// </exception>
		protected override void RetrieveMessagesImplementation(long from, long to, IMessageStorageListener listener, bool blocking)
		{
			if (from < 1)
			{
				throw new ArgumentException("From can't be <1");
			}

			if (to < 1)
			{
				throw new ArgumentException("To can't be <1");
			}

			try
			{
				using (var readFile = new FileStream(FileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
				{
					for (var i = from; i <= to; i++)
					{
						if (IsClosed)
						{
							throw new InvalidOperationException("Storage is closed");
						}

						// read index block
						long messagePosition;
						int messageLength;

						// Acquire the lock before entering the try block: if the acquisition itself
						// fails, the finally block must not release a lock that was never taken.
						_indexMappedBufferLock.EnterReadLock();
						try
						{
							// Compute the offset in 64-bit arithmetic. Narrowing (i - 1) to int before
							// multiplying could wrap to a negative offset that passes the bounds check.
							var readPosition = (i - 1) * IndexLength;
							if (readPosition > _indexMappedSize - IndexLength)
							{
								//no more data
								throw new IOException("Message with seqNum " + i + " is not available.");
							}

							// The bounds check above succeeded, so the offset lies inside the mapped
							// index; the checked cast guards the narrowing regardless.
							var entryOffset = checked((int)readPosition);

							// Index entry layout as read here: 8-byte position followed by a 4-byte length.
							messagePosition = _indexMessagesPositionsBuffer.ReadLongBe(entryOffset);
							messageLength = _indexMessagesPositionsBuffer.ReadIntBe(entryOffset + 8);
						}
						finally
						{
							_indexMappedBufferLock.ExitReadLock();
						}

						var message = ReadFromStorage(readFile, messagePosition, messageLength);

						NotifyListener(listener, blocking, message);
					}
				}
			}
			catch (IOException e)
			{
				if (_log.IsWarnEnabled)
				{
					// With debug logging on, the exception object is passed to the logger as well.
					if (_log.IsDebugEnabled)
					{
						_log.Warn("Problem in retrieving messages from indexed message storage: " + e.ToString(), e);
					}
					else
					{
						_log.Warn("Problem in retrieving messages from indexed message storage: " + e.ToString());
					}
				}

				// An I/O failure on a closed storage is reported as a storage-closed condition.
				if (IsClosed)
				{
					throw new StorageClosedException("Storage is closed. Cause: " + e.Message);
				}

				throw;
			}
		}