src/cc_ring_array.c (87 lines of code) (raw):

/*
 * ccommon - a cache common library.
 * Copyright (C) 2015 Twitter, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <cc_ring_array.h>

#include <cc_bstring.h>
#include <cc_debug.h>
#include <cc_mm.h>

#include <stdbool.h>

#define RING_ARRAY_HDR_SIZE offsetof(struct ring_array, data)

/**
 * The total number of slots allocated is (cap + 1)
 *
 * Each ring array should have exactly one reader and exactly one writer, as
 * far as threads are concerned (which can be the same). This allows the use of
 * atomic instructions to replace locks.
 *
 * Memory ordering: the writer copies an element into its slot and then
 * publishes the new wpos with a release store; the reader observes wpos with
 * an acquire load before copying the element out. Symmetrically, the reader
 * publishes rpos with a release store after it has finished copying an element
 * out, and the writer observes rpos with an acquire load before reusing a
 * slot. Relaxed ordering is not sufficient here, because it would allow the
 * index update to become visible before the element copy it is meant to guard.
 *
 * We use an extra slot to differentiate full from empty.
 *
 * 1) If rpos == wpos, the buffer is empty.
 *
 * 2) If rpos is behind wpos (see below):
 *     # of occupied slots: wpos - rpos
 *     # of vacant slots: rpos + cap - wpos + 1
 *     # of writable slots: rpos + cap - wpos
 *     full if: rpos == 0, wpos == cap
 *
 *       0                       cap
 *       |                       |
 *       v                       v
 *      +-+-+-+---------------+-+-+
 * data | | | |      ...      | | |
 *      +-+-+-+---------------+-+-+
 *           ^             ^
 *           |             |
 *           rpos          wpos
 *
 * 3) If rpos is ahead of wpos (see below):
 *     # of occupied slots: wpos + cap - rpos + 1
 *     # of vacant slots: rpos - wpos
 *     # of writable slots: rpos - wpos - 1
 *     full if: rpos == wpos + 1
 *
 *       0                       cap
 *       |                       |
 *       v                       v
 *      +-+-+-+---------------+-+-+
 * data | | | |      ...      | | |
 *      +-+-+-+---------------+-+-+
 *           ^             ^
 *           |             |
 *           wpos          rpos
 *
 */

/*
 * Number of occupied slots given a snapshot of both positions.
 *
 * rpos, wpos: slot indices, each expected to be in [0, cap]
 * cap:        maximum number of elements the array can hold
 */
static inline uint32_t
ring_array_nelem(uint32_t rpos, uint32_t wpos, uint32_t cap)
{
    if (rpos <= wpos) { /* condition 1), 2) */
        return wpos - rpos;
    } else {            /* condition 3) */
        return wpos + (cap - rpos + 1);
    }
}

/*
 * Copy one element (arr->elem_size bytes read from elem) into the array.
 * To be called from the single writer thread only.
 *
 * Returns CC_OK on success, CC_ERROR if the array is full.
 */
rstatus_i
ring_array_push(const void *elem, struct ring_array *arr)
{
    uint32_t new_wpos;

    if (ring_array_full(arr)) {
        log_debug("Could not push to ring array %p; array is full",
                (void *)arr);
        return CC_ERROR;
    }

    /* wpos is only ever modified by this (the writer) thread, so a plain read
     * is sufficient here */
    cc_memcpy(arr->data + (arr->elem_size * arr->wpos), elem, arr->elem_size);

    /* publish the new wpos; the release store guarantees the element bytes
     * copied above are visible to a reader that acquires this wpos value */
    new_wpos = (arr->wpos + 1) % (arr->cap + 1);
    __atomic_store_n(&(arr->wpos), new_wpos, __ATOMIC_RELEASE);

    return CC_OK;
}

/*
 * Returns true if no further element can be pushed. Intended to be called
 * from the writer side.
 */
bool
ring_array_full(const struct ring_array *arr)
{
    /*
     * Take snapshot of rpos, since another thread might be popping. Note: other
     * members of arr do not need to be saved because we assume the other thread
     * only pops and does not push; in other words, only one thread updates
     * either rpos or wpos.
     *
     * The acquire load pairs with the release store in ring_array_pop(), so
     * a slot is only considered free once the reader is done copying out of it.
     */
    uint32_t rpos = __atomic_load_n(&(arr->rpos), __ATOMIC_ACQUIRE);

    return ring_array_nelem(rpos, arr->wpos, arr->cap) == arr->cap;
}

