src/logreader.c (406 lines of code) (raw):
/*
* Copyright (c) 2012-2013 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include "sparkey.h"
#include "sparkey-internal.h"
#include "logheader.h"
#include "endiantools.h"
#include "util.h"
// Sentinel stored in sparkey_logiter.open_status while an iterator is open;
// cleared to 0 by sparkey_logiter_close.
#define MAGIC_VALUE_LOGITER (0xd765c8cc)
// Sentinel stored in sparkey_logreader.open_status while a reader is open;
// cleared to 0 by sparkey_logreader_close_nodealloc.
#define MAGIC_VALUE_LOGREADER (0xe93356c4)
/* Returns the smaller of two unsigned 64-bit values. */
static inline uint64_t min64(uint64_t a, uint64_t b) {
  return (b <= a) ? b : a;
}
/**
 * Decodes one little-endian base-128 varint from `array`, starting at
 * `*position`.
 *
 * Each byte contributes its low 7 bits. A byte with the high bit clear ends
 * the value.
 *
 * @param array    buffer holding the encoded value
 * @param position in: offset of the first byte; out: offset just past the
 *                 terminating byte
 * @return the decoded value. Bits beyond the 64th are discarded.
 *
 * NOTE(review): there is no length parameter, so corrupt input without a
 * terminating byte can still read past the end of `array`.
 */
static inline uint64_t read_vlq(uint8_t * array, uint64_t *position) {
  uint64_t res = 0;
  unsigned int shift = 0;
  while (1) {
    uint64_t byte = array[(*position)++];
    uint64_t payload = byte & 0x7f;
    // Shifting a 64-bit value by 64 or more is undefined behaviour, so bits
    // from over-long (malformed) encodings are dropped instead.
    if (shift < 64) {
      res |= payload << shift;
    }
    if (byte == payload) {
      return res;
    }
    if (shift < 64) {
      shift += 7;
    }
  }
}
/**
 * Opens the log file `filename` into a caller-provided reader.
 *
 * The header is loaded with sparkey_load_logheader. The file must be at least
 * header.data_end bytes long, and that prefix is mapped read-only.
 *
 * On failure, open_status is left cleared so sparkey_logreader_close_nodealloc
 * is a no-op, and any descriptor opened here is closed.
 *
 * @param log      reader struct to initialise
 * @param filename path of the log file
 * @return SPARKEY_SUCCESS, the header-loading error, an open/fstat error
 *         mapped via sparkey_open_returncode, SPARKEY_LOG_TOO_SMALL or
 *         SPARKEY_MMAP_FAILED.
 */
sparkey_returncode sparkey_logreader_open_noalloc(sparkey_logreader *log, const char *filename) {
  int fd = -1;  // -1 so cleanup can tell "never opened" from descriptor 0
  sparkey_returncode returncode;
  log->open_status = 0;
  TRY(sparkey_load_logheader(&log->header, filename), cleanup);
  log->data_len = log->header.data_end;

  fd = open(filename, O_RDONLY);
  if (fd < 0) {
    returncode = sparkey_open_returncode(errno);
    goto cleanup;
  }

  // fstat the descriptor we are about to map. This avoids a race with the
  // path, and lets us detect failure instead of reading an uninitialised
  // struct.
  struct stat s;
  if (fstat(fd, &s) < 0) {
    returncode = sparkey_open_returncode(errno);
    goto cleanup;
  }
  if (log->data_len > (uint64_t) s.st_size) {
    returncode = SPARKEY_LOG_TOO_SMALL;
    goto cleanup;
  }

  log->data = mmap(NULL, log->data_len, PROT_READ, MAP_SHARED, fd, 0);
  if (log->data == MAP_FAILED) {
    log->data = NULL;
    returncode = SPARKEY_MMAP_FAILED;
    goto cleanup;
  }
  // Only publish the descriptor once it is known to stay open.
  log->fd = fd;
  log->open_status = MAGIC_VALUE_LOGREADER;
  return SPARKEY_SUCCESS;
cleanup:
  if (fd >= 0) {
    close(fd);
  }
  return returncode;
}
/**
 * Allocates a reader and opens `filename` into it.
 *
 * @param log_ref  receives the new reader on success; untouched on failure
 * @param filename path of the log file
 * @return SPARKEY_SUCCESS, the platform-endianness error,
 *         SPARKEY_INTERNAL_ERROR on allocation failure, or the error from
 *         sparkey_logreader_open_noalloc.
 */
