fn merge_evict()

in src/storage/seg/src/segments/segments.rs [693:802]


    /// Runs a merge eviction pass over the segment chain that begins at
    /// `start`.
    ///
    /// The `start` segment acts as the merge target: it is pruned and
    /// compacted in place and marked as merged. Each following segment in the
    /// chain is then pruned, copied into the target, cleared, and handed to
    /// `push_free`, until one of the stop conditions in the loop is reached.
    ///
    /// # Returns
    ///
    /// * `Ok(next)` where `next` is the `next_seg()` of the last source that
    ///   was merged (or of `start` if no source was merged). It is `None`
    ///   when the chain was exhausted.
    /// * `Ok(None)` when a source segment reports that it cannot be evicted;
    ///   per the original author's note this makes the caller reset its
    ///   next-to-merge position.
    ///
    /// # Errors
    ///
    /// * `SegmentsError::NoEvictableSegments` when
    ///   `merge_evict_chain_len(start)` is below 3.
    /// * Any error propagated from `get_mut`, `compact`, or `get_mut_pair`.
    fn merge_evict(
        &mut self,
        start: NonZeroU32,
        hashtable: &mut HashTable,
    ) -> Result<Option<NonZeroU32>, SegmentsError> {
        SEGMENT_MERGE.increment();

        let target_id = start;
        let chain_len = self.merge_evict_chain_len(start);

        // TODO(bmartin): this should be a different error probably
        if chain_len < 3 {
            return Err(SegmentsError::NoEvictableSegments);
        }

        // the first merge source is whatever follows the target in the chain
        let mut next_id = self.get_mut(start)?.next_seg();

        // mutable merge state: the cutoff returned by each prune call is fed
        // into the next one, and `merged` counts segments handled so far
        let mut cutoff = 1.0;
        let mut merged = 0;

        // merge parameters which stay fixed for the whole pass
        let max_merge = self.evict.max_merge();
        let n_merge = self.evict.n_merge();
        let stop_ratio = self.evict.stop_ratio();
        let stop_bytes = (stop_ratio * self.segment_size() as f64) as i32;

        // a chain shorter than `n_merge` gets a ratio derived from its own
        // length; otherwise the configured target ratio is used
        let target_ratio = if chain_len < n_merge {
            1.0 / chain_len as f64
        } else {
            self.evict.target_ratio()
        };

        // step one: prune and compact the target segment itself
        {
            let mut target = self.get_mut(start)?;
            let size_before = target.live_bytes();

            trace!("prune merge with cutoff: {}", cutoff);
            cutoff = target.prune(hashtable, cutoff, target_ratio);
            trace!("cutoff is now: {}", cutoff);

            target.compact(hashtable)?;

            let size_after = target.live_bytes();
            trace!(
                "dst {}: {} bytes -> {} bytes",
                target_id,
                size_before,
                size_after
            );

            target.mark_merged();
            merged += 1;
        }

        // step two: walk the rest of the chain, pruning each source and
        // copying it into the target until the chain ends, the merge limit is
        // hit, or the target has reached `stop_bytes`
        loop {
            let src_id = match next_id {
                Some(id) => id,
                None => break,
            };

            // NOTE(review): because this uses `>`, up to `max_merge + 1`
            // segments (counting the target) can be processed — confirm that
            // this is the intended bound.
            if merged > max_merge {
                trace!("stop merge: merged max segments");
                break;
            }

            // a failed lookup is treated the same as a non-evictable segment
            let evictable = match self.get_mut(src_id) {
                Ok(seg) => seg.can_evict(),
                Err(_) => false,
            };
            if !evictable {
                trace!("stop merge: can't evict source segment");
                return Ok(None); // this causes the next_to_merge to reset
            }

            let (mut dst, mut src) = self.get_mut_pair(target_id, src_id)?;

            let dst_before = dst.live_bytes();
            let src_before = src.live_bytes();

            if dst_before >= stop_bytes {
                trace!("stop merge: target segment is full");
                break;
            }

            trace!("pruning source segment");
            cutoff = src.prune(hashtable, cutoff, target_ratio);

            trace!(
                "src {}: {} bytes -> {} bytes",
                src_id,
                src_before,
                src.live_bytes()
            );

            trace!("copying source into target");
            // the copy result is intentionally discarded here; whatever is
            // still live in the source afterwards is reported as dropped
            let _ = src.copy_into(&mut dst, hashtable);
            trace!("copy dropped {} bytes", src.live_bytes());

            trace!(
                "dst {}: {} bytes -> {} bytes",
                target_id,
                dst_before,
                dst.live_bytes()
            );

            // remember the successor before the source is cleared and freed
            next_id = src.next_seg();
            src.clear(hashtable, false);
            self.push_free(src_id);
            merged += 1;
        }

        Ok(next_id)
    }