legacy/src/server/twemcache/data/process.c (692 lines of code) (raw):

#include "process.h"

#include "hotkey/hotkey.h"
#include "protocol/data/memcache_include.h"
#include "storage/slab/slab.h"

#include <cc_array.h>
#include <cc_debug.h>
#include <cc_print.h>
#include <time/cc_timer.h>

#define TWEMCACHE_PROCESS_MODULE_NAME "twemcache::process"

/* message bodies attached to CLIENT_ERROR / SERVER_ERROR responses */
#define OVERSIZE_ERR_MSG    "oversized value, cannot be stored"
#define DELTA_ERR_MSG       "value is not a number"
#define OOM_ERR_MSG         "server is out of memory"
#define CMD_ERR_MSG         "command not supported"
#define OTHER_ERR_MSG       "unknown server error"

/* outcome of storing one segment of a set/add/replace/cas value, see _put() */
typedef enum put_rstatus {
    PUT_OK,
    PUT_PARTIAL,
    PUT_ERROR,
} put_rstatus_e;

static bool process_init = false;
static process_metrics_st *process_metrics = NULL;
static bool allow_flush = ALLOW_FLUSH;
static bool prefill = PREFILL;
static uint32_t prefill_ksize;
static char prefill_kbuf[UINT8_MAX]; /* slab implementation has klen as uint8_t */
static uint32_t prefill_vsize;
/* val_buf size is arbitrary, update if want to warm up with larger objects */
static char prefill_vbuf[ITEM_SIZE_MAX];
static uint64_t prefill_nkey;

/*
 * Insert prefill_nkey items whose key and value are both the zero-padded
 * decimal rendering of the loop counter (padded to prefill_ksize and
 * prefill_vsize characters respectively), then log how long it took.
 *
 * The configured sizes are checked against the static scratch buffers first:
 * formatting writes len + 1 bytes (the digits plus a terminating NUL), so a
 * length equal to or larger than the buffer would overrun it.
 */
static void
_prefill_slab(void)
{
    struct duration d;
    struct bstring key, val;
    item_rstatus_e istatus;
    struct item *it;
    uint64_t i; /* same width as prefill_nkey so the loop always terminates */

    if (prefill_ksize >= sizeof(prefill_kbuf) ||
            prefill_vsize >= sizeof(prefill_vbuf)) {
        log_error("prefill skipped: key len %"PRIu32" or val len %"PRIu32
                " does not fit the prefill buffers", prefill_ksize,
                prefill_vsize);
        return;
    }

    duration_reset(&d);
    key.len = prefill_ksize;
    key.data = prefill_kbuf;
    val.len = prefill_vsize;
    val.data = prefill_vbuf;

    duration_start(&d);
    for (i = 0; i < prefill_nkey; ++i) {
        /* print fixed-length key with leading 0's for padding */
        cc_snprintf(prefill_kbuf, key.len + 1, "%.*"PRIu64, (int)key.len, i);
        /* fill val, use the same value as key for now */
        cc_snprintf(prefill_vbuf, val.len + 1, "%.*"PRIu64, (int)val.len, i);

        /* insert into slab/heap */
        istatus = item_reserve(&it, &key, &val, val.len, DATAFLAG_SIZE,
                time_convert_proc_sec((time_i)INT32_MAX));
        ASSERT(istatus == ITEM_OK);
        if (istatus != ITEM_OK) {
            /* reached only when ASSERT is compiled out: do not insert */
            log_error("prefill stopped after %"PRIu64" keys: item_reserve "
                    "returned %d", i, (int)istatus);
            break;
        }
        item_insert(it, &key);
    }
    duration_stop(&d);

    /* i is the number of keys actually inserted */
    log_info("prefilling slab with %"PRIu64" keys, of key len %"PRIu32" & val "
            "len %"PRIu32", in %.3f seconds", i, prefill_ksize,
            prefill_vsize, duration_sec(&d));
}

