src/cc_rbuf.c (148 lines of code) (raw):
/*
* ccommon - a cache common library.
* Copyright (C) 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cc_rbuf.h>
#include <cc_bstring.h>
#include <cc_debug.h>
#include <cc_mm.h>
#define RBUF_MODULE_NAME "ccommon::rbuf"
static rbuf_metrics_st *rbuf_metrics = NULL;
static bool rbuf_init = false;
void
rbuf_setup(rbuf_metrics_st *metrics)
{
log_info("set up the %s module", RBUF_MODULE_NAME);
rbuf_metrics = metrics;
if (rbuf_init) {
log_warn("%s has already been setup, overwrite", RBUF_MODULE_NAME);
}
rbuf_init = true;
}
void
rbuf_teardown(void)
{
log_info("tear down the %s module", RBUF_MODULE_NAME);
if (!rbuf_init) {
log_warn("%s has never been setup", RBUF_MODULE_NAME);
}
rbuf_metrics = NULL;
rbuf_init = false;
}
struct rbuf *
rbuf_create(uint32_t cap)
{
struct rbuf *buf;
log_verb("Create ring buffer with capacity %zu", cap);
buf = cc_alloc(RBUF_HDR_SIZE + cap + 1);
if (buf == NULL) {
log_error("Could not allocate rbuf with capacity %zu due to OOM", cap);
INCR(rbuf_metrics, rbuf_create_ex);
return NULL;
}
buf->wpos = buf->rpos = 0;
buf->cap = cap;
INCR(rbuf_metrics, rbuf_create);
INCR(rbuf_metrics, rbuf_curr);
INCR_N(rbuf_metrics, rbuf_byte, RBUF_HDR_SIZE + cap + 1);
return buf;
}
void
rbuf_destroy(struct rbuf **buf)
{
ASSERT(buf != NULL);
if (*buf != NULL) {
log_verb("Destroy ring buffer %p", *buf);
uint32_t cap = (*buf)->cap;
cc_free(*buf);
*buf = NULL;
INCR(rbuf_metrics, rbuf_destroy);
DECR(rbuf_metrics, rbuf_curr);
DECR_N(rbuf_metrics, rbuf_byte, RBUF_HDR_SIZE + cap + 1);
}
}
/**
* RBUF READ/WRITE CAPACITY:
*
* Cases:
*
* 1. wpos < rpos
* wcap = rpos - wpos - 1
*
* +--------------------------+
* | ||||||||||||||||||| |
* +---|------------------|---+
* ^ ^
* | |
* \ \
* wpos rpos
*
* rcap = cap + wpos - rpos + 1
*
* +--------------------------+
* ||||| |||||
* +---|------------------|---+
* ^ ^
* | |
* \ \
* wpos rpos
*
*
* 2. wpos >= rpos
* wcap = cap - wpos + rpos
*
* +--------------------------+
* |||| |||||
* +---|------------------|---+
* ^ ^
* | |
* \ \
* rpos wpos
*
* rcap = wpos - rpos
*
* +--------------------------+
* | |||||||||||||||||||| |
* +---|------------------|---+
* ^ ^
* | |
* \ \
* rpos wpos
*
*/
size_t
rbuf_rcap(struct rbuf *buf)
{
uint32_t rpos, wpos;
rpos = get_rpos(buf);
wpos = get_wpos(buf);
if (wpos < rpos) {
return buf->cap + wpos - rpos + 1;
} else {
return wpos - rpos;
}
}
size_t
rbuf_wcap(struct rbuf *buf)
{
uint32_t rpos, wpos;
rpos = get_rpos(buf);
wpos = get_wpos(buf);
if (wpos < rpos) {
/* no wrap around */
return rpos - wpos - 1;
} else {
return buf->cap - wpos + rpos;
}
}
static inline uint32_t
_min(uint32_t first, uint32_t second)
{
return first < second ? first : second;
}
size_t
rbuf_read(void *dst, struct rbuf *src, size_t n)
{
size_t capacity, ret;
uint32_t rpos, wpos;
rpos = get_rpos(src);
wpos = get_wpos(src);
if (wpos < rpos) {
/* write until end, then wrap around */
capacity = src->cap - rpos + 1;
ret = _min(capacity, n);
cc_memcpy(dst, src->data + rpos, ret);
if (ret < n) {
/* start copying from beginning of srcfer */
size_t remaining_bytes;
capacity = wpos;
remaining_bytes = _min(capacity, n - ret);
cc_memcpy((uint8_t *)dst + ret, src->data, remaining_bytes);
ret += remaining_bytes;
rpos = remaining_bytes;
} else {
rpos += ret;
}
} else {
/* no wrapping around */
capacity = wpos - rpos;
ret = _min(capacity, n);
cc_memcpy(dst, src->data + rpos, ret);
rpos += ret;
}
set_rpos(src, rpos);
return ret;
}
size_t
rbuf_write(struct rbuf *dst, void *src, size_t n)
{
size_t capacity, ret;
uint32_t rpos, wpos;
rpos = get_rpos(dst);
wpos = get_wpos(dst);
if (wpos < rpos) {
/* no wrapping around */
capacity = rpos - wpos - 1;
ret = _min(capacity, n);
cc_memcpy(dst->data + wpos, src, ret);
wpos += ret;
} else {
/* read until end, then wrap around */
capacity = dst->cap - wpos + 1;
ret = _min(capacity, n);
cc_memcpy(dst->data + wpos, src, ret);
if (ret < n) {
/* start copying from beginning of dstfer */
size_t remaining_bytes;
capacity = rpos - 1;
remaining_bytes = _min(capacity, n - ret);
cc_memcpy(dst->data, (uint8_t *)src + ret, remaining_bytes);
ret += remaining_bytes;
wpos = remaining_bytes;
} else {
wpos += ret;
}
}
set_wpos(dst, wpos);
return ret;
}