src/session/buffer.rs (241 lines of code) (raw):

// Copyright 2021 Twitter, Inc. // Licensed under the Apache License, Version 2.0 // http://www.apache.org/licenses/LICENSE-2.0 //! A very simple buffer type that can be replaced in the future. use core::borrow::{Borrow, BorrowMut}; /// A growable byte buffer pub struct Buffer { buffer: Vec<u8>, read_offset: usize, write_offset: usize, target_capacity: usize, } impl Buffer { /// Create a new `Buffer` that can hold up to `capacity` bytes without /// re-allocating. #[allow(clippy::slow_vector_initialization)] pub fn with_capacity(capacity: usize) -> Self { let mut buffer = Vec::with_capacity(capacity); buffer.resize(capacity, 0); // SESSION_BUFFER_BYTE.add(buffer.capacity() as _); Self { buffer, read_offset: 0, write_offset: 0, target_capacity: capacity, } } /// Returns the amount of space available to write into the buffer without /// reallocating. pub fn available_capacity(&self) -> usize { self.buffer.len() - self.write_offset } /// Return the number of bytes currently in the buffer. pub fn len(&self) -> usize { self.write_offset - self.read_offset } /// Check if the buffer is empty. pub fn is_empty(&self) -> bool { self.len() == 0 } // TODO(bmartin): we're currently relying on the resize behaviors of the // underlying `Vec` storage. This currently results in growth to the next // nearest power of two. Effectively resulting in buffer doubling when a // resize is required. /// Reserve room for `additional` bytes in the buffer. This may reserve more /// space than requested to avoid frequent allocations. If the buffer /// already has sufficient available capacity, this is a no-op. pub fn reserve(&mut self, additional: usize) { // let old_cap = self.buffer.capacity(); let needed = additional.saturating_sub(self.available_capacity()); if needed > 0 { let current = self.buffer.len(); let target = (current + needed).next_power_of_two(); self.buffer.resize(target, 0); // SESSION_BUFFER_BYTE.add((self.buffer.capacity() - old_cap) as _); } } /// Append the bytes from `other` onto `self`. pub fn extend_from_slice(&mut self, other: &[u8]) { self.reserve(other.len()); self.buffer[self.write_offset..(self.write_offset + other.len())].copy_from_slice(other); self.increase_len(other.len()); } /// Mark that `amt` bytes have been consumed and should not be returned in /// future reads from the buffer. pub fn consume(&mut self, bytes: usize) { // let old_capacity = self.buffer.capacity(); self.read_offset = std::cmp::min(self.read_offset + bytes, self.write_offset); // if we have content, before shrinking we must shift content left if !self.is_empty() { self.buffer .copy_within(self.read_offset..self.write_offset, 0); } self.write_offset -= self.read_offset; self.read_offset = 0; // determine the target size of the buffer let target_size = if self.len() * 2 > self.buffer.len() { // buffer too full to shrink, early return return; } else if self.len() > self.target_capacity { // should shrink, but not to target capacity self.buffer.len() / 2 } else { // shrink down to target capacity self.target_capacity }; // buffer can be reduced to the target_size determined above self.buffer.truncate(target_size); self.buffer.shrink_to_fit(); // update stats if the buffer has resized // SESSION_BUFFER_BYTE.sub(old_capacity as i64 - self.buffer.capacity() as i64); } /// Marks the buffer as now containing `amt` additional bytes. This function /// prevents advancing the write offset beyond the initialized area of the /// underlying storage. pub fn increase_len(&mut self, amt: usize) { self.write_offset = std::cmp::min(self.write_offset + amt, self.buffer.len()); } } impl Borrow<[u8]> for Buffer { fn borrow(&self) -> &[u8] { &self.buffer[self.read_offset..self.write_offset] } } impl BorrowMut<[u8]> for Buffer { fn borrow_mut(&mut self) -> &mut [u8] { let available = self.buffer.len(); &mut self.buffer[self.write_offset..available] } } impl Drop for Buffer { fn drop(&mut self) { // SESSION_BUFFER_BYTE.sub(self.buffer.capacity() as _); } } #[cfg(test)] mod tests { use super::Buffer; use std::borrow::Borrow; #[test] // test buffer initialization with various capacities fn new() { let buffer = Buffer::with_capacity(1024); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 1024); assert!(buffer.is_empty()); let buffer = Buffer::with_capacity(2048); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 2048); assert!(buffer.is_empty()); // test zero capacity buffer let buffer = Buffer::with_capacity(0); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 0); assert!(buffer.is_empty()); // test with non power of 2 let buffer = Buffer::with_capacity(100); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 100); assert!(buffer.is_empty()); } #[test] // tests a small buffer growing only on second write fn write_1() { let mut buffer = Buffer::with_capacity(8); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 8); assert!(buffer.is_empty()); // first write fits in buffer buffer.extend_from_slice(b"GET "); assert_eq!(buffer.len(), 4); assert_eq!(buffer.available_capacity(), 4); assert!(!buffer.is_empty()); let content: &[u8] = buffer.borrow(); assert_eq!(content, b"GET "); // second write causes buffer to grow buffer.extend_from_slice(b"SOME_KEY\r\n"); assert_eq!(buffer.len(), 14); assert_eq!(buffer.available_capacity(), 2); assert!(!buffer.is_empty()); let content: &[u8] = buffer.borrow(); assert_eq!(content, b"GET SOME_KEY\r\n"); } #[test] // test a zero capacity buffer growing on two consecutive writes fn write_2() { let mut buffer = Buffer::with_capacity(0); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 0); assert!(buffer.is_empty()); // zero capacity buffer grows on first write buffer.extend_from_slice(b"GET KEY\r\n"); assert_eq!(buffer.len(), 9); assert_eq!(buffer.available_capacity(), 7); assert!(!buffer.is_empty()); let content: &[u8] = buffer.borrow(); assert_eq!(content, b"GET KEY\r\n"); // and again on second write buffer.extend_from_slice(b"SET OTHER_KEY 0 0 1\r\nA\r\n"); assert_eq!(buffer.len(), 33); assert_eq!(buffer.available_capacity(), 31); assert!(!buffer.is_empty()); let content: &[u8] = buffer.borrow(); assert_eq!(content, b"GET KEY\r\nSET OTHER_KEY 0 0 1\r\nA\r\n"); } #[test] // tests a large buffer that grows on first write fn write_3() { let mut buffer = Buffer::with_capacity(16); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 16); assert!(buffer.is_empty()); buffer.extend_from_slice(b"SET SOME_REALLY_LONG_KEY 0 0 1\r\nA\r\n"); assert_eq!(buffer.len(), 35); assert_eq!(buffer.available_capacity(), 29); } #[test] // tests a consume operation where all bytes are consumed and the buffer // remains its original size fn consume_1() { let mut buffer = Buffer::with_capacity(16); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 16); assert!(buffer.is_empty()); buffer.extend_from_slice(b"END\r\n"); assert_eq!(buffer.len(), 5); assert_eq!(buffer.available_capacity(), 11); assert!(!buffer.is_empty()); buffer.consume(5); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 16); assert!(buffer.is_empty()); } #[test] // tests a consume operation where all bytes are consumed and the buffer // shrinks to its original size fn consume_2() { let mut buffer = Buffer::with_capacity(2); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 2); assert!(buffer.is_empty()); // buffer extends to the next power of two // with 5 byte message we need 8 bytes for the buffer buffer.extend_from_slice(b"END\r\n"); assert_eq!(buffer.len(), 5); assert_eq!(buffer.available_capacity(), 3); assert!(!buffer.is_empty()); buffer.consume(5); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 2); assert!(buffer.is_empty()); } #[test] // tests a consume operation where not all bytes are consumed and buffer // remains its original size fn consume_3() { let mut buffer = Buffer::with_capacity(8); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 8); assert!(buffer.is_empty()); let content = b"END\r\n"; let len = content.len(); buffer.extend_from_slice(content); assert_eq!(buffer.len(), len); assert_eq!(buffer.available_capacity(), 3); assert!(!buffer.is_empty()); // consume all but the last byte of content in the buffer, one byte at // a time // - buffer len decreases with each call to consume() // - buffer available capacity increases with each call to consume() for i in 1..len { buffer.consume(1); assert_eq!(buffer.len(), len - i); assert_eq!(buffer.available_capacity(), 3 + i); assert!(!buffer.is_empty()); } // when consuming the final byte, the read/write offsets move to the // start of the buffer, and available capacity should be the original // buffer size buffer.consume(1); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 8); assert!(buffer.is_empty()); } #[test] // tests a consume operation where not all bytes are consumed and buffer // shrinks as bytes are consumed fn consume_4() { let mut buffer = Buffer::with_capacity(16); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 16); assert!(buffer.is_empty()); let content = b"VALUE SOME_REALLY_LONG_KEY 0 1\r\n1\r\nEND\r\n"; // buffer resizes up to 64 bytes to hold 40 bytes // length = 40, size = 64, capacity = 24 buffer.extend_from_slice(content); assert_eq!(buffer.len(), 40); assert_eq!(buffer.available_capacity(), 24); assert!(!buffer.is_empty()); // partial consume, len decrease, buffer shrinks by half // length = 32, size = 32, capacity = 0 buffer.consume(8); assert_eq!(buffer.len(), 32); assert_eq!(buffer.available_capacity(), 0); assert!(!buffer.is_empty()); // consume one more byte and we should get available capacity // length = 31, size = 32, capacity = 1 buffer.consume(1); assert_eq!(buffer.len(), 31); assert_eq!(buffer.available_capacity(), 1); assert!(!buffer.is_empty()); // partial consume, len decrease, buffer shrinks down to target capacity // length = 16, size = 16, capacity = 0 buffer.consume(15); assert_eq!(buffer.len(), 16); assert_eq!(buffer.available_capacity(), 0); // from here on, buffer will not shrink below target capacity // consume one more byte // length = 15, size = 16, capacity = 0 buffer.consume(1); assert_eq!(buffer.len(), 15); assert_eq!(buffer.available_capacity(), 1); // partial consume, len decrease // length = 8, size = 16, capacity = 8 buffer.consume(7); assert_eq!(buffer.len(), 8); assert_eq!(buffer.available_capacity(), 8); // partial consume, len decrease // length = 7, size = 16, capacity = 9 buffer.consume(1); assert_eq!(buffer.len(), 7); assert_eq!(buffer.available_capacity(), 9); // consume all but the final byte // partial consume, len decrease // length = 1, size = 16, capacity = 15 buffer.consume(6); assert_eq!(buffer.len(), 1); assert_eq!(buffer.available_capacity(), 15); // consume the final byte // length = 0, size = 16, capacity = 16 buffer.consume(1); assert_eq!(buffer.len(), 0); assert_eq!(buffer.available_capacity(), 16); } }