/*
 * Set up the module: remember the metrics pointer, read the flush/prefill
 * settings from options (when options is not NULL), and prefill the slab if
 * prefill is enabled. Calling it again only logs a warning and overwrites.
 */
void
process_setup(process_options_st *options, process_metrics_st *metrics)
{
    log_info("set up the %s module", TWEMCACHE_PROCESS_MODULE_NAME);

    if (process_init) {
        log_warn("%s has already been setup, overwrite",
                TWEMCACHE_PROCESS_MODULE_NAME);
    }

    process_metrics = metrics;

    if (options != NULL) {
        allow_flush = option_bool(&options->allow_flush);
        prefill = option_bool(&options->prefill);
        prefill_ksize = (uint32_t)option_uint(&options->prefill_ksize);
        prefill_vsize = (uint32_t)option_uint(&options->prefill_vsize);
        prefill_nkey = (uint64_t)option_uint(&options->prefill_nkey);
    }

    if (prefill) {
        _prefill_slab();
    }

    process_init = true;
}

/*
 * Tear down the module: disable flushing, drop the metrics pointer and mark
 * the module as not initialized. Warns if setup never happened.
 */
void
process_teardown(void)
{
    log_info("tear down the %s module", TWEMCACHE_PROCESS_MODULE_NAME);

    if (!process_init) {
        log_warn("%s has never been setup", TWEMCACHE_PROCESS_MODULE_NAME);
    }

    allow_flush = false;
    process_metrics = NULL;
    process_init = false;
}

/* read the 32-bit data flag kept in the item's optional area */
static inline uint32_t
_get_dataflag(struct item *it)
{
    return *((uint32_t *)item_optional(it));
}

/* write the 32-bit data flag into the item's optional area */
static inline void
_set_dataflag(struct item *it, uint32_t flag)
{
    *((uint32_t *)item_optional(it)) = flag;
}

/*
 * Look up key; on a hit fill rsp as a VALUE response (key, flag, cas value,
 * value bytes pointing into the item) and return true, otherwise return false
 * and leave rsp untouched. Also feeds the key to the hotkey sampler when
 * hotkey detection is enabled.
 */
static bool
_get_key(struct response *rsp, struct bstring *key)
{
    struct item *it;

    it = item_get(key);
    if (it != NULL) {
        rsp->type = RSP_VALUE;
        rsp->key = *key;
        rsp->flag = _get_dataflag(it);
        rsp->vcas = item_get_cas(it);
        rsp->vstr.len = it->vlen;
        rsp->vstr.data = item_data(it);

        if (hotkey_enabled && hotkey_sample(key)) {
            log_debug("hotkey detected: %.*s", key->len, key->data);
        }

        log_verb("found key at %p, location %p", key, it);

        return true;
    } else {
        log_verb("key at %p not found", key);

        return false;
    }
}

/*
 * Serve a get: walk the requested keys, fill one chained response per hit
 * (without cas), and terminate the chain with an END response. Returns early,
 * without the END response, if the response chain runs out.
 */
static void
_process_get(struct response *rsp, struct request *req)
{
    struct bstring *key;
    struct response *r = rsp;
    uint32_t i;

    INCR(process_metrics, get);
    /* use chained responses, move to the next response if key is found. */
    for (i = 0; i < array_nelem(req->keys); ++i) {
        INCR(process_metrics, get_key);
        key = array_get(req->keys, i);
        if (_get_key(r, key)) {
            req->nfound++;
            r->cas = false;
            r = STAILQ_NEXT(r, next);
            if (r == NULL) {
                INCR(process_metrics, get_ex);
                log_warn("get response incomplete due to lack of rsp objects");
                return;
            }
            INCR(process_metrics, get_key_hit);
        } else {
            INCR(process_metrics, get_key_miss);
        }
    }
    r->type = RSP_END;

    log_verb("get req %p processed, %d out of %d keys found", req, req->nfound, i);
}

