in FixAntenna/NetCore/FixEngine/Session/IoThreads/SyncMessagePumper.cs [189:279]
/// <summary>
/// Body of the message pumper thread. Loops until <c>_shutdownFlag</c> is set and
/// <c>HasWorkQueued()</c> reports no remaining work. Each iteration either pushes a pending
/// data chunk to the transport, or fills the buffer from the queue and sends it; when the
/// queue reports a size of zero, a heartbeat is enqueued instead.
/// </summary>
/// <remarks>
/// Any exception escaping the loop is caught here: outside of a graceful shutdown it is passed
/// to <c>ReportErrorAndShutdown</c>; during a graceful shutdown it is only logged at debug level.
/// The outgoing log is closed on exit, and thread affinity is ended only if it was begun.
/// </remarks>
protected override void Run()
{
    if (TraceEnabled)
    {
        Log.Trace("Start MPThread: " + _fixSession);
    }
    Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
    // Set once BeginThreadAffinity has actually been called, so cleanup never issues an
    // EndThreadAffinity without a matching begin (e.g. when ApplyAffinity throws).
    var affinityBegun = false;
    try
    {
        var configuration = _sessionParameters.Configuration;
        ApplyAffinity(configuration.GetPropertyAsInt(Config.SendCpuAffinity), configuration.GetPropertyAsInt(Config.CpuAffinity));
        Thread.BeginThreadAffinity();
        affinityBegun = true;
        while (!_shutdownFlag || HasWorkQueued())
        {
            var bufferedMessageCount = 0;
            var waitUntilReadyToSendChunk = false;
            lock (_queue)
            {
                WaitWhileQueueIsEmptyUntilSendingOfHeartbeat();
                if (HasDataChunkToTransfer())
                {
                    waitUntilReadyToSendChunk = true;
                }
            }
            if (waitUntilReadyToSendChunk)
            {
                // The wait happens without holding the queue lock, so the chunk check is
                // repeated under the lock before writing.
                _transport.WaitUntilReadyToWrite();
                lock (_queue)
                {
                    if (HasDataChunkToTransfer())
                    {
                        // NOTE(review): the meaning of the 0 argument is not visible here -
                        // presumably "no newly buffered messages"; confirm against WriteToTransport.
                        WriteToTransport(0);
                        continue;
                    }
                }
            }
            lock (_queue)
            {
                // Not the real queue size, but the requested count of messages to send.
                var queueSize = _queue.Size;
                if (queueSize > 0)
                {
                    if (TraceEnabled)
                    {
                        Log.Trace(_fixSession + " queue size: " + queueSize);
                    }
                    bufferedMessageCount = FillBuffer(queueSize);
                }
                else
                {
                    // Nothing requested for sending in this iteration: enqueue a heartbeat.
                    EnqueueHeartbeatToSend();
                }
                if (bufferedMessageCount > 0)
                {
                    SendMessages(bufferedMessageCount);
                }
            }
        }
    }
    catch (Exception ex)
    {
        if (!GracefulShutdown)
        {
            ReportErrorAndShutdown(ex);
        }
        else
        {
            // During a graceful shutdown the failure is only logged, together with the
            // queue's TotalSize at that moment.
            var error = "IOError in message pumper. Some messages have not been sent:" + _queue.TotalSize;
            Log.Debug(error, ex);
        }
    }
    finally
    {
        try
        {
            CloseOutgoingLog();
            if (Log.IsTraceEnabled)
            {
                Log.Trace("Stop MPThread: " + _fixSession);
            }
        }
        finally
        {
            // Runs even if closing the log throws; skipped when affinity was never begun.
            if (affinityBegun)
            {
                Thread.EndThreadAffinity();
            }
        }
    }
}