rust/ccommon-rs/src/buf.rs (251 lines of code) (raw):

// ccommon - a cache common library. // Copyright (C) 2019 Twitter, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. use ccommon_sys::buf; use std::io::{self, Read, Write}; use std::mem::MaybeUninit; use std::ops::{Deref, DerefMut}; use bytes::{Buf as BytesBuf, BufMut as BytesBufMut}; /// A non-owned buffer, can be read from and written to. /// /// # Safety /// This is a self-referential struct that assumes it is /// followed by it's data. Attempting to move it or create /// a new instance (via transmute) will cause UB. #[repr(transparent)] #[derive(Debug)] pub struct Buf { buf: buf, } impl Buf { pub unsafe fn from_ptr<'a>(buf: *const buf) -> &'a Buf { &*(buf as *const Buf) } pub unsafe fn from_mut_ptr<'a>(buf: *mut buf) -> &'a mut Buf { &mut *(buf as *mut Buf) } pub fn as_ptr(&self) -> *const buf { &self.buf as *const buf } pub fn as_mut_ptr(&mut self) -> *mut buf { &mut self.buf as *mut buf } #[deprecated(note = "use from_ptr instead")] pub unsafe fn from_raw<'a>(buf: *const buf) -> &'a Buf { &*(buf as *const Buf) } #[deprecated(note = "use from_mut_ptr instead")] pub unsafe fn from_raw_mut<'a>(buf: *mut buf) -> &'a mut Buf { &mut *(buf as *mut Buf) } #[deprecated(note = "use as_ptr instead")] #[allow(clippy::wrong_self_convention)] pub fn into_raw(&self) -> *const buf { &self.buf as *const _ } #[deprecated(note = "use as_mut_ptr instead")] #[allow(clippy::wrong_self_convention)] pub fn into_raw_mut(&mut self) -> *mut buf { &mut self.buf as *mut _ } /// The number of bytes that can still be written to the /// buffer before it is full. pub fn write_size(&self) -> usize { assert!(self.buf.wpos as usize <= self.buf.end as usize); self.buf.end as usize - self.buf.wpos as usize } /// The number of bytes left to read from the buffer pub fn read_size(&self) -> usize { assert!(self.buf.rpos as usize <= self.buf.wpos as usize); self.buf.wpos as usize - self.buf.rpos as usize } pub fn capacity(&self) -> usize { assert!(self.buf.begin.as_ptr() as usize <= self.buf.end as usize); self.buf.end as usize - self.buf.begin.as_ptr() as usize } /// Additional capacity required to write count bytes to the buffer pub fn new_cap(&self, bytes: usize) -> usize { assert!(self.buf.begin.as_ptr() as usize <= self.buf.wpos as usize); bytes.saturating_sub(self.write_size()) } /// Clear the buffer and remove it from any allocation pool pub fn reset(&mut self) { self.buf.next.stqe_next = std::ptr::null_mut(); self.buf.free = false; self.buf.rpos = self.buf.begin.as_mut_ptr(); self.buf.wpos = self.buf.rpos; } /// Shift data in the buffer to the end of the buffer. pub fn rshift(&mut self) { let size = self.read_size(); if size > 0 { unsafe { std::ptr::copy(self.buf.rpos, self.buf.rpos.offset(-(size as isize)), size); } } self.buf.rpos = unsafe { self.buf.end.offset(-(size as isize)) }; self.buf.wpos = self.buf.end; } /// Shift data in the buffer to the start of the buffer. pub fn lshift(&mut self) { let size = self.read_size(); if size > 0 { unsafe { std::ptr::copy(self.buf.rpos, self.buf.begin.as_mut_ptr(), size); } } self.buf.rpos = self.buf.begin.as_mut_ptr(); self.buf.wpos = self.buf.rpos.wrapping_add(size); } /// Get the currently valid part of the buffer as a slice pub fn as_slice(&self) -> &[u8] { use std::slice; unsafe { slice::from_raw_parts(self.buf.rpos as *const u8, self.read_size()) } } /// Get the currently valid part of the buffer as a slice pub fn as_slice_mut(&mut self) -> &mut [u8] { use std::slice; unsafe { slice::from_raw_parts_mut(self.buf.rpos as *mut u8, self.read_size()) } } } /// An owned buffer. /// /// In addition to all the operations supported by `Buf` /// an `OwnedBuf` also supports some resizing operations. #[repr(transparent)] #[derive(Debug)] pub struct OwnedBuf { buf: *mut buf, } impl OwnedBuf { pub unsafe fn from_raw(buf: *mut buf) -> Self { Self { buf } } pub fn into_raw(self) -> *mut buf { let buf = self.buf; // Don't want to run drop on the destructor std::mem::forget(self); buf } pub fn as_ptr(&self) -> *const buf { self.buf } pub fn as_mut_ptr(&mut self) -> *mut buf { self.buf } /// Double the size of the buffer. pub fn double(&mut self) -> Result<(), crate::Error> { use ccommon_sys::dbuf_double; unsafe { let status = dbuf_double(&mut self.buf as *mut _); if status != 0 { Err(status.into()) } else { Ok(()) } } } /// Shrink the buffer to the max of the initial size or the content size. pub fn shrink(&mut self) -> Result<(), crate::Error> { use ccommon_sys::dbuf_shrink; unsafe { let status = dbuf_shrink(&mut self.buf as *mut _); if status != 0 { Err(status.into()) } else { Ok(()) } } } /// Resize the buffer to fit the required capacity. /// /// # Panics /// Panics if `cap` is greater than `u32::MAX`. pub fn fit(&mut self, cap: usize) -> Result<(), crate::Error> { use ccommon_sys::dbuf_fit; assert!((cap as u64) <= std::u32::MAX as u64); unsafe { let status = dbuf_fit(&mut self.buf as *mut _, cap as u32); if status != 0 { Err(status.into()) } else { Ok(()) } } } } unsafe impl Send for OwnedBuf {} unsafe impl Sync for OwnedBuf {} impl Drop for OwnedBuf { fn drop(&mut self) { use ccommon_sys::buf_destroy; unsafe { buf_destroy(&mut self.buf as *mut _); } } } impl Deref for OwnedBuf { type Target = Buf; fn deref(&self) -> &Buf { unsafe { Buf::from_ptr(self.buf) } } } impl DerefMut for OwnedBuf { fn deref_mut(&mut self) -> &mut Buf { unsafe { Buf::from_mut_ptr(self.buf) } } } impl Read for Buf { fn read(&mut self, out: &mut [u8]) -> io::Result<usize> { let len = self.read_size().min(out.len()); unsafe { std::ptr::copy_nonoverlapping(self.buf.rpos as *const u8, out.as_mut_ptr(), len) } self.buf.rpos = self.buf.rpos.wrapping_add(len); Ok(len) } } impl Write for Buf { fn write(&mut self, src: &[u8]) -> io::Result<usize> { if src.is_empty() { return Ok(0); } let len = self.write_size().min(src.len()); unsafe { std::ptr::copy_nonoverlapping(src.as_ptr(), self.buf.wpos as *mut _, src.len()) } self.buf.wpos = self.buf.wpos.wrapping_add(len); Ok(len) } fn flush(&mut self) -> io::Result<()> { Ok(()) } } impl Read for OwnedBuf { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { (**self).read(buf) } } impl Write for OwnedBuf { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { (**self).write(buf) } fn flush(&mut self) -> io::Result<()> { (**self).flush() } } impl BytesBuf for Buf { fn remaining(&self) -> usize { self.read_size() } fn bytes(&self) -> &[u8] { self.as_slice() } fn advance(&mut self, cnt: usize) { assert!(cnt <= self.read_size()); self.buf.rpos = self.buf.rpos.wrapping_add(cnt); } } impl BytesBufMut for Buf { fn remaining_mut(&self) -> usize { self.write_size() } unsafe fn advance_mut(&mut self, cnt: usize) { assert!(cnt <= self.write_size()); self.buf.wpos = self.buf.wpos.wrapping_add(cnt); } fn bytes_mut(&mut self) -> &mut [MaybeUninit<u8>] { unsafe { std::slice::from_raw_parts_mut(self.buf.wpos as *mut MaybeUninit<u8>, self.write_size()) } } } impl BytesBuf for OwnedBuf { fn remaining(&self) -> usize { (**self).remaining() } fn bytes(&self) -> &[u8] { BytesBuf::bytes(&**self) } fn advance(&mut self, cnt: usize) { (**self).advance(cnt) } } impl BytesBufMut for OwnedBuf { fn remaining_mut(&self) -> usize { (**self).remaining_mut() } unsafe fn advance_mut(&mut self, cnt: usize) { (**self).advance_mut(cnt) } fn bytes_mut(&mut self) -> &mut [MaybeUninit<u8>] { BytesBufMut::bytes_mut(&mut **self) } }