/*
 * Serve a gets: same as get, but every VALUE response carries the cas value.
 * Like _process_get, it stops as soon as the response chain runs out; going on
 * would dereference a NULL response on the next key or on the END marker.
 */
static void
_process_gets(struct response *rsp, struct request *req)
{
    struct bstring *key;
    struct response *r = rsp;
    uint32_t i;

    INCR(process_metrics, gets);
    /* use chained responses, move to the next response if key is found. */
    for (i = 0; i < array_nelem(req->keys); ++i) {
        INCR(process_metrics, gets_key);
        key = array_get(req->keys, i);
        if (_get_key(r, key)) {
            r->cas = true;
            req->nfound++;
            INCR(process_metrics, gets_key_hit);
            r = STAILQ_NEXT(r, next);
            if (r == NULL) {
                INCR(process_metrics, gets_ex);
                log_warn("gets response incomplete due to lack of rsp objects");
                return;
            }
        } else {
            INCR(process_metrics, gets_key_miss);
        }
    }
    r->type = RSP_END;

    log_verb("gets req %p processed, %d out of %d keys found", req, req->nfound, i);
}

/* Serve a delete of the first key: DELETED on success, NOT_FOUND otherwise. */
static void
_process_delete(struct response *rsp, struct request *req)
{
    INCR(process_metrics, delete);
    if (item_delete(array_first(req->keys))) {
        rsp->type = RSP_DELETED;
        INCR(process_metrics, delete_deleted);
    } else {
        rsp->type = RSP_NOT_FOUND;
        INCR(process_metrics, delete_notfound);
    }

    log_verb("delete req %p processed, rsp type %d", req, rsp->type);
}

/*
 * Translate an item-layer error status into an error response:
 *   ITEM_EOVERSIZED, ITEM_ENAN -> CLIENT_ERROR
 *   ITEM_ENOMEM                -> SERVER_ERROR
 * Any other status is not expected here (NOT_REACHED) and is reported as a
 * generic SERVER_ERROR.
 */
static void
_error_rsp(struct response *rsp, item_rstatus_e status)
{
    INCR(process_metrics, process_ex);

    if (status == ITEM_EOVERSIZED) {
        rsp->type = RSP_CLIENT_ERROR;
        rsp->vstr = str2bstr(OVERSIZE_ERR_MSG);
    } else if (status == ITEM_ENAN) {
        rsp->type = RSP_CLIENT_ERROR;
        rsp->vstr = str2bstr(DELTA_ERR_MSG);
    } else if (status == ITEM_ENOMEM) {
        rsp->type = RSP_SERVER_ERROR;
        rsp->vstr = str2bstr(OOM_ERR_MSG);
        INCR(process_metrics, process_server_ex);
    } else {
        NOT_REACHED();
        rsp->type = RSP_SERVER_ERROR;
        rsp->vstr = str2bstr(OTHER_ERR_MSG);
        INCR(process_metrics, process_server_ex);
    }
}

/*
 * Store the value segment currently held by req. The first segment reserves
 * an item (remembered in req->reserved); later segments are backfilled into
 * that reserved item. The data flag is written once the value is complete.
 * On error the request is marked to be swallowed and flagged as a server
 * error.
 *
 * for the first segment three return values are possible:
 *  - PUT_OK
 *  - PUT_PARTIAL
 *  - PUT_ERROR (error code given in *istatus)
 *
 * for the following segment(s) two return values are possible:
 *  - PUT_OK
 *  - PUT_PARTIAL
 */