/*
 * Remove the oldest element from the array. If elem is not NULL, the element
 * (arr->elem_size bytes) is copied into it first; if elem is NULL the element
 * is simply discarded. To be called from the single reader thread only.
 *
 * Returns CC_OK on success, CC_ERROR if the array is empty.
 */
rstatus_i
ring_array_pop(void *elem, struct ring_array *arr)
{
    uint32_t new_rpos;

    if (ring_array_empty(arr)) {
        log_debug("Could not pop from ring array %p; array is empty",
                (void *)arr);
        return CC_ERROR;
    }

    if (elem != NULL) {
        /* rpos is only ever modified by this (the reader) thread */
        cc_memcpy(elem, arr->data + (arr->elem_size * arr->rpos),
                arr->elem_size);
    }

    /* publish the new rpos; the release store guarantees the copy above has
     * completed before the writer can see the slot as reusable */
    new_rpos = (arr->rpos + 1) % (arr->cap + 1);
    __atomic_store_n(&(arr->rpos), new_rpos, __ATOMIC_RELEASE);

    return CC_OK;
}

/*
 * Returns true if there is nothing to pop. Intended to be called from the
 * reader side.
 */
bool
ring_array_empty(const struct ring_array *arr)
{
    /* take snapshot of wpos, since another thread might be pushing; the
     * acquire load pairs with the release store in ring_array_push() so the
     * element data behind wpos is visible once wpos itself is */
    uint32_t wpos = __atomic_load_n(&(arr->wpos), __ATOMIC_ACQUIRE);

    return ring_array_nelem(arr->rpos, wpos, arr->cap) == 0;
}

/*
 * Discard every element currently in the array by moving rpos up to the
 * current wpos. This updates rpos, so it belongs to the reader side.
 */
void
ring_array_flush(struct ring_array *arr)
{
    uint32_t wpos = __atomic_load_n(&(arr->wpos), __ATOMIC_ACQUIRE);

    __atomic_store_n(&(arr->rpos), wpos, __ATOMIC_RELEASE);
}

/*
 * Allocate and initialize an empty ring array able to hold up to cap elements
 * of elem_size bytes each.
 *
 * Returns the new ring array, or NULL if the requested dimensions cannot be
 * represented or the allocation fails. The caller releases the array with
 * ring_array_destroy().
 */
struct ring_array *
ring_array_create(size_t elem_size, uint32_t cap)
{
    struct ring_array *arr;
    size_t nslot;

    /* (cap + 1) slots are needed; reject a cap for which that count wraps
     * around to 0 in uint32_t, which would also make the modulo used by
     * push/pop a division by zero */
    if (cap == UINT32_MAX) {
        log_error("Could not create ring array: cap %u is too large", cap);
        return NULL;
    }
    nslot = (size_t)cap + 1;

    /* make sure header + elem_size * nslot fits in a size_t */
    if (elem_size != 0 &&
            nslot > (SIZE_MAX - RING_ARRAY_HDR_SIZE) / elem_size) {
        log_error("Could not create ring array: size overflows with cap %u "
                "elem_size %zu", cap, elem_size);
        return NULL;
    }

    /* underlying array has # items stored + 1, since full is when wpos is 1
       element behind rpos */
    arr = cc_alloc(RING_ARRAY_HDR_SIZE + elem_size * nslot);

    if (arr == NULL) {
        log_error("Could not allocate memory for ring array cap %u "
                "elem_size %zu", cap, elem_size);
        return NULL;
    }

    arr->elem_size = elem_size;
    arr->cap = cap;
    arr->rpos = arr->wpos = 0;

    return arr;
}

/*
 * Free the ring array *arr points to and reset *arr to NULL. Passing NULL, or
 * a pointer to NULL, only logs a warning.
 */
void
ring_array_destroy(struct ring_array **arr)
{
    if ((arr == NULL) || (*arr == NULL)) {
        log_warn("destroying NULL ring_array pointer");
        return;
    }

    log_verb("destroying ring array %p and freeing memory", (void *)*arr);

    cc_free(*arr);
    *arr = NULL;
}