protected override void Run()

in FixAntenna/NetCore/FixEngine/Session/IoThreads/AsyncMessagePumper.cs [166:238]


		protected override void Run()
		{
			if (TraceEnabled)
			{
				Log.Trace("Start MPThread: " + _fixSession);
			}

			Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;

			try
			{
				var configuration = _sessionParameters.Configuration;

				ApplyAffinity(configuration.GetPropertyAsInt(Config.SendCpuAffinity), configuration.GetPropertyAsInt(Config.CpuAffinity));
				Thread.BeginThreadAffinity();

				while (!_shutdownFlag || HasWorkQueued)
				{
					if (HasDataChunkToTransfer())
					{
						_transport.WaitUntilReadyToWrite();
						WriteToTransport(0);
						continue;
					}

					var bufferedMessageCount = 0;
					lock (_queue)
					{
						if (_queue.IsEmpty && !_shutdownFlag)
						{
							Monitor.PulseAll(_queue);
							SafeWait(_hbtSeconds);
						}

						//get not real queue size but requested count of messages to send
						var queueSize = _queue.Size;
						if (queueSize > 0)
						{
							if (TraceEnabled)
							{
								Log.Trace(_fixSession + " queue size: " + queueSize + " turnOff=" + _queue.OutOfTurnOnlyMode);
							}

							bufferedMessageCount = FillBuffer(queueSize);
						}
						else
						{
							EnqueueHeartbeatToSend();
						}
					}
					if (bufferedMessageCount > 0)
					{
						SendMessages(bufferedMessageCount);
					}
				}
			}
			catch (Exception ex)
			{
				if (!GracefulShutdown)
				{
					ReportErrorAndShutdown(ex);
				}
			}
			finally
			{
				CloseOutgoingLog();
				if (Log.IsTraceEnabled)
				{
					Log.Trace("Stop MPThread: " + _fixSession);
				}
				Thread.EndThreadAffinity();
			}
		}