static put_rstatus_e
_put(item_rstatus_e *istatus, struct request *req)
{
    put_rstatus_e status;
    struct item *it = NULL;

    *istatus = ITEM_OK;
    if (req->first) { /* self-contained req */
        struct bstring *key = array_first(req->keys);

        *istatus = item_reserve(&it, key, &req->vstr, req->vlen, DATAFLAG_SIZE,
                time_convert_proc_sec((time_i)req->expiry));
        req->first = false;
        req->reserved = it;
    } else { /* backfill reserved item */
        it = req->reserved;
        item_backfill(it, &req->vstr);
    }

    if (!req->partial) {
        status = (*istatus == ITEM_OK) ? PUT_OK : PUT_ERROR;
    } else { /* should not update hash */
        status = (*istatus == ITEM_OK) ? PUT_PARTIAL : PUT_ERROR;
    }

    if (status == PUT_ERROR) {
        req->swallow = true;
        req->serror = true;
    }

    if (status == PUT_OK) { /* set flag when put is complete */
        _set_dataflag(it, req->flag);
    }

    return status;
}

/*
 * for set/add/replace/cas, we have to recover key from the reserved item,
 * because the keys field in the request are only valid for the first segment
 * of the request buffer. Once we move to later segments, the areas pointed to
 * by these pointers will be overwritten.
 */

/* Serve a set: once the value is complete, insert the reserved item. */
static void
_process_set(struct response *rsp, struct request *req)
{
    put_rstatus_e status;
    item_rstatus_e istatus;
    struct item *it;
    struct bstring key;

    status = _put(&istatus, req);
    if (status == PUT_PARTIAL) {
        return;
    }
    if (status == PUT_ERROR) {
        _error_rsp(rsp, istatus);
        INCR(process_metrics, set_ex);

        return;
    }

    /* PUT_OK, meaning we have an item reserved, i.e. req->reserved != NULL */
    INCR(process_metrics, set);
    it = (struct item *)req->reserved;
    key = (struct bstring){it->klen, item_key(it)};
    item_insert(it, &key);
    rsp->type = RSP_STORED;
    INCR(process_metrics, set_stored);

    log_verb("set req %p processed, rsp type %d", req, rsp->type);
}

/*
 * Serve an add: insert the reserved item only when the key is not already
 * present; otherwise release the reservation and answer NOT_STORED.
 */
static void
_process_add(struct response *rsp, struct request *req)
{
    put_rstatus_e status;
    item_rstatus_e istatus;
    struct item *it;
    struct bstring key;

    status = _put(&istatus, req);
    if (status == PUT_PARTIAL) {
        return;
    }
    if (status == PUT_ERROR) {
        _error_rsp(rsp, istatus);
        INCR(process_metrics, add_ex);

        return;
    }

    /* PUT_OK, meaning we have an item reserved, i.e. req->reserved != NULL */
    INCR(process_metrics, add);
    it = (struct item *)req->reserved;
    key = (struct bstring){it->klen, item_key(it)};
    if (item_get(&key) != NULL) {
        item_release((struct item **)&req->reserved);
        rsp->type = RSP_NOT_STORED;
        INCR(process_metrics, add_notstored);
    } else {
        item_insert(it, &key);
        rsp->type = RSP_STORED;
        INCR(process_metrics, add_stored);
    }

    log_verb("add req %p processed, rsp type %d", req, rsp->type);
}

/*
 * Serve a replace: insert the reserved item only when the key is already
 * present; otherwise release the reservation and answer NOT_STORED.
 */
static void
_process_replace(struct response *rsp, struct request *req)
{
    put_rstatus_e status;
    item_rstatus_e istatus;
    struct item *it = NULL;
    struct bstring key;

    status = _put(&istatus, req);
    if (status == PUT_PARTIAL) {
        return;
    }
    if (status == PUT_ERROR) {
        _error_rsp(rsp, istatus);
        INCR(process_metrics, replace_ex);

        return;
    }

    /* PUT_OK, meaning we have an item reserved, i.e. req->reserved != NULL */
    INCR(process_metrics, replace);
    it = (struct item *)req->reserved;
    key = (struct bstring){it->klen, item_key(it)};
    if (item_get(&key) != NULL) {
        item_insert(it, &key);
        rsp->type = RSP_STORED;
        INCR(process_metrics, replace_stored);
    } else {
        item_release((struct item **)&req->reserved);
        rsp->type = RSP_NOT_STORED;
        INCR(process_metrics, replace_notstored);
    }

    log_verb("replace req %p processed, rsp type %d", req, rsp->type);
}

