in src/tcp-proxy.cpp [612:786]
IterationResult Session::process()
{
IterationResult retVal = IR::IDLE;
//bool error = false;
forn (iSide, 2) {
if (chrwe[iSide][ER] && INVALID_SOCKET != sock[iSide]) {
int er = socket_last_error();
bool temperror = 0 != socket_error_is_temporary(er);
err("%s: %s error in %s-side socket: %d", name.c_str(), temperror ? "Recoverable" : "Unrecoverable", sideNames[iSide], er);
if (!temperror) {
kill_connection(iSide, -9000);
}
retVal = IR::FAILURE;
}
}
// TODO: recv/send flags
// Read incoming data
forn (iSide, 2) {
if (chrwe[iSide][RD]) {
Buffer &buffer = buf[iSide];
if (HOST == iSide && isPreloadFinished && buffer.nBytesRead() < nBytesToPreload) {
// Do not read any more data until all preloaded data is sent to client
assert(buffer.nBytesWritten() >= nBytesToPreload);
continue;
}
retVal = IR::SUCCESS;
int result = recv(sock[iSide], buffer.writePtr(), (int)buffer.writeSize(), 0);
if (result <= 0) {
// Error or disconnected
if (0 == result || !socket_error_is_temporary(socket_last_error())) {
kill_connection(iSide, result);
}
}
else {
uint64 t = time_ns();
if (!buffer.maxSize()) {
tData1stReceivedFrom[iSide] = t;
}
tDataLastReceivedFrom[iSide] = t;
buffer.commitWrite(result);
//nBytesRecvFrom[iSide] += result;
}
}
}
forn(iSide, 2) {
Buffer &buffer = buf[iSide];
if (0 != buffer.nBytesUnread()) {
if (sock_opened(iSide ^ 1)) {
uint64 t = time_ns();
if (HOST == iSide) {
if (!isPreloadFinished) {
if (buffer.nBytesWritten() < nBytesToPreload && sock_opened(iSide)) {
// Do not send any data to client if preload is not yet finished
continue;
}
isPreloadFinished = true;
tPreloadFinished = tData1stSentFrom[iSide] = t;
}
else {
if (!buffer.nBytesRead()) {
tData1stSentFrom[iSide] = t;
}
}
}
else {
// Client
if (0 == buffer.nBytesRead()) {
char * p = (char *)buffer.readPtr();
if (sock_opened(iSide) && 0 == memcmp(p, "POST /tb/xml HTTP", std::min((size_t)17, (size_t)buffer.nBytesWritten()))) {
uintptr match = 0;
static const char matchString[] = CRLFCRLF;
forn(i, buffer.nBytesWritten()) {
char c = p[i];
if (matchString[match++] != c) {
match = 0;
}
else if (4 == match) {
// Got complete header, analyse it!
*buffer.writePtr() = '\0'; // terminate
const char * cl;
if (NULL == (cl = strstr(p, "Content-Length:"))) {
goto not_select_request;
}
if (buffer.nBytesWritten() < (unsigned)i + strtoul(cl + 15, NULL, 10)) {
continue;
}
// Got header and body
if (NULL == strstr(p + i, "<select version")
|| NULL != strstr(p + i, "<live>true</live>")) {
// Not preloading non-select requests and live select requests
nBytesToPreload = 0;
}
else {
iprintf("%s - Detected select request, will preload\n", name.c_str());
}
// Finally send to the recipient
goto perform_send;
}
}
// HTTP request header is not yet finished (and connection not yet closed), so do not relay the data for now
continue;
}
not_select_request:
nBytesToPreload = 0;
perform_send:
if (!buffer.nBytesRead()) {
tData1stSentFrom[iSide] = t;
}
}
}
retVal = IR::SUCCESS;
int result = send(sock[iSide ^ 1], buffer.readPtr(), (int)buffer.readSize(), 0);
if (result <= 0) {
// Error or disconnected
if (!socket_error_is_temporary(socket_last_error())) {
kill_connection(iSide ^ 1, result);
}
}
else {
buffer.commitRead(result);
tDataLastSentFrom[iSide] = t;
//nBytesSentFrom[iSide] += result;
}
if (0 == buffer.nBytesUnread()) {
// TODO: check that when preloaded data is completely sent, the buffer read/write pointers are reset
buffer.reset();
}
}
else {
// If "send" connection is closed, discard all unsent data (but the buffer will remember the amount of that data)
buffer.reset();
}
}
}
forn(iSide, 2) {
// If this side is closed, opposite is opened, but we have nothing to send, close opposite
if (!sock_opened(iSide) && sock_opened(iSide ^ 1) && 0 == buf[iSide].nBytesUnread()) {
retVal = IR::SUCCESS;
kill_connection(iSide ^ 1, -1);
}
}
if (!sock_opened(HOST) && !sock_opened(CLIENT)) {
uint64_t discarded[2] = { buf[CLIENT].nBytesUnreadTotal(), buf[HOST].nBytesUnreadTotal() };
tEnd = time_ns();
forn(iSide, 2) if (0 != discarded[iSide]) {
err("%s discarded %llu bytes from %s due to broken connection", name.c_str(), (ulonglong)discarded[iSide], sideNames[iSide]);
}
if (0 == buf[CLIENT].maxSize() && 0 == buf[HOST].maxSize()) {
iprintf("%s finished in %lfms, nothing sent or received\n", name.c_str(), 1E-6 * (tEnd - tStart));
}
else {
iprintf("%s finished in %lfms,\n\t", name.c_str(), 1E-6 * (tEnd - tStart/*tData1stReceivedFrom[CLIENT]*/));
print_buffer_stats(CLIENT);
iprintf(",\n\t");
print_buffer_stats(HOST);
iprintf("\n");
}
return IR::FINISHED;
}
return retVal;
}