sparkey_returncode sparkey_hash_write()

in src/hashwriter.c [353:520]


/**
 * Builds the hash index file for a log file, reusing an existing hash file
 * when possible.
 *
 * The log header is loaded from log_filename and a slot table sized from the
 * number of puts is built in memory. If hash_filename already holds a hash
 * header with the same file identifier, the current major/minor version and
 * the same hash size as the one selected here, its contents are loaded with
 * fill_hash() and only the log entries from the old data_end onward are
 * replayed. Otherwise the whole log is replayed. If the old hash already
 * covers the whole log, nothing is written.
 *
 * The existing hash file is removed before the new one is created, so the
 * old file is unlinked rather than overwritten in place.
 *
 * @param hash_filename path of the hash file to create or refresh.
 * @param log_filename  path of the log file to index.
 * @param hash_size     0 to pick the hash size automatically (8 if the old
 *                      hash used 8 or the capacity is >= 2^23, else 4), or
 *                      4 / 8 to force it. Any other value fails with
 *                      SPARKEY_HASH_SIZE_INVALID.
 * @return SPARKEY_SUCCESS on success; SPARKEY_HASH_SIZE_INVALID for a bad
 *         hash_size; SPARKEY_INTERNAL_ERROR if the table cannot be allocated,
 *         the iterator reaches an unexpected state, or the hash file cannot
 *         be created; otherwise the first error returned by a helper.
 */