/*
 * Serve a cas: insert the reserved item only when the key exists and its
 * current cas value equals req->vcas. A missing key yields NOT_FOUND, a cas
 * mismatch yields EXISTS; in both cases the reservation is released.
 */
static void
_process_cas(struct response *rsp, struct request *req)
{
    put_rstatus_e status;
    item_rstatus_e istatus;
    struct item *it, *oit;
    struct bstring key;

    status = _put(&istatus, req);
    if (status == PUT_PARTIAL) {
        return;
    }
    if (status == PUT_ERROR) {
        _error_rsp(rsp, istatus);
        INCR(process_metrics, cas_ex);

        return;
    }

    /* PUT_OK, meaning we have an item reserved, i.e. req->reserved != NULL */
    it = (struct item *)req->reserved;
    key = (struct bstring){it->klen, item_key(it)};
    oit = item_get(&key);
    if (oit == NULL) {
        item_release((struct item **)&req->reserved);
        rsp->type = RSP_NOT_FOUND;
        INCR(process_metrics, cas_notfound);
    } else {
        if (item_get_cas(oit) != req->vcas) {
            item_release((struct item **)&req->reserved);
            rsp->type = RSP_EXISTS;
            INCR(process_metrics, cas_exists);
        } else {
            item_insert(it, &key);
            rsp->type = RSP_STORED;
            INCR(process_metrics, cas_stored);
        }
    }

    log_verb("cas req %p processed, rsp type %d", req, rsp->type);
}

/*
 * Shared body of incr/decr: get the integer value of it, apply req->delta and
 * update the item with the new integer value.
 *
 * Increment uses plain uint64_t addition; decrement stops at 0 instead of
 * going below it. The new value is reported in rsp->vint. If the new textual
 * value maps to the same slab id as the current item it is updated in place,
 * otherwise a new item is reserved (keeping the data flag and expiry of the
 * old one) and inserted under key.
 *
 * Returns ITEM_OK, or the status returned by item_atou64()/item_reserve().
 */
static item_rstatus_e
_process_delta(struct response *rsp, struct item *it, struct request *req,
        struct bstring *key, bool incr)
{
    item_rstatus_e status;
    uint32_t dataflag;
    uint64_t vint;
    struct bstring nval;
    char buf[CC_UINT64_MAXLEN];

    status = item_atou64(&vint, it);
    if (status != ITEM_OK) {
        return status;
    }

    if (incr) {
        vint += req->delta;
    } else {
        if (vint < req->delta) {
            vint = 0;
        } else {
            vint -= req->delta;
        }
    }

    rsp->vint = vint;
    nval.len = cc_print_uint64_unsafe(buf, vint);
    nval.data = buf;
    if (item_slabid(it->klen, nval.len, it->olen) == it->id) {
        item_update(it, &nval);
        return ITEM_OK;
    }

    /* flag and expiry are read from the old item before `it` is replaced */
    dataflag = _get_dataflag(it);
    status = item_reserve(&it, key, &nval, nval.len, DATAFLAG_SIZE,
            it->expire_at);
    if (status == ITEM_OK) {
        _set_dataflag(it, dataflag);
        item_insert(it, key);
    }

    return status;
}

/* Serve an incr on the first key: NUMERIC, NOT_FOUND, or an error response. */
static void
_process_incr(struct response *rsp, struct request *req)
{
    item_rstatus_e status;
    struct bstring *key;
    struct item *it;

    INCR(process_metrics, incr);
    key = array_first(req->keys);
    it = item_get(key);
    if (it != NULL) {
        status = _process_delta(rsp, it, req, key, true);
        if (status == ITEM_OK) {
            rsp->type = RSP_NUMERIC;
            INCR(process_metrics, incr_stored);
        } else { /* not a number */
            _error_rsp(rsp, status);
            INCR(process_metrics, incr_ex);
        }
    } else {
        rsp->type = RSP_NOT_FOUND;
        INCR(process_metrics, incr_notfound);
    }

    log_verb("incr req %p processed, rsp type %d", req, rsp->type);
}

