in FixAntenna/NetCore/FixEngine/Session/IoThreads/SyncBlockingMessagePumper.cs [726:857]
private int Send(string msgType, FixMessage content, ChangesType? changesType, FixSessionSendingType optionMask)
{
try
{
bool isUsingOriginalMsgBuffer;
lock (_queue)
{
var sync = (optionMask & FixSessionSendingType.SendSync) != 0;
var async = (optionMask & FixSessionSendingType.SendAsync) != 0;
// TBD! We can send messages only for connected session. Make check for session state lighter
if (optionMask != 0 && (!async || sync) && !HasAnyWorkQueued() && !_queue.OutOfTurnOnlyMode && SessionState.IsConnected(_fixSession.SessionState) && !_shutdownFlag)
{
var allowBufferWriting = _msgBufferSemaphore.Wait(0);
if (allowBufferWriting)
{
try
{
_tmpMessageWithType.FixMessage = content;
_tmpMessageWithType.MessageType = msgType;
_tmpMessageWithType.ChangesType = changesType;
_tmpMsgBuf.Buffer = null;
isUsingOriginalMsgBuffer = FillBuffer(_tmpMessageWithType, _tmpMsgBuf);
}
catch (IOException)
{
AddToQueue(msgType, content, changesType);
throw;
}
finally
{
_msgBufferSemaphore.Release();
}
}
else
{
//buffer is locked by other process - add messages to queue
AddToQueue(msgType, content, changesType);
//message was queued
return _queue.TotalSize;
}
}
else
{
AddToQueue(msgType, content, changesType);
//message was queued
return _queue.TotalSize;
}
}
try
{
try
{
_msgBufferSemaphore.Wait();
//TODO: and wy we are passing byte[] instead of whole buffer
SendMessages(1, _tmpMsgBuf.Buffer);
}
catch (ThreadInterruptedException e)
{
if (Log.IsTraceEnabled)
{
Log.Debug("Sync send ERROR: " + e.ToString(), e);
}
else if (Log.IsDebugEnabled)
{
Log.Debug("Sync send ERROR: " + e.ToString());
}
}
finally
{
_msgBufferSemaphore.Release();
//sending with queue may be postponed by semaphore
lock (_queue)
{
if (!_queue.IsEmpty)
{
Monitor.PulseAll(_queue);
}
}
}
}
catch (IOException ex)
{
if (Log.IsTraceEnabled)
{
Log.Debug("Sync send ERROR: " + ex.ToString(), ex);
}
else if (Log.IsDebugEnabled)
{
Log.Debug("Sync send ERROR: " + ex.ToString());
}
lock (_queue)
{
//there is problem with transport. Let's start pumper thread to do all "dirty" work
_queue.OutOfTurnOnlyMode = true;
Monitor.PulseAll(_queue);
}
//throw ex;
}
if (isUsingOriginalMsgBuffer)
{
//can try to release the message after send is done
ReleaseMessageIfNeeded(content);
}
//message was sent directly only if queue is empty
return 0;
}
catch (IOException ex)
{
// solve deadlock:
// - TestRequestTask lock session first and then lock queue (during sending logon)
// - here we lock queue first and then we locked session during shutdown.
// For a moment here we will release queue lock here to avoid deadlock
ReportErrorAndShutdown(ex);
}
catch (Exception ex)
{
// close session if there is a problem with sending data
ReportErrorAndShutdown(ex);
throw;
}
return _queue.TotalSize;
}