sparkey_returncode sparkey_hash_write(const char *hash_filename, const char *log_filename, int hash_size) {
  sparkey_logheader log_header;
  sparkey_logreader *log;
  sparkey_logiter *iter = NULL;
  sparkey_logiter *ra_iter = NULL;

  RETHROW(sparkey_load_logheader(&log_header, log_filename));

  RETHROW(sparkey_logreader_open(&log, log_filename));
  sparkey_returncode returncode = SPARKEY_SUCCESS;
  // iter walks the log sequentially; ra_iter is handed to hash_put/hash_delete
  // as a second, independent iterator.
  TRY(sparkey_logiter_create(&iter, log), close_reader);
  TRY(sparkey_logiter_create(&ra_iter, log), close_iter);

  sparkey_hashheader hash_header;
  sparkey_hashheader old_header;

  double cap;
  uint64_t start;
  uint32_t hash_seed;
  int copy_old;
  uint32_t old_hash_size = 0;
  returncode = sparkey_load_hashheader(&old_header, hash_filename);
  if (returncode == SPARKEY_SUCCESS &&
      old_header.file_identifier == log_header.file_identifier &&
      old_header.major_version == HASH_MAJOR_VERSION &&
      old_header.minor_version == HASH_MINOR_VERSION) {
    // Prepare to copy stuff from old header
    cap = ((log_header.num_puts - old_header.num_puts) + old_header.num_entries) * 1.3;
    start = old_header.data_end;
    hash_seed = old_header.hash_seed;
    hash_header.garbage_size = old_header.garbage_size;

    copy_old = 1;
    old_hash_size = old_header.hash_size;
  } else {
    // No usable old hash file: size the table from the log alone and pick a
    // fresh seed. A failure to load the old header is not an error.
    cap = log_header.num_puts * 1.3;
    start = log_header.header_size;
    TRY(rand32(&hash_seed), close_iter);
    hash_header.garbage_size = 0;
    copy_old = 0;
    returncode = SPARKEY_SUCCESS;
  }

  // OR-ing with 1 forces an odd, non-zero capacity.
  hash_header.hash_capacity = 1 | (uint64_t) cap;

  hash_header.hash_seed = hash_seed;
  hash_header.max_key_len = log_header.max_key_len;
  hash_header.max_value_len = log_header.max_value_len;
  hash_header.data_end = log_header.data_end;
  hash_header.num_puts = log_header.num_puts;

  hash_header.entry_block_bits = int_log2(log_header.max_entries_per_block);
  hash_header.entry_block_bitmask = (1 << hash_header.entry_block_bits) - 1;

  // A slot address is (block position << entry_block_bits) | entry index, so
  // 4 bytes suffice only while data_end fits in the remaining high bits.
  if (hash_header.data_end < (1ULL << (32 - hash_header.entry_block_bits))) {
    hash_header.address_size = 4;
  } else {
    hash_header.address_size = 8;
  }
  if (old_hash_size == 8 || hash_header.hash_capacity >= (1 << 23)) {
    hash_header.hash_size = 8;
  } else {
    hash_header.hash_size = 4;
  }
  // An explicit caller-supplied hash size overrides the automatic choice.
  if (hash_size != 0) {
    if (hash_size == 4 || hash_size == 8) {
      hash_header.hash_size = hash_size;
    } else {
      returncode = SPARKEY_HASH_SIZE_INVALID;
      goto close_iter;
    }
  }
  if (hash_header.hash_size != old_hash_size) {
    // The old table cannot be reused with a different hash size, so the
    // whole log is replayed from the beginning. Garbage must then be counted
    // from zero as well, otherwise the part already accounted for in the old
    // header would be counted twice.
    copy_old = 0;
    hash_header.garbage_size = 0;
  }
  hash_header.hash_algorithm = sparkey_get_hash_algorithm(hash_header.hash_size);

  if (copy_old && old_header.data_end == log->header.data_end) {
    // The existing hash file already covers the whole log: nothing needs to
    // be done. Checked before allocating the table so that nothing has to be
    // freed on this path. returncode is SPARKEY_SUCCESS here.
    goto close_iter;
  }

  int slot_size = hash_header.hash_size + hash_header.address_size;
  uint64_t hashsize = slot_size * hash_header.hash_capacity;
  uint8_t *hashtable = malloc(hashsize);
  if (hashtable == NULL) {
    fprintf(stderr, "sparkey_hash_write():%d bug: could not malloc %"PRIu64" bytes\n", __LINE__, hashsize);
    returncode = SPARKEY_INTERNAL_ERROR;
    goto close_iter;
  }
  memset(hashtable, 0, hashsize);

  hash_header.max_displacement = 0;
  hash_header.total_displacement = 0;
  hash_header.num_entries = 0;
  hash_header.hash_collisions = 0;

  if (copy_old) {
    // Seed the new table from the old hash file, then resume reading the log
    // where the old hash file stopped.
    TRY(fill_hash(hashtable, hash_filename, &old_header, &hash_header), free_hashtable);
    TRY(sparkey_logiter_seek(iter, log, start), free_hashtable);
  }

  while (1) {
    TRY(sparkey_logiter_next(iter, log), free_hashtable);
    switch (iter->state) {
    case SPARKEY_ITER_CLOSED:
      // End of log reached.
      goto normal_exit;
      break;
    case SPARKEY_ITER_ACTIVE:
      break;
    default:
      fprintf(stderr, "sparkey_hash_write():%d bug: invalid iter state: %d\n", __LINE__, iter->state);
      returncode = SPARKEY_INTERNAL_ERROR;
      goto free_hashtable;
      break;
    }

    // Capture the entry's position before hashing the key.
    // NOTE(review): presumably sparkey_iter_hash advances iter's read state - confirm.
    uint64_t iter_block_start = iter->block_position;
    uint64_t iter_entry_count = iter->entry_count;

    uint64_t key_hash = sparkey_iter_hash(&hash_header, iter, log);
    uint64_t wanted_slot = key_hash % hash_header.hash_capacity;

    switch (iter->type) {
    case SPARKEY_ENTRY_PUT:
      TRY(hash_put(wanted_slot, key_hash, hashtable, &hash_header, iter, ra_iter, log, (iter_block_start << hash_header.entry_block_bits) | iter_entry_count), free_hashtable);
      break;
    case SPARKEY_ENTRY_DELETE:
      // The delete record itself is garbage: one byte plus the
      // variable-length key length plus the key bytes.
      hash_header.garbage_size += 1 + unsigned_vlq_size(iter->keylen) + iter->keylen;
      TRY(hash_delete(wanted_slot, key_hash, hashtable, &hash_header, iter, ra_iter, log), free_hashtable);
      break;
    }
  }
normal_exit:

  calculate_max_displacement(&hash_header, hashtable);

  // Try removing it first, to avoid overwriting existing files that readers may be using.
  if (remove(hash_filename) < 0) {
    int e = errno;
    if (e != ENOENT) {
      returncode = sparkey_remove_returncode(e);
      goto free_hashtable;
    }
  }
  int fd = creat(hash_filename, 00644);
  if (fd < 0) {
    // Save errno before fprintf can overwrite it.
    int e = errno;
    // Do not hand an invalid descriptor to the writers or to close().
    // NOTE(review): if the project has an errno-to-returncode mapping for
    // file creation, use it here instead of the generic internal error.
    fprintf(stderr, "sparkey_hash_write():%d could not create %s, errno %d\n", __LINE__, hash_filename, e);
    returncode = SPARKEY_INTERNAL_ERROR;
    goto free_hashtable;
  }
  hash_header.major_version = HASH_MAJOR_VERSION;
  hash_header.minor_version = HASH_MINOR_VERSION;
  hash_header.file_identifier = log_header.file_identifier;
  hash_header.data_end = log_header.data_end;

  TRY(write_hashheader(fd, &hash_header), close_hash);
  TRY(write_full(fd, hashtable, hashsize), close_hash);

  // Cleanup labels run in reverse order of acquisition; success falls
  // through all of them.
close_hash:
  close(fd);

free_hashtable:
  free(hashtable);

close_iter:
  sparkey_logiter_close(&iter);
  sparkey_logiter_close(&ra_iter);

close_reader:
  sparkey_logreader_close(&log);

  return returncode;
}