in src/hashwriter.c [353:520]
sparkey_returncode sparkey_hash_write(const char *hash_filename, const char *log_filename, int hash_size) {
sparkey_logheader log_header;
sparkey_logreader *log;
sparkey_logiter *iter = NULL;
sparkey_logiter *ra_iter = NULL;
RETHROW(sparkey_load_logheader(&log_header, log_filename));
RETHROW(sparkey_logreader_open(&log, log_filename));
sparkey_returncode returncode = SPARKEY_SUCCESS;
TRY(sparkey_logiter_create(&iter, log), close_reader);
TRY(sparkey_logiter_create(&ra_iter, log), close_iter);
sparkey_hashheader hash_header;
sparkey_hashheader old_header;
double cap;
uint64_t start;
uint32_t hash_seed;
int copy_old;
uint32_t old_hash_size = 0;
returncode = sparkey_load_hashheader(&old_header, hash_filename);
if (returncode == SPARKEY_SUCCESS &&
old_header.file_identifier == log_header.file_identifier &&
old_header.major_version == HASH_MAJOR_VERSION &&
old_header.minor_version == HASH_MINOR_VERSION) {
// Prepare to copy stuff from old header
cap = ((log_header.num_puts - old_header.num_puts) + old_header.num_entries) * 1.3;
start = old_header.data_end;
hash_seed = old_header.hash_seed;
hash_header.garbage_size = old_header.garbage_size;
copy_old = 1;
old_hash_size = old_header.hash_size;
} else {
cap = log_header.num_puts * 1.3;
start = log_header.header_size;
TRY(rand32(&hash_seed), close_iter);
hash_header.garbage_size = 0;
copy_old = 0;
returncode = SPARKEY_SUCCESS;
}
hash_header.hash_capacity = 1 | (uint64_t) cap;
hash_header.hash_seed = hash_seed;
hash_header.max_key_len = log_header.max_key_len;
hash_header.max_value_len = log_header.max_value_len;
hash_header.data_end = log_header.data_end;
hash_header.num_puts = log_header.num_puts;
hash_header.entry_block_bits = int_log2(log_header.max_entries_per_block);
hash_header.entry_block_bitmask = (1 << hash_header.entry_block_bits) - 1;
if (hash_header.data_end < (1ULL << (32 - hash_header.entry_block_bits))) {
hash_header.address_size = 4;
} else {
hash_header.address_size = 8;
}
if (old_hash_size == 8 || hash_header.hash_capacity >= (1 << 23)) {
hash_header.hash_size = 8;
} else {
hash_header.hash_size = 4;
}
if (hash_size != 0) {
if (hash_size == 4 || hash_size == 8) {
hash_header.hash_size = hash_size;
} else {
returncode = SPARKEY_HASH_SIZE_INVALID;
goto close_iter;
}
}
if (hash_header.hash_size != old_hash_size) {
copy_old = 0;
}
hash_header.hash_algorithm = sparkey_get_hash_algorithm(hash_header.hash_size);
int slot_size = hash_header.hash_size + hash_header.address_size;
uint64_t hashsize = slot_size * hash_header.hash_capacity;
uint8_t *hashtable = malloc(hashsize);
if (hashtable == NULL) {
fprintf(stderr, "sparkey_hash_write():%d bug: could not malloc %"PRIu64" bytes\n", __LINE__, hashsize);
returncode = SPARKEY_INTERNAL_ERROR;
goto close_iter;
}
memset(hashtable, 0, hashsize);
hash_header.max_displacement = 0;
hash_header.total_displacement = 0;
hash_header.num_entries = 0;
hash_header.hash_collisions = 0;
if (copy_old) {
if (old_header.data_end == log->header.data_end) {
// Nothing needs to be done - just exit
goto close_iter;
}
TRY(fill_hash(hashtable, hash_filename, &old_header, &hash_header), free_hashtable);
TRY(sparkey_logiter_seek(iter, log, start), free_hashtable);
}
while (1) {
TRY(sparkey_logiter_next(iter, log), free_hashtable);
switch (iter->state) {
case SPARKEY_ITER_CLOSED:
goto normal_exit;
break;
case SPARKEY_ITER_ACTIVE:
break;
default:
fprintf(stderr, "sparkey_hash_write():%d bug: invalid iter state: %d\n", __LINE__, iter->state);
returncode = SPARKEY_INTERNAL_ERROR;
goto free_hashtable;
break;
}
uint64_t iter_block_start = iter->block_position;
uint64_t iter_entry_count = iter->entry_count;
uint64_t key_hash = sparkey_iter_hash(&hash_header, iter, log);
uint64_t wanted_slot = key_hash % hash_header.hash_capacity;
switch (iter->type) {
case SPARKEY_ENTRY_PUT:
TRY(hash_put(wanted_slot, key_hash, hashtable, &hash_header, iter, ra_iter, log, (iter_block_start << hash_header.entry_block_bits) | iter_entry_count), free_hashtable);
break;
case SPARKEY_ENTRY_DELETE:
hash_header.garbage_size += 1 + unsigned_vlq_size(iter->keylen) + iter->keylen;
TRY(hash_delete(wanted_slot, key_hash, hashtable, &hash_header, iter, ra_iter, log), free_hashtable);
break;
}
}
normal_exit:
calculate_max_displacement(&hash_header, hashtable);
// Try removing it first, to avoid overwriting existing files that readers may be using.
if (remove(hash_filename) < 0) {
int e = errno;
if (e != ENOENT) {
returncode = sparkey_remove_returncode(e);
goto free_hashtable;
}
}
int fd = creat(hash_filename, 00644);
hash_header.major_version = HASH_MAJOR_VERSION;
hash_header.minor_version = HASH_MINOR_VERSION;
hash_header.file_identifier = log_header.file_identifier;
hash_header.data_end = log_header.data_end;
TRY(write_hashheader(fd, &hash_header), close_hash);
TRY(write_full(fd, hashtable, hashsize), close_hash);
close_hash:
close(fd);
free_hashtable:
free(hashtable);
close_iter:
sparkey_logiter_close(&iter);
sparkey_logiter_close(&ra_iter);
close_reader:
sparkey_logreader_close(&log);
return returncode;
}