in src/storage/seg/src/segments/segments.rs [693:802]
/// Performs one merge-eviction pass over the segment chain beginning at
/// `start`, using `start` itself as the merge destination.
///
/// The destination is pruned, compacted and marked as merged first. Each
/// segment that follows it in the chain (as reported by `next_seg()`) is then
/// pruned, copied into the destination, cleared, and handed to `push_free`.
/// The pass ends when the chain runs out, when the merged-segment count
/// exceeds `max_merge`, or when the destination's live bytes reach
/// `stop_ratio * segment_size`.
///
/// # Returns
///
/// * `Ok(Some(id))` - the pass stopped early; `id` is the first segment in the
///   chain that was *not* merged.
/// * `Ok(None)` - the chain was exhausted, or a source segment could not be
///   evicted (or could not be looked up).
///
/// # Errors
///
/// * `SegmentsError::NoEvictableSegments` if the merge chain starting at
///   `start` is shorter than three segments.
/// * Any error propagated from `get_mut`, `get_mut_pair`, or compacting the
///   destination segment.
fn merge_evict(
    &mut self,
    start: NonZeroU32,
    hashtable: &mut HashTable,
) -> Result<Option<NonZeroU32>, SegmentsError> {
    SEGMENT_MERGE.increment();

    // TODO(bmartin): this should be a different error probably
    let chain_length = self.merge_evict_chain_len(start);
    if chain_length < 3 {
        return Err(SegmentsError::NoEvictableSegments);
    }

    // the first merge source is whatever segment is linked after the
    // destination
    let mut candidate = self.get_mut(start)?.next_seg();

    // state carried from one segment to the next during this pass
    let mut prune_cutoff = 1.0;
    let mut segments_merged = 0;

    // parameters that stay fixed for the whole pass
    let merge_limit = self.evict.max_merge();
    let merge_goal = self.evict.n_merge();
    let stop_ratio = self.evict.stop_ratio();
    let segment_bytes = self.segment_size() as f64;
    let full_threshold = (stop_ratio * segment_bytes) as i32;

    // a chain shorter than the configured merge count gets a ratio derived
    // from its own length; otherwise the configured target ratio applies
    let target_ratio = if chain_length >= merge_goal {
        self.evict.target_ratio()
    } else {
        1.0 / chain_length as f64
    };

    // the destination segment is pruned and compacted in place before any
    // source is copied into it; the scope ends its borrow before the loop
    {
        let mut dst = self.get_mut(start)?;
        let bytes_before = dst.live_bytes();

        trace!("prune merge with cutoff: {}", prune_cutoff);
        prune_cutoff = dst.prune(hashtable, prune_cutoff, target_ratio);
        trace!("cutoff is now: {}", prune_cutoff);

        dst.compact(hashtable)?;

        let bytes_after = dst.live_bytes();
        trace!(
            "dst {}: {} bytes -> {} bytes",
            start,
            bytes_before,
            bytes_after
        );

        dst.mark_merged();
        segments_merged += 1;
    }

    // walk the rest of the chain: prune each source, copy what survives into
    // the destination, then release the source
    while let Some(src_id) = candidate {
        if segments_merged > merge_limit {
            trace!("stop merge: merged max segments");
            break;
        }

        // a failed lookup is treated the same as a segment that can't be
        // evicted
        let src_evictable = match self.get_mut(src_id) {
            Ok(segment) => segment.can_evict(),
            Err(_) => false,
        };
        if !src_evictable {
            trace!("stop merge: can't evict source segment");
            return Ok(None); // this causes the next_to_merge to reset
        }

        let (mut dst, mut src) = self.get_mut_pair(start, src_id)?;
        let dst_before = dst.live_bytes();
        let src_before = src.live_bytes();

        if dst_before >= full_threshold {
            trace!("stop merge: target segment is full");
            break;
        }

        trace!("pruning source segment");
        prune_cutoff = src.prune(hashtable, prune_cutoff, target_ratio);
        trace!(
            "src {}: {} bytes -> {} bytes",
            src_id,
            src_before,
            src.live_bytes()
        );

        // the outcome of the copy is intentionally ignored: the source is
        // cleared below regardless, and whatever is still live in it is
        // reported as dropped
        trace!("copying source into target");
        let _ = src.copy_into(&mut dst, hashtable);
        trace!("copy dropped {} bytes", src.live_bytes());
        trace!(
            "dst {}: {} bytes -> {} bytes",
            start,
            dst_before,
            dst.live_bytes()
        );

        // remember the successor before the source is cleared
        candidate = src.next_seg();
        src.clear(hashtable, false);
        self.push_free(src_id);
        segments_merged += 1;
    }

    Ok(candidate)
}