/* Serve a decr on the first key: NUMERIC, NOT_FOUND, or an error response. */
static void
_process_decr(struct response *rsp, struct request *req)
{
    item_rstatus_e status;
    struct bstring *key;
    struct item *it;

    INCR(process_metrics, decr);
    key = array_first(req->keys);
    it = item_get(key);
    if (it != NULL) {
        status = _process_delta(rsp, it, req, key, false);
        if (status == ITEM_OK) {
            rsp->type = RSP_NUMERIC;
            INCR(process_metrics, decr_stored);
        } else { /* not a number */
            _error_rsp(rsp, status);
            INCR(process_metrics, decr_ex);
        }
    } else {
        rsp->type = RSP_NOT_FOUND;
        INCR(process_metrics, decr_notfound);
    }

    log_verb("decr req %p processed, rsp type %d", req, rsp->type);
}

/*
 * Serve an append: annex req->vstr to the end of the existing item. A missing
 * key yields NOT_STORED; a request whose value did not arrive in one piece is
 * rejected as oversized.
 */
static void
_process_append(struct response *rsp, struct request *req)
{
    item_rstatus_e status;
    struct bstring *key;
    struct item *it;

    key = array_first(req->keys);
    it = item_get(key);
    if (it == NULL) {
        rsp->type = RSP_NOT_STORED;
        INCR(process_metrics, append_notstored);
    } else {
        if (req->partial) { /* reject incomplete append requests */
            status = ITEM_EOVERSIZED;
        } else {
            status = item_annex(it, key, &(req->vstr), true);
        }

        if (status == ITEM_OK) {
            rsp->type = RSP_STORED;
            INCR(process_metrics, append_stored);
        } else {
            _error_rsp(rsp, status);
            INCR(process_metrics, append_ex);
        }
    }

    log_verb("append req %p processed, rsp type %d", req, rsp->type);
}

/*
 * Serve a prepend: annex req->vstr to the front of the existing item. A
 * missing key yields NOT_STORED; a request whose value did not arrive in one
 * piece is rejected as oversized.
 */
static void
_process_prepend(struct response *rsp, struct request *req)
{
    item_rstatus_e status;
    struct bstring *key;
    struct item *it;

    key = array_first(req->keys);
    it = item_get(key);
    if (it == NULL) {
        rsp->type = RSP_NOT_STORED;
        INCR(process_metrics, prepend_notstored);
    } else {
        if (req->partial) { /* reject incomplete prepend requests */
            status = ITEM_EOVERSIZED;
        } else {
            status = item_annex(it, key, &(req->vstr), false);
        }

        if (status == ITEM_OK) {
            rsp->type = RSP_STORED;
            INCR(process_metrics, prepend_stored);
        } else {
            _error_rsp(rsp, status);
            INCR(process_metrics, prepend_ex);
        }
    }

    log_verb("prepend req %p processed, rsp type %d", req, rsp->type);
}

/*
 * Serve a flush: pass the first key of the request to item_expire() and
 * report its return value as a NUMERIC response.
 */
static void
_process_flush(struct response *rsp, struct request *req)
{
    INCR(process_metrics, flush);
    rsp->type = RSP_NUMERIC;
    rsp->vint = item_expire(array_first(req->keys));

    log_info("flush req %p processed, rsp type %d", req, rsp->type);
}

/*
 * Serve a flush_all: flush every item when allow_flush is set, otherwise
 * answer with a CLIENT_ERROR saying the command is not supported.
 */
