void TickLoaderImpl::finishInternal()

in src/dxapi/native/tickdb/http/tickloader_http.cpp [884:1020]


void TickLoaderImpl::finishInternal(bool calledFromMainThread)
{
    if (isWriting()) {
        try {
            clientIsActive_ = false;
            {
                // Set interruption state. This signals other thread that use this loader to stop
                interruption_ = Interruption::STOPPING;

                static_assert(LOADER_STOPPING_WAIT_MIN_SLEEP_MS > 1, "LOADER_STOPPING_WAIT_MIN_SLEEP_MS > 1");
                int sleep_ms = LOADER_STOPPING_WAIT_MIN_SLEEP_MS;
                int i = LOADER_STOPPING_WAIT_TIMEOUT_MS;

                // try to stop the client that may still be calling next/send/writeXX, lock-free way
                for (; i > 0; i -= sleep_ms) {
                    this_thread::sleep_for(chrono::milliseconds(sleep_ms));
                    if (!clientIsActive_) {
                        // If not writing message body and not active, assume that class is interrupted
                        // If "I'm finished" signal is set externally, assume it is safe to dispose
                        if (isWriterFinished() || !isWritingMessageBody())
                            break;

                        // Otherwise, wait and increase delay, since it is possible that the other thread is not getting a timeslice
                        sleep_ms = sleep_ms * 3 / 2;
                        if (sleep_ms > LOADER_STOPPING_WAIT_MAX_SLEEP_MS) {
                            sleep_ms = LOADER_STOPPING_WAIT_MAX_SLEEP_MS;
                        }
                    }
                    else {
                        // Still active. Reset flag and wait.
                        clientIsActive_ = false;
                    }
                }
            }

            if (clientIsActive_) {
                DBGLOG(LOGHDR ".finish(): ERR: Client still doing something, probably going to crash!", ID);
            }

            if (Interruption::STOPPING != interruption_) {
                DBGLOG(LOGHDR ".finish(): ERR: Interruption state has unexpected value: %s", ID, Interruption(interruption_).toString());
            }

            if (isWritingMessageBody()) {
                DBGLOG(LOGHDR ".finish(): WRN: Message is still open, will be cancelled, header and %lld bytes discarded", ID,
                    (longlong)(messageWriter_.dataPtr - messageStart_ + messageWriter_.headerSize()));
                cancelMessage();
                state_ = MESSAGE_START;
            }

            if (NULL == ioStream_) {
                DBGLOG(LOGHDR ".finish(): ERR: ioStream already == NULL!", ID);
            }

            bool waitResponse = false;

            auto ios = ioStream_;
            if (interruption_ <= Interruption::STOPPING) {
                if (NULL != ios && !ios->isOutputClosed()) {
                    DBGLOG_VERBOSE(LOGHDR ".finish(): finishing writing", ID);
                    endWriting();
                    flushOutput();
                    ios->closeOutput();
                    waitResponse = true;
                }
                else {
                    DBGLOG(LOGHDR ".finish(): WRN: can't finish writing - already closed. intr: %s", ID, Interruption(interruption_).toString());
                }

                state_ = FINISHED_WRITING;
            }
            else {
                state_ = FINISHED_WRITING;
                DBGLOG(LOGHDR ".finish(): writing aborted unexpectedly. intr: %s", ID, Interruption(interruption_).toString());
            }

            // If we succesfully got here, attempt clean disconnection
            DBGLOG_VERBOSE(LOGHDR ".finish(): Trying to remove itself from the pool. Wait for server response: %s", ID, waitResponse ? "true" : "false");
            bool success = manager_.remove(this, waitResponse);

            if (!success) {
                THROW_DBGLOG(LOGHDR ".finish(): Loader is unable to remove itself from LoaderManager", ID);
            }

#if VERBOSE_LOADER >= 1
            DBGLOG_VERBOSE(LOGHDR ": Closed. Messages: %llu Cancelled: %llu Bytes: %lld\n"
                , ID, (ulonglong)nMsgWritten_, (ulonglong)nMsgCancelled_, (longlong)(NULL != ioStream_ ? ioStream_->nBytesWritten() : -1));
#endif

            cleanup();
            if (Error::NO_ERROR != error_.code) {
                DBGLOG(LOGHDR ": ERR: Loader finished with %s: %s", ID, Error(error_.code).toString(), serverErrorText().c_str());
            }
        }

        catch (const IOStreamDisconnectException &) {
            state_ = FINISHED_WRITING;
            interruption_ = Interruption::SERVER_DISCONNECTED;
            DBGLOG(LOGHDR ".finish(): WARNING: disconnected from the server side", ID);
            cleanup();
            THROW_DBGLOG_EX(TickLoaderClosedException, LOGHDR ": Unexpected disconnection %s", ID, serverErrorText().c_str());
            return;
        }

        catch (const IOStreamException &e) {
            state_ = FINISHED_WRITING;
            error_.code = Error::IO_ERROR;
            interruption_ = Interruption::SERVER_DISCONNECTED;
            cleanup();
            THROW_DBGLOG_EX(TickLoaderClosedException, LOGHDR ".finish(): IO error: %s: %s", ID, e.what(), serverErrorText().c_str());
            return;
        }

        catch (const std::exception &e) {
            state_ = FINISHED_WRITING;
            error_.code = Error::ERROR;
            interruption_ = Interruption::ERROR;
            format_string(&error_.msg, "std::exception: %s", e.what());
            DBGLOG(LOGHDR ".finish(): %s", ID, error_.msg.c_str());
            cleanup();
            throw;
        }

        catch (...) {
            state_ = FINISHED_WRITING;
            error_.code = Error::ERROR;
            interruption_ = Interruption::ERROR;
            format_string(&error_.msg, "UNKNOWN_ERROR! %s", ID, serverErrorText().c_str());
            DBGLOG(LOGHDR ".finish(): %s", ID, error_.msg.c_str());
            cleanup();
            throw;
        }
    }
    else {
        cleanup(); // Just in case
    }
}