in src/dxapi/native/tickdb/http/tickloader_http.cpp [884:1020]
void TickLoaderImpl::finishInternal(bool calledFromMainThread)
{
if (isWriting()) {
try {
clientIsActive_ = false;
{
// Set interruption state. This signals other thread that use this loader to stop
interruption_ = Interruption::STOPPING;
static_assert(LOADER_STOPPING_WAIT_MIN_SLEEP_MS > 1, "LOADER_STOPPING_WAIT_MIN_SLEEP_MS > 1");
int sleep_ms = LOADER_STOPPING_WAIT_MIN_SLEEP_MS;
int i = LOADER_STOPPING_WAIT_TIMEOUT_MS;
// try to stop the client that may still be calling next/send/writeXX, lock-free way
for (; i > 0; i -= sleep_ms) {
this_thread::sleep_for(chrono::milliseconds(sleep_ms));
if (!clientIsActive_) {
// If not writing message body and not active, assume that class is interrupted
// If "I'm finished" signal is set externally, assume it is safe to dispose
if (isWriterFinished() || !isWritingMessageBody())
break;
// Otherwise, wait and increase delay, since it is possible that the other thread is not getting a timeslice
sleep_ms = sleep_ms * 3 / 2;
if (sleep_ms > LOADER_STOPPING_WAIT_MAX_SLEEP_MS) {
sleep_ms = LOADER_STOPPING_WAIT_MAX_SLEEP_MS;
}
}
else {
// Still active. Reset flag and wait.
clientIsActive_ = false;
}
}
}
if (clientIsActive_) {
DBGLOG(LOGHDR ".finish(): ERR: Client still doing something, probably going to crash!", ID);
}
if (Interruption::STOPPING != interruption_) {
DBGLOG(LOGHDR ".finish(): ERR: Interruption state has unexpected value: %s", ID, Interruption(interruption_).toString());
}
if (isWritingMessageBody()) {
DBGLOG(LOGHDR ".finish(): WRN: Message is still open, will be cancelled, header and %lld bytes discarded", ID,
(longlong)(messageWriter_.dataPtr - messageStart_ + messageWriter_.headerSize()));
cancelMessage();
state_ = MESSAGE_START;
}
if (NULL == ioStream_) {
DBGLOG(LOGHDR ".finish(): ERR: ioStream already == NULL!", ID);
}
bool waitResponse = false;
auto ios = ioStream_;
if (interruption_ <= Interruption::STOPPING) {
if (NULL != ios && !ios->isOutputClosed()) {
DBGLOG_VERBOSE(LOGHDR ".finish(): finishing writing", ID);
endWriting();
flushOutput();
ios->closeOutput();
waitResponse = true;
}
else {
DBGLOG(LOGHDR ".finish(): WRN: can't finish writing - already closed. intr: %s", ID, Interruption(interruption_).toString());
}
state_ = FINISHED_WRITING;
}
else {
state_ = FINISHED_WRITING;
DBGLOG(LOGHDR ".finish(): writing aborted unexpectedly. intr: %s", ID, Interruption(interruption_).toString());
}
// If we succesfully got here, attempt clean disconnection
DBGLOG_VERBOSE(LOGHDR ".finish(): Trying to remove itself from the pool. Wait for server response: %s", ID, waitResponse ? "true" : "false");
bool success = manager_.remove(this, waitResponse);
if (!success) {
THROW_DBGLOG(LOGHDR ".finish(): Loader is unable to remove itself from LoaderManager", ID);
}
#if VERBOSE_LOADER >= 1
DBGLOG_VERBOSE(LOGHDR ": Closed. Messages: %llu Cancelled: %llu Bytes: %lld\n"
, ID, (ulonglong)nMsgWritten_, (ulonglong)nMsgCancelled_, (longlong)(NULL != ioStream_ ? ioStream_->nBytesWritten() : -1));
#endif
cleanup();
if (Error::NO_ERROR != error_.code) {
DBGLOG(LOGHDR ": ERR: Loader finished with %s: %s", ID, Error(error_.code).toString(), serverErrorText().c_str());
}
}
catch (const IOStreamDisconnectException &) {
state_ = FINISHED_WRITING;
interruption_ = Interruption::SERVER_DISCONNECTED;
DBGLOG(LOGHDR ".finish(): WARNING: disconnected from the server side", ID);
cleanup();
THROW_DBGLOG_EX(TickLoaderClosedException, LOGHDR ": Unexpected disconnection %s", ID, serverErrorText().c_str());
return;
}
catch (const IOStreamException &e) {
state_ = FINISHED_WRITING;
error_.code = Error::IO_ERROR;
interruption_ = Interruption::SERVER_DISCONNECTED;
cleanup();
THROW_DBGLOG_EX(TickLoaderClosedException, LOGHDR ".finish(): IO error: %s: %s", ID, e.what(), serverErrorText().c_str());
return;
}
catch (const std::exception &e) {
state_ = FINISHED_WRITING;
error_.code = Error::ERROR;
interruption_ = Interruption::ERROR;
format_string(&error_.msg, "std::exception: %s", e.what());
DBGLOG(LOGHDR ".finish(): %s", ID, error_.msg.c_str());
cleanup();
throw;
}
catch (...) {
state_ = FINISHED_WRITING;
error_.code = Error::ERROR;
interruption_ = Interruption::ERROR;
format_string(&error_.msg, "UNKNOWN_ERROR! %s", ID, serverErrorText().c_str());
DBGLOG(LOGHDR ".finish(): %s", ID, error_.msg.c_str());
cleanup();
throw;
}
}
else {
cleanup(); // Just in case
}
}