sparkey_returncode sparkey_logreader_open(sparkey_logreader **log_ref, const char *filename) {
  RETHROW(correct_endian_platform());
  sparkey_logreader *reader = malloc(sizeof *reader);
  if (reader == NULL) {
    return SPARKEY_INTERNAL_ERROR;
  }
  const sparkey_returncode status = sparkey_logreader_open_noalloc(reader, filename);
  if (status != SPARKEY_SUCCESS) {
    // Nothing has been handed to the caller yet, so release it here.
    free(reader);
    return status;
  }
  *log_ref = reader;
  return SPARKEY_SUCCESS;
}
/**
 * Unmaps and closes a reader without freeing the struct itself.
 *
 * Does nothing for NULL, or for a reader whose open_status is not the open
 * sentinel, so repeated calls are harmless.
 */
void sparkey_logreader_close_nodealloc(sparkey_logreader *log) {
  if (log == NULL || log->open_status != MAGIC_VALUE_LOGREADER) {
    return;
  }
  log->open_status = 0;
  if (log->data) {
    munmap(log->data, log->data_len);
    log->data = NULL;
  }
  close(log->fd);
  log->fd = -1;
}
/**
 * Closes a reader created by sparkey_logreader_open, frees it, and sets
 * `*log_ref` to NULL. A NULL `log_ref` is ignored.
 */
void sparkey_logreader_close(sparkey_logreader **log_ref) {
  if (log_ref != NULL) {
    sparkey_logreader_close_nodealloc(*log_ref);
    free(*log_ref);
    *log_ref = NULL;
  }
}
/* Returns SPARKEY_LOG_CLOSED unless `log` carries the open-reader sentinel. */
static sparkey_returncode assert_log_open(sparkey_logreader *log) {
  return (log->open_status == MAGIC_VALUE_LOGREADER) ? SPARKEY_SUCCESS : SPARKEY_LOG_CLOSED;
}
/*
 * Validates an iterator against its reader. The checks run in this order:
 * the reader is open, the iterator is open, and the iterator was created for
 * this reader's file (matching file_identifier).
 */
static sparkey_returncode assert_iter_open(sparkey_logiter *iter, sparkey_logreader *log) {
  RETHROW(assert_log_open(log));
  if (iter->open_status != MAGIC_VALUE_LOGITER) {
    return SPARKEY_LOG_ITERATOR_CLOSED;
  }
  return (iter->file_identifier == log->header.file_identifier)
      ? SPARKEY_SUCCESS
      : SPARKEY_LOG_ITERATOR_MISMATCH;
}
/**
 * Allocates a new iterator over `log`.
 *
 * The iterator starts in state SPARKEY_ITER_NEW. Its first block is loaded
 * from header_size on the first sparkey_logiter_next call. For compressed
 * logs, a private decompression buffer of compression_block_size bytes is
 * allocated.
 *
 * @param iter_ref receives the iterator on success
 * @param log      an open reader
 * @return SPARKEY_SUCCESS, SPARKEY_LOG_CLOSED, or SPARKEY_INTERNAL_ERROR on
 *         allocation failure.
 */
sparkey_returncode sparkey_logiter_create(sparkey_logiter **iter_ref, sparkey_logreader *log) {
  RETHROW(assert_log_open(log));
  // calloc so that every field not set explicitly below starts out zeroed
  // rather than holding heap garbage (e.g. the value returned by
  // sparkey_logiter_type before the first next()).
  sparkey_logiter *iter = calloc(1, sizeof *iter);
  if (iter == NULL) {
    return SPARKEY_INTERNAL_ERROR;
  }
  iter->open_status = MAGIC_VALUE_LOGITER;
  iter->file_identifier = log->header.file_identifier;
  iter->block_position = 0;
  iter->next_block_position = log->header.header_size;
  iter->block_offset = 0;
  iter->block_len = 0;
  iter->entry_count = -1;
  iter->key_remaining = 0;
  iter->value_remaining = 0;
  iter->keylen = 0;
  iter->valuelen = 0;
  iter->state = SPARKEY_ITER_NEW;
  if (sparkey_uses_compressor(log->header.compression_type)) {
    iter->compression_buf_allocated = 1;
    iter->compression_buf = malloc(log->header.compression_block_size);
    if (iter->compression_buf == NULL) {
      free(iter);
      return SPARKEY_INTERNAL_ERROR;
    }
  } else {
    // Uncompressed iterators point compression_buf straight into the mmap
    // (set by seekblock), so nothing is owned here.
    iter->compression_buf_allocated = 0;
    iter->compression_buf = NULL;
  }
  *iter_ref = iter;
  return SPARKEY_SUCCESS;
}
/**
 * Frees an iterator and its owned decompression buffer, then sets
 * `*iter_ref` to NULL.
 *
 * NULL handles are ignored. So are iterators without the open sentinel; in
 * that case `*iter_ref` is left untouched.
 */
