in FixAntenna/NetCore/FixEngine/Session/IoThreads/AsyncMessagePumper.cs [166:238]
protected override void Run()
{
if (TraceEnabled)
{
Log.Trace("Start MPThread: " + _fixSession);
}
Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
try
{
var configuration = _sessionParameters.Configuration;
ApplyAffinity(configuration.GetPropertyAsInt(Config.SendCpuAffinity), configuration.GetPropertyAsInt(Config.CpuAffinity));
Thread.BeginThreadAffinity();
while (!_shutdownFlag || HasWorkQueued)
{
if (HasDataChunkToTransfer())
{
_transport.WaitUntilReadyToWrite();
WriteToTransport(0);
continue;
}
var bufferedMessageCount = 0;
lock (_queue)
{
if (_queue.IsEmpty && !_shutdownFlag)
{
Monitor.PulseAll(_queue);
SafeWait(_hbtSeconds);
}
//get not real queue size but requested count of messages to send
var queueSize = _queue.Size;
if (queueSize > 0)
{
if (TraceEnabled)
{
Log.Trace(_fixSession + " queue size: " + queueSize + " turnOff=" + _queue.OutOfTurnOnlyMode);
}
bufferedMessageCount = FillBuffer(queueSize);
}
else
{
EnqueueHeartbeatToSend();
}
}
if (bufferedMessageCount > 0)
{
SendMessages(bufferedMessageCount);
}
}
}
catch (Exception ex)
{
if (!GracefulShutdown)
{
ReportErrorAndShutdown(ex);
}
}
finally
{
CloseOutgoingLog();
if (Log.IsTraceEnabled)
{
Log.Trace("Stop MPThread: " + _fixSession);
}
Thread.EndThreadAffinity();
}
}