in analyzers/src/json/abstract_tcp_service.cpp [138:192]
void AbstractTcpService::runListener()
{
while(_isRunning.load())
{
// Accepting incoming connection on socket
struct timespec acceptDuration;
fillDuration(acceptDuration);
fd_set readDescriptorsSet;
FD_ZERO(&readDescriptorsSet);
FD_SET(_serverSocket, &readDescriptorsSet);
int descriptorsCount = pselect(_serverSocket + 1, &readDescriptorsSet, NULL, NULL, &acceptDuration, NULL);
if(descriptorsCount == 0)
{
// Timeout expired
continue;
}
else if(descriptorsCount < 0)
{
std::system_error e{errno, std::system_category(), "Awaiting for incoming connection on server socket error"};
LOG("ERROR: %s", e.what());
#ifdef __gnu_linux__
// Several first pselect(2) calls cause "Interrupted system call" error (errno == EINTR)
// if drop privileges option is used on Linux (see https://access.redhat.com/solutions/165483)
if(errno == EINTR)
{
continue;
}
#endif
throw e;
}
// Extracting and returning pending connection
int pendingSocketDescriptor = accept(_serverSocket, NULL, NULL);
if(pendingSocketDescriptor < 0)
{
std::system_error e{errno, std::system_category(), "Accepting incoming connection on server socket error"};
LOG("ERROR: %s", e.what());
throw e;
}
// Create and enqueue task
std::unique_ptr<AbstractTask> newTask{createTask(pendingSocketDescriptor)};
{
std::unique_lock<std::mutex> lock(_tasksQueueMutex);
if(_tasksQueue.size() < MaxTasksQueueSize)
{
_tasksQueue.push(newTask.get());
newTask.release();
_tasksQueueCond.notify_one();
}
else
{
LOG("ERROR: TCP-service tasks queue overload has been detected")
}
}
}
}