void sparkey_logiter_close(sparkey_logiter **iter_ref) {
  sparkey_logiter *iter = (iter_ref != NULL) ? *iter_ref : NULL;
  if (iter == NULL || iter->open_status != MAGIC_VALUE_LOGITER) {
    return;
  }
  iter->open_status = 0;
  if (iter->compression_buf_allocated) {
    free(iter->compression_buf);
  }
  free(iter);
  *iter_ref = NULL;
}
/**
 * Points the iterator at the block that starts at byte `position` of the log
 * data, and resets block_offset to 0.
 *
 * Compressed logs: the block is a varint compressed length followed by that
 * many compressed bytes. It is decompressed into iter->compression_buf.
 *
 * Uncompressed logs: the rest of the mapped data, from `position` on, is
 * treated as one block, referenced in place.
 *
 * @return SPARKEY_SUCCESS, SPARKEY_LOG_TOO_SMALL if `position` or the
 *         compressed block lies outside the mapped data, or the
 *         decompressor's error.
 */
static sparkey_returncode seekblock(sparkey_logiter *iter, sparkey_logreader *log, uint64_t position) {
  // Reject out-of-range positions before touching log->data. In the
  // uncompressed case, data_len - position would otherwise wrap around.
  if (position >= log->data_len) {
    return SPARKEY_LOG_TOO_SMALL;
  }
  iter->block_offset = 0;
  if (iter->block_position == position) {
    // This block is already loaded.
    return SPARKEY_SUCCESS;
  }
  if (sparkey_uses_compressor(log->header.compression_type)) {
    uint64_t pos = position;
    uint64_t compressed_size = read_vlq(log->data, &pos);
    // The decompressor takes a 32-bit length, and the whole compressed block
    // must lie inside the mapping.
    // NOTE(review): read_vlq itself is unbounded, so a corrupt length prefix at
    // the very end of the data can still be over-read before this check.
    if (compressed_size > UINT32_MAX || pos > log->data_len
        || compressed_size > log->data_len - pos) {
      return SPARKEY_LOG_TOO_SMALL;
    }
    uint32_t uncompressed_size = log->header.compression_block_size;
    sparkey_returncode ret = sparkey_compressors[log->header.compression_type].decompress(
        &log->data[pos], (uint32_t) compressed_size, iter->compression_buf, &uncompressed_size);
    if (ret != SPARKEY_SUCCESS) {
      // The buffer may have been partially overwritten. Forget the cached
      // block so a later seek to it does not hit the early return above.
      iter->block_position = 0;
      iter->block_len = 0;
      return ret;
    }
    iter->block_position = position;
    iter->next_block_position = pos + compressed_size;
    iter->block_len = uncompressed_size;
  } else {
    iter->compression_buf = &log->data[position];
    iter->block_position = position;
    iter->next_block_position = log->header.data_end;
    iter->block_len = log->data_len - position;
  }
  return SPARKEY_SUCCESS;
}
/**
 * Repositions the iterator at the block starting at `position`.
 *
 * Seeking exactly to data_end closes the iterator. Any other position loads
 * that block and puts the iterator back in state SPARKEY_ITER_NEW.
 */
sparkey_returncode sparkey_logiter_seek(sparkey_logiter *iter, sparkey_logreader *log, uint64_t position) {
  RETHROW(assert_iter_open(iter, log));
  if (position != log->header.data_end) {
    RETHROW(seekblock(iter, log, position));
    iter->entry_count = -1;
    iter->state = SPARKEY_ITER_NEW;
  } else {
    iter->state = SPARKEY_ITER_CLOSED;
  }
  return SPARKEY_SUCCESS;
}
/*
 * Loads the next block when the current one is fully consumed.
 *
 * If there is no next block (next_block_position >= data_end), the block
 * fields are zeroed instead, which leaves zero bytes available.
 */
