IterationResult Session::process()

in src/tcp-proxy.cpp [612:786]


/**
 * Runs one relay iteration for this session.
 *
 * Steps, in order:
 *  1. For every side whose error flag (chrwe[side][ER]) is set on a valid
 *     socket, logs the error and kills the connection if it is not temporary.
 *  2. For every side flagged readable (chrwe[side][RD]), receives into that
 *     side's buffer.
 *  3. For every side with unread buffered data, forwards it to the opposite
 *     socket, subject to two hold-back rules:
 *       - HOST data is not forwarded until at least nBytesToPreload bytes were
 *         written into the host buffer (or the host socket is closed);
 *       - the first CLIENT data, if it starts with "POST /tb/xml HTTP", is not
 *         forwarded until the whole header and the Content-Length body have
 *         arrived; the body then decides whether preloading stays enabled.
 *  4. Closes a socket whose peer side is already closed and has nothing left
 *     to deliver.
 *  5. When both sockets are closed, logs statistics and finishes.
 *
 * @return IR::FINISHED when both sockets are closed; otherwise IR::SUCCESS if
 *         a recv/send/close was attempted in this iteration, IR::FAILURE if a
 *         socket error was flagged and no I/O followed, IR::IDLE if nothing
 *         happened.
 */
IterationResult Session::process()
{
    IterationResult retVal = IR::IDLE;

    // Handle sockets flagged with an error condition
    forn (iSide, 2) {
        if (chrwe[iSide][ER] && INVALID_SOCKET != sock[iSide]) {
            int er = socket_last_error();
            bool temperror = 0 != socket_error_is_temporary(er);
            err("%s: %s error in %s-side socket: %d", name.c_str(), temperror ? "Recoverable" : "Unrecoverable", sideNames[iSide], er);
            if (!temperror) {
                kill_connection(iSide, -9000);
            }
            // May still be overwritten by IR::SUCCESS if any I/O is attempted below
            retVal = IR::FAILURE;
        }
    }

    // TODO: recv/send flags

    // Read incoming data
    forn (iSide, 2) {
        if (chrwe[iSide][RD]) {
            Buffer &buffer = buf[iSide];
            if (HOST == iSide && isPreloadFinished && buffer.nBytesRead() < nBytesToPreload) {
                // Do not read any more data until all preloaded data is sent to client
                assert(buffer.nBytesWritten() >= nBytesToPreload);
                continue;
            }
            retVal = IR::SUCCESS;
            // NOTE(review): if buffer.writeSize() is 0 this recv() returns 0 and the
            // connection is killed as if the peer had disconnected - confirm intended.
            int result = recv(sock[iSide], buffer.writePtr(), (int)buffer.writeSize(), 0);
            if (result <= 0) {
                // Error or disconnected
                if (0 == result || !socket_error_is_temporary(socket_last_error())) {
                    kill_connection(iSide, result);
                }
            }
            else {
                uint64 t = time_ns();
                if (!buffer.maxSize()) {
                    // maxSize() is still zero, so this is taken as the first data from this side
                    tData1stReceivedFrom[iSide] = t;
                }
                tDataLastReceivedFrom[iSide] = t;
                buffer.commitWrite(result);
                //nBytesRecvFrom[iSide] += result;
            }
        }
    }

    // Forward buffered data of each side to the opposite side
    forn(iSide, 2) {
        Buffer &buffer = buf[iSide];
        if (0 != buffer.nBytesUnread()) {
            if (sock_opened(iSide ^ 1)) {
                uint64 t = time_ns();
                if (HOST == iSide) {
                    if (!isPreloadFinished) {
                        if (buffer.nBytesWritten() < nBytesToPreload && sock_opened(iSide)) {
                            // Do not send any data to client if preload is not yet finished
                            continue;
                        }
                        isPreloadFinished = true;
                        tPreloadFinished = tData1stSentFrom[iSide] = t;
                    }
                    else {
                        if (!buffer.nBytesRead()) {
                            tData1stSentFrom[iSide] = t;
                        }
                    }
                }
                else {
                    // Client
                    if (0 == buffer.nBytesRead()) {
                        // Nothing relayed from the client yet: inspect the request first
                        char * p = (char *)buffer.readPtr();
                        if (sock_opened(iSide) && 0 == memcmp(p, "POST /tb/xml HTTP", std::min((size_t)17, (size_t)buffer.nBytesWritten()))) {
                            // Number of leading bytes of matchString matched so far
                            uintptr match = 0;
                            static const char matchString[] = CRLFCRLF;
                            forn(i, buffer.nBytesWritten()) {
                                char c = p[i];
                                if (matchString[match] != c) {
                                    // Restart the match; the mismatching byte may itself be
                                    // the first byte of the terminator (e.g. "\r\r\n\r\n")
                                    match = (matchString[0] == c) ? 1 : 0;
                                }
                                else if (4 == ++match) {
                                    // Got complete header, analyse it!
                                    // NOTE(review): assumes writePtr() points right behind the
                                    // received data and that one more byte is writable there
                                    // even when the buffer is full - confirm against Buffer.
                                    *buffer.writePtr() = '\0'; // terminate
                                    const char * cl = strstr(p, "Content-Length:");
                                    if (NULL == cl) {
                                        goto not_select_request;
                                    }

                                    // The header occupies bytes [0, i], so the body starts at i + 1.
                                    // Compare by subtraction so that a huge Content-Length cannot wrap.
                                    const size_t headerSize = (size_t)i + 1;
                                    const size_t bodyAvailable = (size_t)buffer.nBytesWritten() - headerSize;
                                    const unsigned long contentLength = strtoul(cl + 15, NULL, 10);
                                    if (bodyAvailable < contentLength) {
                                        // Body is not complete yet: stop scanning and wait for more data
                                        break;
                                    }

                                    // Got header and body

                                    if (NULL == strstr(p + i, "<select version")
                                        || NULL != strstr(p + i, "<live>true</live>")) {
                                        // Not preloading non-select requests and live select requests
                                        nBytesToPreload = 0;
                                    }
                                    else {
                                        iprintf("%s - Detected select request, will preload\n", name.c_str());
                                    }
                                    // Finally send to the recipient
                                    goto perform_send;
                                }
                            }
                            // HTTP request header or body is not yet complete (and connection not yet closed), so do not relay the data for now
                            continue;
                        }
                    not_select_request:
                        nBytesToPreload = 0;

                    perform_send:
                        if (!buffer.nBytesRead()) {
                            tData1stSentFrom[iSide] = t;
                        }
                    }
                }

                retVal = IR::SUCCESS;
                int result = send(sock[iSide ^ 1], buffer.readPtr(), (int)buffer.readSize(), 0);
                if (result <= 0) {
                    // Error or disconnected
                    if (!socket_error_is_temporary(socket_last_error())) {
                        kill_connection(iSide ^ 1, result);
                    }
                }
                else {
                    buffer.commitRead(result);
                    tDataLastSentFrom[iSide] = t;
                    //nBytesSentFrom[iSide] += result;
                }

                if (0 == buffer.nBytesUnread()) {
                    // TODO: check that when preloaded data is completely sent, the buffer read/write pointers are reset
                    buffer.reset();
                }
            }
            else {
                // If "send" connection is closed, discard all unsent data (but the buffer will remember the amount of that data)
                buffer.reset();
            }
        }
    }

    forn(iSide, 2) {
        // If this side is closed, opposite is opened, but we have nothing to send, close opposite
        if (!sock_opened(iSide) && sock_opened(iSide ^ 1) && 0 == buf[iSide].nBytesUnread()) {
            retVal = IR::SUCCESS;
            kill_connection(iSide ^ 1, -1);
        }
    }

    if (!sock_opened(HOST) && !sock_opened(CLIENT)) {
        // Index by side constant so each count is reported with the matching sideNames[] entry
        uint64_t discarded[2];
        discarded[CLIENT] = buf[CLIENT].nBytesUnreadTotal();
        discarded[HOST] = buf[HOST].nBytesUnreadTotal();
        tEnd = time_ns();
        forn(iSide, 2) if (0 != discarded[iSide]) {
            err("%s discarded %llu bytes from %s due to broken connection", name.c_str(), (ulonglong)discarded[iSide], sideNames[iSide]);
        }

        if (0 == buf[CLIENT].maxSize() && 0 == buf[HOST].maxSize()) {
            iprintf("%s finished in %lfms, nothing sent or received\n", name.c_str(), 1E-6 * (tEnd - tStart));
        }
        else {
            iprintf("%s finished in %lfms,\n\t", name.c_str(), 1E-6 * (tEnd - tStart/*tData1stReceivedFrom[CLIENT]*/));
            print_buffer_stats(CLIENT);
            iprintf(",\n\t");
            print_buffer_stats(HOST);
            iprintf("\n");
        }
        return IR::FINISHED;
    }

    return retVal;
}