legacy/src/server/slimrds/data/process.c (133 lines of code) (raw):
#include "process.h"
#include "protocol/data/resp_include.h"
#include <buffer/cc_dbuf.h>
#include <cc_debug.h>
#include <cc_print.h>
#define SLIMRDS_PROCESS_MODULE_NAME "slimrds::process"
#define OVERSIZE_ERR_MSG "oversized value, cannot be stored"
#define OOM_ERR_MSG "server is out of memory"
#define CMD_ERR_MSG "command not supported"
#define OTHER_ERR_MSG "unknown server error"
typedef void (* command_fn)(struct response *, struct request *, struct command *cmd);
static command_fn command_registry[REQ_SENTINEL];
static bool process_init = false;
process_metrics_st *process_metrics = NULL;
void
process_setup(process_options_st *options, process_metrics_st *metrics)
{
log_info("set up the %s module", SLIMRDS_PROCESS_MODULE_NAME);
if (process_init) {
log_warn("%s has already been setup, overwrite",
SLIMRDS_PROCESS_MODULE_NAME);
}
process_metrics = metrics;
if (options != NULL) {
allow_flush = option_bool(&options->allow_flush);
}
command_registry[REQ_PING] = cmd_ping;
command_registry[REQ_BITMAP_DELETE] = cmd_bitmap_delete;
command_registry[REQ_BITMAP_CREATE] = cmd_bitmap_create;
command_registry[REQ_BITMAP_SET] = cmd_bitmap_set;
command_registry[REQ_BITMAP_GET] = cmd_bitmap_get;
process_init = true;
}
void
process_teardown(void)
{
log_info("tear down the %s module", SLIMRDS_PROCESS_MODULE_NAME);
if (!process_init) {
log_warn("%s has never been setup", SLIMRDS_PROCESS_MODULE_NAME);
}
command_registry[REQ_PING] = cmd_ping;
allow_flush = ALLOW_FLUSH;
process_metrics = NULL;
process_init = false;
}
void
process_request(struct response *rsp, struct request *req)
{
struct command cmd;
command_fn func = command_registry[req->type];
log_verb("processing req %p, write rsp to %p", req, rsp);
INCR(process_metrics, process_req);
if (func == NULL) {
struct element *reply = (struct element *)array_push(rsp->token);
log_warn("command is recognized but not implemented");
rsp->type = reply->type = ELEM_ERR;
reply->bstr = str2bstr(RSP_ERR_NOSUPPORT);
INCR(process_metrics, process_ex);
return;
}
cmd = command_table[req->type];
cmd.nopt = ((struct element *)array_first(req->token))->num - cmd.narg;
log_verb("processing command '%.*s' with %d optional arguments",
cmd.bstr.len, cmd.bstr.data, cmd.nopt);
func(rsp, req, &cmd);
}
int
slimrds_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
{
parse_rstatus_e status;
struct request *req; /* data should be NULL or hold a req pointer */
struct response *rsp;
req = request_borrow();
rsp = response_borrow();
if (req == NULL || rsp == NULL) {
goto error;
}
/* keep parse-process-compose until running out of data in rbuf */
while (buf_rsize(*rbuf) > 0) {
request_reset(req);
response_reset(rsp);
/* stage 1: parsing */
log_verb("%"PRIu32" bytes left", buf_rsize(*rbuf));
status = parse_req(req, *rbuf);
if (status == PARSE_EUNFIN) {
buf_lshift(*rbuf);
goto done;
}
if (status != PARSE_OK) {
/* parsing errors are all client errors, since we don't know
* how to recover from client errors in this condition (we do not
* have a valid request so we don't know where the invalid request
* ends), we should close the connection
*/
log_warn("illegal request received, status: %d", status);
INCR(process_metrics, process_ex);
INCR(process_metrics, process_client_ex);
goto error;
}
/* stage 2: processing- check for quit, allocate response(s), process */
/* quit is special, no response expected */
if (req->type == REQ_QUIT) {
log_info("peer called quit");
goto error;
}
/* actual processing */
process_request(rsp, req);
/* stage 3: write response(s) if necessary */
/* noreply means no need to write to buffers */
if (compose_rsp(wbuf, rsp) < 0) {
log_error("composing rsp erred");
INCR(process_metrics, process_ex);
INCR(process_metrics, process_server_ex);
goto error;
}
/* logging, clean-up */
}
done:
request_return(&req);
response_return(&rsp);
return 0;
error:
request_return(&req);
response_return(&rsp);
return -1;
}
int
slimrds_process_write(struct buf **rbuf, struct buf **wbuf, void **data)
{
log_verb("post-write processing");
buf_lshift(*rbuf);
dbuf_shrink(rbuf);
buf_lshift(*wbuf);
dbuf_shrink(wbuf);
return 0;
}
int
slimrds_process_error(struct buf **rbuf, struct buf **wbuf, void **data)
{
log_verb("post-error processing");
/* normalize buffer size */
buf_reset(*rbuf);
dbuf_shrink(rbuf);
buf_reset(*wbuf);
dbuf_shrink(wbuf);
return 0;
}