static sparkey_returncode ensure_available(sparkey_logiter *iter, sparkey_logreader *log) {
  if (iter->block_offset >= iter->block_len) {
    if (iter->next_block_position < log->header.data_end) {
      RETHROW(seekblock(iter, log, iter->next_block_position));
      iter->entry_count = -1;
    } else {
      iter->block_position = 0;
      iter->block_offset = 0;
      iter->block_len = 0;
    }
  }
  return SPARKEY_SUCCESS;
}
/*
 * Advances the iterator by `len` bytes, crossing block boundaries as needed.
 *
 * Returns SPARKEY_LOG_TOO_SMALL if the log data ends before `len` bytes were
 * skipped.
 */
static sparkey_returncode skip(sparkey_logiter *iter, sparkey_logreader *log, uint64_t len) {
  while (len > 0) {
    RETHROW(ensure_available(iter, log));
    uint64_t available = iter->block_len - iter->block_offset;
    if (available == 0 && iter->next_block_position >= log->header.data_end) {
      // ensure_available found no further block. Without this check a
      // truncated entry would spin here forever, making no progress.
      return SPARKEY_LOG_TOO_SMALL;
    }
    // An empty block in the middle of the log yields m == 0; the next
    // iteration moves on to the following block.
    uint64_t m = min64(len, available);
    len -= m;
    iter->block_offset += m;
  }
  return SPARKEY_SUCCESS;
}
/**
* Advances the iterator to the next log entry.
*
* Any unread key/value bytes of the current entry are skipped first. On
* success the iterator is either ACTIVE, or CLOSED once the log data is
* exhausted. When ACTIVE, it is positioned at the first key byte of the new
* entry, with keylen, valuelen and type filled in. Calling this on a CLOSED
* iterator is a no-op that returns SPARKEY_SUCCESS.
*
* @param iter iterator created for `log`
* @param log  the reader the iterator belongs to
* @return SPARKEY_SUCCESS, or the error from validation, skipping or block
*         loading. On error the iterator is left in SPARKEY_ITER_INVALID.
*/
sparkey_returncode sparkey_logiter_next(sparkey_logiter *iter, sparkey_logreader *log) {
if (iter->state == SPARKEY_ITER_CLOSED) {
return SPARKEY_SUCCESS;
}
// Unread bytes of the current entry; only meaningful while ACTIVE.
uint64_t key_remaining = 0;
uint64_t value_remaining = 0;
if (iter->state == SPARKEY_ITER_ACTIVE) {
key_remaining = iter->key_remaining;
value_remaining = iter->value_remaining;
}
// The state is set to INVALID before any check below, so every early error
// return leaves the iterator INVALID.
iter->state = SPARKEY_ITER_INVALID;
iter->key_remaining = 0;
iter->value_remaining = 0;
iter->keylen = 0;
iter->valuelen = 0;
RETHROW(assert_iter_open(iter, log));
RETHROW(skip(iter, log, key_remaining));
RETHROW(skip(iter, log, value_remaining));
RETHROW(ensure_available(iter, log));
if (iter->block_len - iter->block_offset == 0) {
// Reached end of data
// NOTE(review): ensure_available loads at most one block per call, so an
// empty decompressed block that is not the last one would also end
// iteration here. Confirm the writer never emits empty blocks.
iter->state = SPARKEY_ITER_CLOSED;
return SPARKEY_SUCCESS;
}
if (log->header.compression_type == SPARKEY_COMPRESSION_NONE) {
// Uncompressed data is one big "block". Rebase it onto the current entry
// so block_position (and hence entry_block_position) is the entry's byte
// offset into the mapped log.
iter->block_position += iter->block_offset;
iter->block_len -= iter->block_offset;
iter->block_offset = 0;
iter->compression_buf = &log->data[iter->block_position];
iter->entry_count = -1;
}
// Index of this entry within the current block (-1 on each block load).
iter->entry_count++;
// Entry header: two varints.
// a == 0: delete entry, b = key length.
// Otherwise: put entry, key length a - 1, value length b.
// NOTE(review): these reads are not checked against block_len.
uint64_t a = read_vlq(iter->compression_buf, &iter->block_offset);
uint64_t b = read_vlq(iter->compression_buf, &iter->block_offset);
if (a == 0) {
iter->keylen = iter->key_remaining = b;
iter->valuelen = iter->value_remaining = 0;
iter->type = SPARKEY_ENTRY_DELETE;
} else {
iter->keylen = iter->key_remaining = a - 1;
iter->valuelen = iter->value_remaining = b;
iter->type = SPARKEY_ENTRY_PUT;
}
// Remember where the key starts so sparkey_logiter_reset can rewind here.
iter->entry_block_position = iter->block_position;
iter->entry_block_offset = iter->block_offset;
iter->state = SPARKEY_ITER_ACTIVE;
return SPARKEY_SUCCESS;
}
/**
 * Rewinds an ACTIVE iterator to the start of the current entry's key, so the
 * key and value can be read again.
 *
 * @return SPARKEY_SUCCESS, an iterator/reader validation error,
 *         SPARKEY_LOG_ITERATOR_INACTIVE, or the error from reloading the
 *         entry's block.
 */
