protected override void Run()

in FixAntenna/NetCore/FixEngine/Session/IoThreads/SyncMessagePumper.cs [189:279]


		protected override void Run()
		{
			if (TraceEnabled)
			{
				Log.Trace("Start MPThread: " + _fixSession);
			}

			Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;

			try
			{
				var configuration = _sessionParameters.Configuration;

				ApplyAffinity(configuration.GetPropertyAsInt(Config.SendCpuAffinity), configuration.GetPropertyAsInt(Config.CpuAffinity));
				Thread.BeginThreadAffinity();

				while (!_shutdownFlag || HasWorkQueued())
				{
					var bufferedMessageCount = 0;
					var waitUntilReadyToSendChunk = false;

					lock (_queue)
					{
						WaitWhileQueueIsEmptyUntilSendingOfHeartbeat();

						if (HasDataChunkToTransfer())
						{
							waitUntilReadyToSendChunk = true;
						}
					}

					if (waitUntilReadyToSendChunk)
					{
						_transport.WaitUntilReadyToWrite();
						lock (_queue)
						{
							if (HasDataChunkToTransfer())
							{
								WriteToTransport(0);
								continue;
							}
						}
					}

					lock (_queue)
					{
						//get not real queue size but requested count of messages to send
						var queueSize = _queue.Size;
						if (queueSize > 0)
						{
							if (TraceEnabled)
							{
								Log.Trace(_fixSession + " queue size: " + queueSize);
							}

							bufferedMessageCount = FillBuffer(queueSize);
						}
						else
						{
							EnqueueHeartbeatToSend();
						}

						if (bufferedMessageCount > 0)
						{
							SendMessages(bufferedMessageCount);
						}
					}
				}
			}
			catch (Exception ex)
			{
				if (!GracefulShutdown)
				{
					ReportErrorAndShutdown(ex);
				}
				else
				{
					var error = "IOError in message pumper. Some messages have not been sent:" + _queue.TotalSize;
					Log.Debug(error, ex);
				}
			}
			finally
			{
				CloseOutgoingLog();
				if (Log.IsTraceEnabled)
				{
					Log.Trace("Stop MPThread: " + _fixSession);
				}
				Thread.EndThreadAffinity();
			}
		}