static void
_process_flushall(struct response *rsp, struct request *req)
{
    if (allow_flush) {
        INCR(process_metrics, flushall);
        item_flush();
        rsp->type = RSP_OK;

        log_info("flush_all req %p processed, rsp type %d", req, rsp->type);
    } else {
        rsp->type = RSP_CLIENT_ERROR;
        rsp->vstr = str2bstr(CMD_ERR_MSG);
    }
}

/*
 * Dispatch a parsed request to its handler, which fills rsp (and, for
 * get/gets, the responses chained after it). Unknown request types get a
 * CLIENT_ERROR response.
 */
void
process_request(struct response *rsp, struct request *req)
{
    log_verb("processing req %p, write rsp to %p", req, rsp);
    INCR(process_metrics, process_req);

    switch (req->type) {
    case REQ_GET:
        _process_get(rsp, req);
        break;

    case REQ_GETS:
        _process_gets(rsp, req);
        break;

    case REQ_DELETE:
        _process_delete(rsp, req);
        break;

    case REQ_SET:
        _process_set(rsp, req);
        break;

    case REQ_ADD:
        _process_add(rsp, req);
        break;

    case REQ_REPLACE:
        _process_replace(rsp, req);
        break;

    case REQ_CAS:
        _process_cas(rsp, req);
        break;

    case REQ_INCR:
        _process_incr(rsp, req);
        break;

    case REQ_DECR:
        _process_decr(rsp, req);
        break;

    case REQ_APPEND:
        _process_append(rsp, req);
        break;

    case REQ_PREPEND:
        _process_prepend(rsp, req);
        break;

    case REQ_FLUSH:
        _process_flush(rsp, req);
        break;

    case REQ_FLUSHALL:
        _process_flushall(rsp, req);
        break;

    default:
        rsp->type = RSP_CLIENT_ERROR;
        rsp->vstr = str2bstr(CMD_ERR_MSG);
        break;
    }
}

/*
 * Get req/rsp ready for the next request: reset the request, return every
 * response chained after the first one, reset the first response and attach
 * it to the request again.
 */
static inline void
_cleanup(struct request *req, struct response *rsp)
{
    struct response *nr = STAILQ_NEXT(rsp, next);

    request_reset(req);
    /* return all but the first response */
    if (nr != NULL) {
        response_return_all(&nr);
    }

    response_reset(rsp);
    req->rsp = rsp;
}

/*
 * Post-read hook: parse, process and answer as many requests as rbuf holds.
 *
 * *data carries the in-flight request between calls (NULL on the first call);
 * the first response object is kept on req->rsp so that it is found again on
 * the next call and released together with the request on error.
 *
 * Returns 0 when all available data was consumed or more data is needed, and
 * -1 when an object could not be acquired, the request was illegal, composing
 * failed, or the peer sent quit.
 */