sparkey_returncode sparkey_logiter_reset(sparkey_logiter *iter, sparkey_logreader *log) {
  // Validate the pairing first: seekblock dereferences log->data at a
  // position recorded from the iterator's own log.
  RETHROW(assert_iter_open(iter, log));
  if (iter->state != SPARKEY_ITER_ACTIVE) {
    return SPARKEY_LOG_ITERATOR_INACTIVE;
  }
  RETHROW(seekblock(iter, log, iter->entry_block_position));
  iter->key_remaining = iter->keylen;
  iter->value_remaining = iter->valuelen;
  iter->block_offset = iter->entry_block_offset;
  return SPARKEY_SUCCESS;
}
/**
 * Calls sparkey_logiter_next `count` times, stopping at the first error.
 * A non-positive `count` does nothing.
 */
sparkey_returncode sparkey_logiter_skip(sparkey_logiter *iter, sparkey_logreader *log, int count) {
  for (int i = 0; i < count; i++) {
    RETHROW(sparkey_logiter_next(iter, log));
  }
  return SPARKEY_SUCCESS;
}
/*
 * Returns a pointer into the current block for up to `maxlen` of the `*var`
 * remaining bytes, and consumes them.
 *
 * A chunk never crosses a block boundary. `*len` is 0 when nothing remains.
 */
static sparkey_returncode sparkey_logiter_chunk(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint64_t *len, uint8_t ** res, uint64_t *var) {
  RETHROW(assert_iter_open(iter, log));
  if (iter->state != SPARKEY_ITER_ACTIVE) {
    return SPARKEY_LOG_ITERATOR_INACTIVE;
  }
  if (*var == 0) {
    *len = 0;
    return SPARKEY_SUCCESS;
  }
  RETHROW(ensure_available(iter, log));
  uint64_t available = iter->block_len - iter->block_offset;
  uint64_t take = min64(maxlen, min64(*var, available));
  *len = take;
  *res = iter->compression_buf + iter->block_offset;
  iter->block_offset += take;
  *var -= take;
  return SPARKEY_SUCCESS;
}
/* Returns the next chunk (at most `maxlen` bytes) of the current entry's key. */
sparkey_returncode sparkey_logiter_keychunk(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t ** res, uint64_t *len) {
  uint64_t *remaining = &iter->key_remaining;
  return sparkey_logiter_chunk(iter, log, maxlen, len, res, remaining);
}
/**
 * Returns the next chunk (at most `maxlen` bytes) of the current entry's
 * value. Any unread key bytes are skipped first.
 *
 * @return SPARKEY_SUCCESS, an iterator/reader validation error,
 *         SPARKEY_LOG_ITERATOR_INACTIVE, or an error from skipping/loading.
 */
