FixAntenna/NetCore/FixEngine/Session/IoThreads/SyncMessagePumper.cs (721 lines of code) (raw):
// Copyright (c) 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.IO;
using System.Threading;
using Epam.FixAntenna.Constants.Fixt11;
using Epam.FixAntenna.NetCore.Common;
using Epam.FixAntenna.NetCore.Common.Logging;
using Epam.FixAntenna.NetCore.Common.Utils;
using Epam.FixAntenna.NetCore.Configuration;
using Epam.FixAntenna.NetCore.FixEngine.Session.IoThreads.Bean;
using Epam.FixAntenna.NetCore.FixEngine.Session.Util;
using Epam.FixAntenna.NetCore.FixEngine.Storage;
using Epam.FixAntenna.NetCore.FixEngine.Storage.Queue;
using Epam.FixAntenna.NetCore.FixEngine.Transport;
using Epam.FixAntenna.NetCore.Helpers;
using Epam.FixAntenna.NetCore.Message;
using Epam.FixAntenna.NetCore.Message.SpecialTags;
namespace Epam.FixAntenna.NetCore.FixEngine.Session.IoThreads
{
/// <summary>
/// The message pumper writes messages to transport.
/// </summary>
/// <seealso cref="IQueue{T}"></seealso>
/// <seealso cref="FixMessageWithType"></seealso>
/// <seealso cref="IMessageStorage"></seealso>
/// <seealso cref="IFixMessageFactory"></seealso>
/// <seealso cref="IFixTransport"></seealso>
internal sealed class SyncMessagePumper : AffinitySupportThread, IMessagePumper
{
private static readonly ILog Log = LogFactory.GetLog(typeof(SyncMessagePumper));
public static readonly bool TraceEnabled = Log.IsTraceEnabled;
private const int Second = 1000;
private FixMessageWithType _tmpMessageWithType = new FixMessageWithType();
private MsgBuf _tmpMsgBuf = new MsgBuf();
private int _hbtSeconds;
private int _shutdownTimeout;
private int _waitOnSendMillis = Second;
private IExtendedFixSession _fixSession;
private readonly IQueue<FixMessageWithType> _queue;
private IMessageStorage _outgoingLog;
private IFixMessageFactory _fixMessageFactory;
private SessionParameters _sessionParameters;
private ConfigurationAdapter _configurationAdapter;
private FixSessionRuntimeState _runtimeState;
private IFixTransport _transport;
private long _messageProcessedTimestamp;
private bool _statisticEnabled;
private MessageStatistic _messageStatistic;
private int _queueThresholdSize;
private volatile bool _shutdownFlag;
private bool _enableMessageRejecting;
private int _maxMessagesToSendInBatch;
private SerializationContext _context;
private ISessionSequenceManager _sequenceManager;
private ByteBuffer _messageBuffer;
private int[] _messageStart;
private int[] _messageEnd;
internal bool IsTransportBlockingSend;
private int _dataChunkStart;
private int _dataChunkEnd;
private RawFixUtil.IRawTags _rawTags;
private IMaskedTags _maskedTags;
/// <summary>
/// Creates the <c>SyncMessagePumper</c>.
/// </summary>
/// <param name="queue"> the output queue </param>
/// <param name="messageFactory"> the output message storage </param>
/// <param name="transport"> the transport </param>
public SyncMessagePumper(IExtendedFixSession extendedFixSession, IQueue<FixMessageWithType> queue, IMessageStorage @out,
IFixMessageFactory messageFactory, IFixTransport transport, ISessionSequenceManager sequenceManager)
: base("MPThread>:" + extendedFixSession.Parameters.SessionId)
{
_fixSession = extendedFixSession;
_sessionParameters = extendedFixSession.Parameters;
_runtimeState = extendedFixSession.RuntimeState;
_queue = queue;
_outgoingLog = @out;
_hbtSeconds = _sessionParameters.HeartbeatInterval;
_shutdownTimeout = _sessionParameters.Configuration.GetPropertyAsInt(Config.WritingThreadShutdownTimeout);
if (_shutdownTimeout < 0)
{
_waitOnSendMillis = _hbtSeconds;
}
_transport = transport;
_fixMessageFactory = messageFactory;
_configurationAdapter = new ConfigurationAdapter(_sessionParameters.Configuration);
_sequenceManager = sequenceManager;
_rawTags = RawFixUtil.CreateRawTags(_sessionParameters.Configuration.GetProperty(Config.RawTags));
_maskedTags = CustomMaskedTags.Create(_sessionParameters.Configuration.GetProperty(Config.MaskedTags));
}
public long Init()
{
if (_messageBuffer != null)
{
//initialized already
throw new InvalidOperationException("SyncMessagePumper is initialized already");
}
_queueThresholdSize = _configurationAdapter.ThresholdSize;
_enableMessageRejecting = _configurationAdapter.IsEnableMessageRejecting;
_maxMessagesToSendInBatch = _configurationAdapter.MaxMessagesToSendInBatch;
_waitOnSendMillis = _configurationAdapter.GetWaitForQueuingMessages(Second);
_messageBuffer = new ByteBuffer(100 * _maxMessagesToSendInBatch);
_messageStart = new int[_maxMessagesToSendInBatch];
_messageEnd = new int[_maxMessagesToSendInBatch];
_context = new SerializationContext(_fixMessageFactory);
_statisticEnabled = _configurationAdapter.IsMessageStatisticEnabled;
if (_statisticEnabled)
{
_messageStatistic = new MessageStatistic();
}
Interlocked.Exchange(ref _messageProcessedTimestamp, DateTimeHelper.CurrentMilliseconds);
IsTransportBlockingSend = _transport.IsBlockingSocket;
return _outgoingLog.Initialize();
}
public long MessageProcessedTimestamp
{
get => Interlocked.Read(ref _messageProcessedTimestamp);
private set => Interlocked.Exchange(ref _messageProcessedTimestamp, value);
}
/// <returns> true is statistic is enabled </returns>
public bool IsStatisticEnabled => _statisticEnabled;
/// <summary>
/// Gets statistic of processed messages.
/// WARNING: Before the call to ensure that the statistics are included.
/// </summary>
/// <value> MessageStatistic </value>
/// <exception cref="InvalidOperationException"> if <c>statisticEnabled</c> is false </exception>
/// <seealso cref="MessageReader.IsStatisticEnabled"> </seealso>
public MessageStatistic Statistic
{
get
{
if (_statisticEnabled)
{
return _messageStatistic;
}
throw new InvalidOperationException("Message statistic is disabled");
}
}
public bool HasWorkQueued()
{
return !_queue.IsEmpty || HasDataChunkToTransfer();
}
public bool HasAnyWorkQueued()
{
return !_queue.IsAllEmpty || HasDataChunkToTransfer();
}
protected override void Run()
{
if (TraceEnabled)
{
Log.Trace("Start MPThread: " + _fixSession);
}
Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
try
{
var configuration = _sessionParameters.Configuration;
ApplyAffinity(configuration.GetPropertyAsInt(Config.SendCpuAffinity), configuration.GetPropertyAsInt(Config.CpuAffinity));
Thread.BeginThreadAffinity();
while (!_shutdownFlag || HasWorkQueued())
{
var bufferedMessageCount = 0;
var waitUntilReadyToSendChunk = false;
lock (_queue)
{
WaitWhileQueueIsEmptyUntilSendingOfHeartbeat();
if (HasDataChunkToTransfer())
{
waitUntilReadyToSendChunk = true;
}
}
if (waitUntilReadyToSendChunk)
{
_transport.WaitUntilReadyToWrite();
lock (_queue)
{
if (HasDataChunkToTransfer())
{
WriteToTransport(0);
continue;
}
}
}
lock (_queue)
{
//get not real queue size but requested count of messages to send
var queueSize = _queue.Size;
if (queueSize > 0)
{
if (TraceEnabled)
{
Log.Trace(_fixSession + " queue size: " + queueSize);
}
bufferedMessageCount = FillBuffer(queueSize);
}
else
{
EnqueueHeartbeatToSend();
}
if (bufferedMessageCount > 0)
{
SendMessages(bufferedMessageCount);
}
}
}
}
catch (Exception ex)
{
if (!GracefulShutdown)
{
ReportErrorAndShutdown(ex);
}
else
{
var error = "IOError in message pumper. Some messages have not been sent:" + _queue.TotalSize;
Log.Debug(error, ex);
}
}
finally
{
CloseOutgoingLog();
if (Log.IsTraceEnabled)
{
Log.Trace("Stop MPThread: " + _fixSession);
}
Thread.EndThreadAffinity();
}
}
private void ReportErrorAndShutdown(Exception ex)
{
var error = "IOError in message pumper. Some messages have not been sent:" + _queue.TotalSize;
_fixSession.ErrorHandler.OnError(error, ex);
if (Log.IsDebugEnabled)
{
Log.Warn(error, ex);
}
else
{
Log.Warn(error + ". " + ex.Message);
}
_fixSession.Shutdown(DisconnectReason.BrokenConnection, false);
}
private void CloseOutgoingLog()
{
try
{
//do synchronization to avoid closing storage during synchronous sending
lock (_queue)
{
_outgoingLog.Dispose();
}
}
catch (IOException e)
{
if (Log.IsDebugEnabled)
{
Log.Warn("Outgoing Log file cannot be closed", e);
}
else
{
Log.Warn("Outgoing Log file cannot be closed. " + e.Message);
}
}
}
private void WaitWhileQueueIsEmptyUntilSendingOfHeartbeat()
{
if (!HasWorkQueued() && !_shutdownFlag)
{
Monitor.PulseAll(_queue);
var timeout = _hbtSeconds * Second - (DateTimeHelper.CurrentMilliseconds - MessageProcessedTimestamp);
if (timeout <= 0)
{
timeout = _hbtSeconds * Second;
}
SafeWaitMilis(timeout);
}
}
/// <summary>
/// Sends the heartbeat message.
/// </summary>
private void EnqueueHeartbeatToSend()
{
if (IsHeartbeatRequired())
{
_fixSession.SendMessageOutOfTurn("0", null);
}
}
private bool IsHeartbeatRequired()
{
return !_shutdownFlag && _hbtSeconds != 0 && SessionState.IsConnected(_fixSession.SessionState) && ((DateTimeHelper.CurrentMilliseconds - MessageProcessedTimestamp) >= (_hbtSeconds * Second));
}
private void UpdateStatistic()
{
_messageStatistic.AddMessagesProcessed();
_messageStatistic.AddBytesProcessed(_messageBuffer.Offset);
}
public int FillBuffer(int queueSize)
{
if (queueSize > _maxMessagesToSendInBatch)
{
queueSize = _maxMessagesToSendInBatch;
}
var messageCount = 0;
_messageBuffer.ResetBuffer();
while (messageCount < queueSize && _messageBuffer.Offset < _transport.OptimalBufferSize)
{
_messageStart[messageCount] = _messageBuffer.Offset;
PollAndPrepareToSend(_messageBuffer);
_messageEnd[messageCount] = _messageBuffer.Offset;
messageCount++;
}
return messageCount;
}
public void SendMessages(int messageCount)
{
SendMessages(messageCount, null);
}
public bool SendMessages(int messageCount, byte[] externalBuffer)
{
var allWritten = WriteToTransport(messageCount, externalBuffer);
for (var i = 0; i < messageCount; i++)
{
try
{
_fixSession.ExtendedFixSessionListener.OnMessageSent(_messageBuffer.GetByteArray(), _messageStart[i], _messageEnd[i] - _messageStart[i]);
}
catch (Exception e)
{
if (Log.IsDebugEnabled)
{
Log.Warn("ExtendedFIXSessionListener::OnMessageSent thrown error. Cause:" + e.Message, e);
}
else
{
Log.Warn("ExtendedFIXSessionListener::OnMessageSent thrown error. Cause:" + e.Message);
}
}
}
return allWritten;
}
public void ScheduleChunkToTransfer(int start, int end)
{
_dataChunkStart = start;
_dataChunkEnd = end;
Monitor.PulseAll(_queue);
}
private void WriteToTransport(int messageCount)
{
WriteToTransport(messageCount, null);
}
private bool WriteToTransport(int messageCount, byte[] externalBuffer)
{
//TODO extract next block into separate function. Looks like for main thread it should be used without the rest part of code (messageCount = 0, externalBuffer = null)
int toWrite, written;
if (HasDataChunkToTransfer())
{
toWrite = _dataChunkEnd - _dataChunkStart;
written = _transport.Write(_messageBuffer, _dataChunkStart, toWrite);
_dataChunkStart += written;
// update statistics
if (_statisticEnabled)
{
_messageStatistic.AddBytesProcessed(written);
}
return false;
}
toWrite = _messageEnd[messageCount - 1] - _messageStart[0];
if (externalBuffer != null)
{
try
{
written = _transport.Write(externalBuffer, _messageStart[0], toWrite);
if (written < toWrite)
{
_messageBuffer.Add(externalBuffer, _messageStart[0] + written, toWrite - written);
ScheduleChunkToTransfer(_messageStart[0] + written, toWrite);
return false;
}
}
catch (IOException e)
{
_messageBuffer.Add(externalBuffer, _messageStart[0], toWrite);
ScheduleChunkToTransfer(_messageStart[0], toWrite);
//throw e;
var messages = new string[1];
messages[0] = StringHelper.NewString(externalBuffer, _messageStart[0], toWrite);
throw new TransportMessagesNotSentException(e, messages);
}
}
else
{
try
{
written = _transport.Write(_messageBuffer, _messageStart[0], toWrite);
if (written < toWrite)
{
ScheduleChunkToTransfer(_messageStart[0] + written, toWrite);
return false;
}
}
catch (IOException e)
{
ScheduleChunkToTransfer(_messageStart[0], toWrite);
//throw e;
var messages = new string[messageCount];
for (var i = 0; i < messageCount; i++)
{
messages[i] = StringHelper.NewString(_messageBuffer.GetByteArray(), _messageStart[i], _messageEnd[i]);
}
throw new TransportMessagesNotSentException(e, messages);
}
}
// update statistics
MessageProcessedTimestamp = DateTimeHelper.CurrentMilliseconds;
if (_statisticEnabled)
{
//updateStatistic();
_messageStatistic.AddMessagesProcessed(messageCount);
_messageStatistic.AddBytesProcessed(written);
}
return true;
}
//TODO: looks like we need 2 methods - for MsgBuf and for global buff
private bool PrepareToSend(MsgBuf buf, FixMessageWithType messageWithType, ByteBuffer messageBuffer)
{
var offset = messageBuffer.Offset;
var useOrigMsgBuffer = false;
var fixMessage = messageWithType.FixMessage;
var type = messageWithType.MessageType;
if (messageWithType.ChangesType != null)
{
_fixMessageFactory.Serialize(fixMessage, messageWithType.ChangesType, messageBuffer, _context);
}
else
{
_fixMessageFactory.Serialize(buf, type, fixMessage, messageBuffer, _context);
useOrigMsgBuffer = buf != null && buf.Buffer != null;
}
if (!ReferenceEquals(type, null) || messageWithType.ChangesType != null)
{
_sequenceManager.IncrementOutSeqNum();
}
if (useOrigMsgBuffer)
{
SaveMessageInOutLog(buf.Buffer, buf.Offset, buf.Length);
// postpone calling releaseMessageIfNeeded until we are done sending the message buffer to socket
// useOrigMsgBuffer can be set true only when sending 1 message synchronously (bypassing the queue)
}
else
{
var endPoint = messageBuffer.Offset;
SaveMessageInOutLog(messageBuffer, offset, endPoint - offset);
}
return useOrigMsgBuffer;
}
private void ReleaseMessageIfNeeded(FixMessageWithType messageWithType)
{
var fixMessage = messageWithType.FixMessage;
messageWithType.ReleaseInstance();
ReleaseMessageIfNeeded(fixMessage);
}
private void ReleaseMessageIfNeeded(FixMessage fixMessage)
{
if (fixMessage != null && fixMessage.NeedReleaseAfterSend)
{
fixMessage.ReleaseInstance();
}
}
public bool HasDataChunkToTransfer()
{
return _dataChunkEnd - _dataChunkStart > 0;
}
private void PollAndPrepareToSend(ByteBuffer messageBuffer)
{
var messageWithType = _queue.Poll();
var usedOrigBuffer = PrepareToSend(null, messageWithType, messageBuffer);
_queue.Commit();
if (!usedOrigBuffer)
{
ReleaseMessageIfNeeded(messageWithType);
}
}
private void SaveMessageInOutLog(ByteBuffer messageBuffer, int offset, int length)
{
_outgoingLog.AppendMessage(messageBuffer.GetByteArray(), offset, length);
}
private void SaveMessageInOutLog(byte[] buffer, int offset, int length)
{
_outgoingLog.AppendMessage(buffer, offset, length);
}
private void SafeWait(int seconds)
{
SafeWaitMilis(seconds * Second);
}
private void SafeWaitMilis(int seconds)
{
try
{
Monitor.Wait(_queue, TimeSpan.FromMilliseconds(seconds));
}
catch (ThreadInterruptedException)
{
// ignore
}
}
private void SafeWaitMilis(long seconds)
{
try
{
Monitor.Wait(_queue, TimeSpan.FromMilliseconds(seconds));
}
catch (ThreadInterruptedException)
{
// ignore
}
}
/// <summary>
/// Shutdown the pumper.
/// <p/>
/// This method calls engine before the session is close.
/// This is blocked method.
/// If queue has the messages and if message rejecting is </summary>
/// enabled, the messages are rejecting, <seealso cref="IRejectMessageListener" />
/// This methods should be called from other thread
public override void Shutdown()
{
_shutdownFlag = true;
lock (_queue)
{
Monitor.PulseAll(_queue);
}
try
{
if (WorkerThread.IsAlive)
{
if (Thread.CurrentThread != WorkerThread)
{
WorkerThread.Join(Math.Max(Second, _shutdownTimeout * Second));
if (WorkerThread.IsAlive)
{
WorkerThread.Interrupt();
}
}
}
}
catch (ThreadInterruptedException)
{
// intentionally blank
}
CloseOutgoingLog();
Log.Debug("Pumper stopped");
if (_enableMessageRejecting)
{
Log.Debug("Pumper reject message");
RejectQueueMessages();
}
}
/// <summary>
/// Reject all non send message.
/// </summary>
public void RejectQueueMessages()
{
Log.Debug("Reject queue messages");
lock (_queue)
{
while (_queue.TotalSize > 0)
{
try
{
RejectMessage(_queue.Poll());
}
catch (Exception)
{
// ignore
}
finally
{
_queue.Commit();
}
}
}
}
public void RejectFirstQueueMessage()
{
Log.Debug("Reject first queue message");
lock (_queue)
{
if (_queue.TotalSize > 0)
{
try
{
RejectMessage(_queue.Poll());
}
catch (Exception)
{
// ignore
}
finally
{
_queue.Commit();
}
}
}
}
private void RejectMessage(FixMessageWithType messageWithType)
{
if (messageWithType.FixMessage != null && messageWithType.IsApplicationLevelMessage())
{
// process only application level messages
var message = PrepareMsgForReject(messageWithType);
if (Log.IsWarnEnabled)
{
Log.Warn("Reject message: " + message);
}
_fixSession.RejectMessageListener.OnRejectMessage(message);
}
else
{
if (Log.IsDebugEnabled)
{
Log.Debug("Skip session message: " + messageWithType.ToString());
}
}
}
private FixMessage PrepareMsgForReject(FixMessageWithType messageWithType)
{
var message = messageWithType.FixMessage;
var msgType = messageWithType.MessageType;
if (!string.IsNullOrEmpty(msgType))
{
var tagIndex = message.GetTagIndex(Tags.MsgType);
if (tagIndex != FixMessage.NotFound)
{
message.SetAtIndex(tagIndex, msgType);
}
else
{
message.AddTagAtIndex(0, Tags.MsgType, msgType);
}
}
return message;
}
public bool SendOutOfTurn(string msgType, FixMessage content)
{
lock (_queue)
{
IsQueueTooBigForSessionMessages();
var added = _queue.AddOutOfTurn(FixMessageWithTypePoolFactory.GetFixMessageWithTypeFromPool(content, msgType));
if (added)
{
_queue.NotifyAllSession();
}
else
{
throw new MessageNotSentException("Message wasn't added to outgoing queue");
}
//message was queued
return false;
}
}
public int Send(string msgType, FixMessage content)
{
return Send(msgType, content, FixSessionSendingType.DefaultSendingOption);
}
/// <summary>
/// Pool the message to queue.
/// </summary>
/// <param name="msgType"> the message type </param>
/// <param name="content"> the message content </param>
public int Send(string msgType, FixMessage content, FixSessionSendingType optionMask)
{
return Send(msgType, content, null, optionMask);
}
private int Send(string msgType, FixMessage content, ChangesType? changesType, FixSessionSendingType optionMask)
{
try
{
lock (_queue)
{
var sync = (optionMask & FixSessionSendingType.SendSync) != 0;
var async = (optionMask & FixSessionSendingType.SendAsync) != 0;
// TBD! We can send messages only for connected session. Make check for session state lighter
if (optionMask != 0 && (!async || sync) && !HasAnyWorkQueued() && !_queue.OutOfTurnOnlyMode
&& SessionState.IsConnected(_fixSession.SessionState) && !_shutdownFlag)
{
_tmpMessageWithType.FixMessage = content;
_tmpMessageWithType.MessageType = msgType;
_tmpMessageWithType.ChangesType = changesType;
_tmpMsgBuf.Buffer = null;
//TODO: check how to buffer filling
var isUsingOriginalMsgBuffer = FillBuffer(_tmpMessageWithType, _tmpMsgBuf);
try
{
//TODO: and why we are passing byte[] instead of whole buffer
SendMessages(1, _tmpMsgBuf.Buffer);
}
catch (IOException ex)
{
if (Log.IsTraceEnabled)
{
Log.Debug("Sync send ERROR: " + ex.ToString(), ex);
}
else if (Log.IsDebugEnabled)
{
Log.Debug("Sync send ERROR: " + ex.ToString());
}
//there is problem with transport. Let's start pumper thread to do all "dirty" work
_queue.OutOfTurnOnlyMode = true;
Monitor.PulseAll(_queue);
_fixSession.ErrorHandler.OnError("", ex);
//throw ex;
}
if (isUsingOriginalMsgBuffer)
{
//can try to release the message after send is done
ReleaseMessageIfNeeded(content);
}
//message was sent directly only if queue is empty
return 0;
}
IsQueueTooBig();
//TODO - create pool for this
var fieldListWithType = changesType == null
? FixMessageWithTypePoolFactory.GetFixMessageWithTypeFromPool(content, msgType)
: FixMessageWithTypePoolFactory.GetFixMessageWithTypeFromPool(content, changesType);
var added = _queue.Add(fieldListWithType);
if (added)
{
_queue.NotifyAllApplication();
}
else
{
throw new MessageNotSentException("Message wasn't added to outgoing queue");
}
//message was queued
return _queue.TotalSize;
}
}
catch (IOException ex)
{
// solve deadlock:
// - TestRequestTask lock session first and then lock queue (during sending logon)
// - here we lock queue first and then we locked session during shutdown.
// For a moment here we will release queue lock here to avoid deadlock
ReportErrorAndShutdown(ex);
return _queue.TotalSize;
}
}
public bool FillBuffer(FixMessageWithType messageWithType, MsgBuf buf)
{
_messageBuffer.ResetBuffer();
_messageStart[0] = _messageBuffer.Offset;
var isUsingOriginalMsgBuffer = PrepareToSend(buf, messageWithType, _messageBuffer);
if (isUsingOriginalMsgBuffer)
{
_messageStart[0] = buf.Offset;
_messageEnd[0] = buf.Length;
}
else
{
ReleaseMessageIfNeeded(messageWithType);
_messageEnd[0] = _messageBuffer.Offset;
}
return isUsingOriginalMsgBuffer;
}
/// <summary>
/// Pool the message to queue.
/// </summary>
/// <param name="content"> the message content </param>
/// <param name="allowedChangesType"> the change type </param>
public int Send(FixMessage content, ChangesType? allowedChangesType)
{
return Send(null, content, allowedChangesType, FixSessionSendingType.DefaultSendingOption);
}
public int Send(FixMessage content, ChangesType? allowedChangesType, FixSessionSendingType optionMask)
{
return Send(null, content, allowedChangesType, optionMask);
}
/// <summary>
/// Checks the queue size. The method is blocking.
/// </summary>
private void IsQueueTooBig()
{
if (_queueThresholdSize != 0 && _queue.Size > _queueThresholdSize)
{
SafeWaitMilis(_waitOnSendMillis);
}
}
/// <summary>
/// Checks the queue size. The method is blocking.
/// </summary>
private void IsQueueTooBigForSessionMessages()
{
if (_queueThresholdSize != 0 && _queue.Size > (_queueThresholdSize + 1))
{
SafeWaitMilis(_waitOnSendMillis);
}
}
/// <summary>
/// Gets outgoing message storage.
/// </summary>
public IMessageStorage GetOutgoingMessageStorage()
{
return _outgoingLog;
}
/// <summary>
/// Sets gracefulShutdown flag.
/// </summary>
/// <value> the graceful shutdown flag </value>
public bool GracefulShutdown { get; set; }
}
}