int
twemcache_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
{
    parse_rstatus_e status;
    struct request *req; /* data should be NULL or hold a req pointer */
    struct response *rsp;

    log_verb("post-read processing");

    /* deal with the stateful part: request and response */
    req = *data;
    if (req == NULL) {
        req = *data = request_borrow();
        if (req == NULL) {
            /* TODO(yao): simply return for now, better to respond with OOM */
            log_error("cannot process request: OOM");
            INCR(process_metrics, process_ex);

            return -1;
        }
    }

    rsp = (req->rsp != NULL) ? req->rsp : response_borrow();
    if (rsp == NULL) {
        request_return(&req);
        /* the request is gone, do not leave a stale pointer behind */
        *data = NULL;
        /* TODO(yao): simply return for now, better to respond with OOM */
        log_error("cannot process request: OOM");
        INCR(process_metrics, process_ex);

        return -1;
    }
    /*
     * attach right away: several paths below return before _cleanup() runs,
     * and a response that is not reachable from the request would be lost
     */
    req->rsp = rsp;

    /* keep parse-process-compose until running out of data in rbuf */
    while (buf_rsize(*rbuf) > 0) {
        struct response *nr;
        uint32_t nkey;
        int i, card;

        /* stage 1: parsing */
        log_verb("%"PRIu32" bytes left", buf_rsize(*rbuf));

        status = parse_req(req, *rbuf);
        if (status == PARSE_EUNFIN) {
            buf_lshift(*rbuf);
            return 0;
        }
        if (status != PARSE_OK) {
            /* parsing errors are all client errors, since we don't know
             * how to recover from client errors in this condition (we do not
             * have a valid request so we don't know where the invalid request
             * ends), we should close the connection
             */
            log_warn("illegal request received, status: %d", status);
            return -1;
        }

        if (req->swallow) { /* skip to the end of current request */
            continue;
        }

        /* stage 2: processing- check for quit, allocate response(s), process */

        /* quit is special, no response expected */
        if (req->type == REQ_QUIT) {
            log_info("peer called quit");
            return -1;
        }

        /*
         * find cardinality of the request and get enough response objects.
         * we already have one in rsp, so a request needs (number of keys - 1)
         * more; a request without keys still uses rsp and needs none.
         */
        nkey = array_nelem(req->keys);
        card = (nkey > 0) ? (int)(nkey - 1) : 0;
        if (req->type == REQ_GET || req->type == REQ_GETS) {
            /* extra response object for the "END" line after values */
            card++;
        }
        for (i = 0, nr = rsp; i < card; i++) {
            struct response *borrowed = response_borrow();

            /* check every borrowed object, including the last one */
            if (borrowed == NULL) {
                log_error("cannot acquire response: OOM");
                INCR(process_metrics, process_ex);
                _cleanup(req, rsp);
                return -1;
            }
            STAILQ_NEXT(nr, next) = borrowed;
            nr = borrowed;
        }

        /* actual processing */
        process_request(rsp, req);
        if (req->partial) { /* implies end of rbuf w/o complete processing */
            /* in this case, do not attempt to log or write response */
            buf_lshift(*rbuf);
            return 0;
        }

        /* stage 3: write response(s) if necessary */

        /* noreply means no need to write to buffers */
        card++; /* count the response held in rsp itself */
        if (!req->noreply) {
            nr = rsp;
            if (req->type == REQ_GET || req->type == REQ_GETS) {
                /* for get/gets, card is determined by number of values */
                card = req->nfound + 1;
            }
            for (i = 0; i < card; nr = STAILQ_NEXT(nr, next), ++i) {
                if (compose_rsp(wbuf, nr) < 0) {
                    log_error("composing rsp erred");
                    INCR(process_metrics, process_ex);
                    _cleanup(req, rsp);
                    return -1;
                }
            }
        }

        /* logging, clean-up */
        klog_write(req, rsp);
        _cleanup(req, rsp);
    }

    return 0;
}

/* Post-write hook: left-shift both buffers and shrink them; always returns 0. */
int
twemcache_process_write(struct buf **rbuf, struct buf **wbuf, void **data)
{
    log_verb("post-write processing");

    buf_lshift(*rbuf);
    dbuf_shrink(rbuf);
    buf_lshift(*wbuf);
    dbuf_shrink(wbuf);

    return 0;
}

/*
 * Post-error hook: reset and shrink both buffers, release any item still
 * reserved by the in-flight request, return its responses and the request
 * itself, and clear *data. Always returns 0.
 */
int
twemcache_process_error(struct buf **rbuf, struct buf **wbuf, void **data)
{
    struct request *req = *data;
    struct response *rsp;

    log_verb("post-error processing");

    /* normalize buffer size */
    buf_reset(*rbuf);
    dbuf_shrink(rbuf);
    buf_reset(*wbuf);
    dbuf_shrink(wbuf);

    /* release request data & associated reserved data */
    if (req != NULL) {
        rsp = req->rsp;
        if (req->reserved != NULL) {
            item_release((struct item **)&req->reserved);
        }
        if (rsp != NULL) {
            response_return_all(&rsp);
        }
        request_return(&req);
    }

    *data = NULL;

    return 0;
}