sparkey_returncode sparkey_logiter_valuechunk(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t ** res, uint64_t *len) {
  // Validate before skipping. key_remaining is only meaningful for an ACTIVE
  // iterator that belongs to this log; otherwise skip() would advance through
  // the wrong data by a stale amount.
  RETHROW(assert_iter_open(iter, log));
  if (iter->state != SPARKEY_ITER_ACTIVE) {
    return SPARKEY_LOG_ITERATOR_INACTIVE;
  }
  RETHROW(skip(iter, log, iter->key_remaining));
  iter->key_remaining = 0;
  return sparkey_logiter_chunk(iter, log, maxlen, len, res, &iter->value_remaining);
}
sparkey_returncode sparkey_logiter_fill_key(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t *buf, uint64_t *len) {
*len = 0;
while (maxlen > 0) {
uint8_t *buf2;
uint64_t len2;
RETHROW(sparkey_logiter_keychunk(iter, log, maxlen, &buf2, &len2));
if (len2 == 0) {
return SPARKEY_SUCCESS;
}
memcpy(buf, buf2, len2);
buf += len2;
*len += len2;
maxlen -= len2;
}
return SPARKEY_SUCCESS;
}
sparkey_returncode sparkey_logiter_fill_value(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t *buf, uint64_t *len) {
*len = 0;
while (maxlen > 0) {
uint8_t *buf2;
uint64_t len2;
RETHROW(sparkey_logiter_valuechunk(iter, log, maxlen, &buf2, &len2));
if (len2 == 0) {
return SPARKEY_SUCCESS;
}
memcpy(buf, buf2, len2);
buf += len2;
*len += len2;
maxlen -= len2;
}
return SPARKEY_SUCCESS;
}
/**
 * Compares the remaining key bytes of two iterators lexicographically,
 * consuming them from both.
 *
 * `*res` is negative, zero or positive, like memcmp. When one key is a
 * prefix of the other, the shorter one compares as smaller (-1 / 1).
 */
sparkey_returncode sparkey_logiter_keycmp(sparkey_logiter *iter1, sparkey_logiter *iter2, sparkey_logreader *log, int *res) {
  const uint64_t chunk_max = 1 << 30;
  uint8_t *lhs;
  uint8_t *rhs;
  uint64_t lhs_left;
  uint64_t rhs_left;
  RETHROW(sparkey_logiter_keychunk(iter1, log, chunk_max, &lhs, &lhs_left));
  RETHROW(sparkey_logiter_keychunk(iter2, log, chunk_max, &rhs, &rhs_left));
  while (lhs_left != 0 || rhs_left != 0) {
    if (lhs_left == 0) {
      *res = -1;
      return SPARKEY_SUCCESS;
    }
    if (rhs_left == 0) {
      *res = 1;
      return SPARKEY_SUCCESS;
    }
    const uint64_t step = min64(lhs_left, rhs_left);
    const int diff = memcmp(lhs, rhs, step);
    if (diff != 0) {
      *res = diff;
      return SPARKEY_SUCCESS;
    }
    lhs += step;
    rhs += step;
    lhs_left -= step;
    rhs_left -= step;
    // Refill whichever side ran out of its current chunk.
    if (lhs_left == 0) {
      RETHROW(sparkey_logiter_keychunk(iter1, log, chunk_max, &lhs, &lhs_left));
    }
    if (rhs_left == 0) {
      RETHROW(sparkey_logiter_keychunk(iter2, log, chunk_max, &rhs, &rhs_left));
    }
  }
  *res = 0;
  return SPARKEY_SUCCESS;
}
/* Largest key length recorded in the log header. */
uint64_t sparkey_logreader_maxkeylen(sparkey_logreader *log) {
  const uint64_t max_key_len = log->header.max_key_len;
  return max_key_len;
}
/* Largest value length recorded in the log header. */
uint64_t sparkey_logreader_maxvaluelen(sparkey_logreader *log) {
  const uint64_t max_value_len = log->header.max_value_len;
  return max_value_len;
}
/* Compression block size from the log header. */
int sparkey_logreader_get_compression_blocksize(sparkey_logreader *log) {
  const int block_size = log->header.compression_block_size;
  return block_size;
}
/* Compression type from the log header. */
sparkey_compression_type sparkey_logreader_get_compression_type(sparkey_logreader *log) {
  const sparkey_compression_type kind = log->header.compression_type;
  return kind;
}
/* Current iterator state (NEW, ACTIVE, CLOSED or INVALID). */
sparkey_iter_state sparkey_logiter_state(sparkey_logiter *iter) {
  const sparkey_iter_state current = iter->state;
  return current;
}
/* Entry type of the current entry (put or delete). */
sparkey_entry_type sparkey_logiter_type(sparkey_logiter *iter) {
  const sparkey_entry_type entry_type = iter->type;
  return entry_type;
}
/* Full key length of the current entry. */
uint64_t sparkey_logiter_keylen(sparkey_logiter *iter) {
  const uint64_t key_length = iter->keylen;
  return key_length;
}
/* Full value length of the current entry. */
uint64_t sparkey_logiter_valuelen(sparkey_logiter *iter) {
  const uint64_t value_length = iter->valuelen;
  return value_length;
}