From 83647a66e1edd3fd8e3fd91edc5fd60887409bba Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Mon, 10 Feb 2025 14:58:08 +0100 Subject: [PATCH] packet: Use `Bytes` instead of an array in `Packet` The reason behind the switch is that `Packet` gets cloned in multiple places. `Bytes` provides a zero-copy abstraction, where calling `clone()` doesn't make an actual copy of the underlying data and all instances point to the same memory. However, the old layout containing a sized array is convenient for CUDA. To not break CUDA support, this change introduces a new struct called `PacketArray`, which a `Packet` can be converted into just before calling GPU-based sigverify. That requires a copy, ideally just one. At the same time, CPU-based sigverify and all the other components are going to benefit from zero-copy properties of `Bytes`. --- Cargo.lock | 4 + Cargo.toml | 1 + packet/Cargo.toml | 1 + packet/src/lib.rs | 530 +++++++++++++++++++++++++++++++++++----------- 4 files changed, 416 insertions(+), 120 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a5977ef1..d052ca66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -496,6 +496,9 @@ name = "bytes" version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f61dac84819c6588b558454b194026eb1f09c293b9036ae9b159e74e73ab6cf9" +dependencies = [ + "serde", +] [[package]] name = "cast" @@ -3263,6 +3266,7 @@ version = "2.2.1" dependencies = [ "bincode", "bitflags 2.8.0", + "bytes", "cfg_eval", "serde", "serde_derive", diff --git a/Cargo.toml b/Cargo.toml index 63b455e6..3f78c385 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -140,6 +140,7 @@ bs58 = { version = "0.5.1", default-features = false } bv = "0.11.1" bytemuck = "1.21.0" bytemuck_derive = "1.8.1" +bytes = { version = "1.10", features = ["serde"] } cfg_eval = "0.1.2" chrono = { version = "0.4.39", default-features = false } console = "0.15.10" diff --git a/packet/Cargo.toml b/packet/Cargo.toml index 2817b9b6..e3b6e1d6 100644 --- a/packet/Cargo.toml +++ b/packet/Cargo.toml @@ -12,6 +12,7 @@ edition = { workspace = true } [dependencies] bincode = { workspace = true, optional = true } bitflags = { workspace = true } +bytes = { workspace = true } cfg_eval = { workspace = true, optional = true } serde = { workspace = true, optional = true } serde_derive = { workspace = true, optional = true } diff --git a/packet/src/lib.rs b/packet/src/lib.rs index 871f9050..871bc993 100644 --- a/packet/src/lib.rs +++ b/packet/src/lib.rs @@ -2,26 +2,27 @@ #![cfg_attr(feature = "frozen-abi", feature(min_specialization))] #![cfg_attr(docsrs, feature(doc_auto_cfg))] +#[cfg(feature = "bincode")] +use bincode::{Options, Result}; +// `BufMut` provides convenient methods for writing bytes into `BytesMut`. +// Export them. +pub use bytes::BufMut; +#[cfg(feature = "serde")] +use serde_derive::{Deserialize, Serialize}; #[cfg(feature = "frozen-abi")] use solana_frozen_abi_macro::AbiExample; -#[cfg(feature = "bincode")] -use { - bincode::{Options, Result}, - std::io::Write, -}; use { bitflags::bitflags, + bytes::{Bytes, BytesMut}, std::{ fmt, + io::Write, + mem::{self, MaybeUninit}, net::{IpAddr, Ipv4Addr, SocketAddr}, + ops::{Deref, DerefMut}, slice::SliceIndex, }, }; -#[cfg(feature = "serde")] -use { - serde_derive::{Deserialize, Serialize}, - serde_with::{serde_as, Bytes}, -}; #[cfg(test)] static_assertions::const_assert_eq!(PACKET_DATA_SIZE, 1232); @@ -31,18 +32,6 @@ static_assertions::const_assert_eq!(PACKET_DATA_SIZE, 1232); /// 8 bytes is the size of the fragment header pub const PACKET_DATA_SIZE: usize = 1280 - 40 - 8; -#[cfg(feature = "bincode")] -pub trait Encode { - fn encode(&self, writer: W) -> Result<()>; -} - -#[cfg(feature = "bincode")] -impl Encode for T { - fn encode(&self, writer: W) -> Result<()> { - bincode::serialize_into::(writer, self) - } -} - bitflags! { #[repr(C)] #[cfg_attr(feature = "serde", derive(Deserialize, Serialize))] @@ -68,7 +57,6 @@ bitflags! { #[derive(Clone, Debug, PartialEq, Eq)] #[repr(C)] pub struct Meta { - pub size: usize, pub addr: IpAddr, pub port: u16, pub flags: PacketFlags, @@ -89,154 +77,383 @@ impl ::solana_frozen_abi::abi_example::EvenAsOpaque for PacketFlags { const TYPE_NAME_MATCHER: &'static str = "::_::InternalBitFlags"; } -// serde_as is used as a work around because array isn't supported by serde -// (and serde_bytes). -// -// the root cause is of a historical special handling for [T; 0] in rust's -// `Default` and supposedly mirrored serde's `Serialize` (macro) impls, -// pre-dating stabilized const generics, meaning it'll take long time...: -// https://github.com/rust-lang/rust/issues/61415 -// https://github.com/rust-lang/rust/issues/88744#issuecomment-1138678928 -// -// Due to the nature of the root cause, the current situation is complicated. -// All in all, the serde_as solution is chosen for good perf and low maintenance -// need at the cost of another crate dependency.. -// -// For details, please refer to the below various links... -// -// relevant merged/published pr for this serde_as functionality used here: -// https://github.com/jonasbb/serde_with/pull/277 -// open pr at serde_bytes: -// https://github.com/serde-rs/bytes/pull/28 -// open issue at serde: -// https://github.com/serde-rs/serde/issues/1937 -// closed pr at serde (due to the above mentioned [N; 0] issue): -// https://github.com/serde-rs/serde/pull/1860 -// ryoqun's dirty experiments: -// https://github.com/ryoqun/serde-array-comparisons -// -// We use the cfg_eval crate as advised by the serde_with guide: -// https://docs.rs/serde_with/latest/serde_with/guide/serde_as/index.html#gating-serde_as-on-features -#[cfg_attr(feature = "serde", cfg_eval::cfg_eval, serde_as)] +/// Returns an immutable slice of the provided `buffer`. +/// +/// Returns `None` if the index is invalid or if the provided `meta` is marked +/// as discard. +#[inline] +fn data<'a, I>( + buffer: &'a [u8], + meta: &Meta, + index: I, +) -> Option<&'a >::Output> +where + I: SliceIndex<[u8]>, +{ + // If the packet is marked as discard, it is either invalid or + // otherwise should be ignored, and so the payload should not be read + // from. + if meta.discard() { + None + } else { + buffer.get(index) + } +} + +/// Creates a [`BytesMut`] buffer and [`Meta`] from the given serializable +/// `data`. +#[cfg(feature = "bincode")] +fn from_data(dest: Option<&SocketAddr>, data: T) -> Result<(BytesMut, Meta)> +where + T: serde::Serialize, +{ + let buffer = BytesMut::with_capacity(PACKET_DATA_SIZE); + let mut writer = buffer.writer(); + bincode::serialize_into(&mut writer, &data)?; + let buffer = writer.into_inner(); + let mut meta = Meta::default(); + if let Some(dest) = dest { + meta.set_socket_addr(dest); + } + Ok((buffer, meta)) +} + +/// Read data and metadata from the packet. +pub trait PacketRead { + fn data(&self, index: I) -> Option<&>::Output> + where + I: SliceIndex<[u8]>; + /// Returns an immutable reference to the metadata. + fn meta(&self) -> &Meta; + fn size(&self) -> usize; +} + +/// Representation of a network packet, consisting of the `buffer` containing +/// the payload and `meta` with information about socket address, size and +/// flags. +/// +/// `Packet` is cheaply clonable. Multiple `Packet` instances can point to +/// the same underlying memory. Cloning a `Packet` copies only metadata. +/// +/// `Packet`'s `buffer` is immutable. If you are looking for a structure +/// meant for receiving socket messages and mutation, use [`PacketMut`]. #[cfg_attr(feature = "frozen-abi", derive(AbiExample))] #[cfg_attr(feature = "serde", derive(Deserialize, Serialize))] -#[derive(Clone, Eq)] -#[repr(C)] +#[derive(Clone, Default, Eq)] pub struct Packet { - // Bytes past Packet.meta.size are not valid to read from. - // Use Packet.data(index) to read from the buffer. - #[cfg_attr(feature = "serde", serde_as(as = "Bytes"))] - buffer: [u8; PACKET_DATA_SIZE], + buffer: Bytes, meta: Meta, } impl Packet { - pub fn new(buffer: [u8; PACKET_DATA_SIZE], meta: Meta) -> Self { + pub fn new(buffer: Bytes, meta: Meta) -> Self { Self { buffer, meta } } - /// Returns an immutable reference to the underlying buffer up to - /// packet.meta.size. The rest of the buffer is not valid to read from. - /// packet.data(..) returns packet.buffer.get(..packet.meta.size). - /// Returns None if the index is invalid or if the packet is already marked - /// as discard. + /// Returns a mutable reference to the metadata. #[inline] - pub fn data(&self, index: I) -> Option<&>::Output> + pub fn meta_mut(&mut self) -> &mut Meta { + &mut self.meta + } + + #[cfg(feature = "bincode")] + pub fn from_data(dest: Option<&SocketAddr>, data: T) -> Result where - I: SliceIndex<[u8]>, + T: serde::Serialize, { - // If the packet is marked as discard, it is either invalid or - // otherwise should be ignored, and so the payload should not be read - // from. - if self.meta.discard() { - None - } else { - self.buffer.get(..self.meta.size)?.get(index) - } + let (buffer, meta) = from_data(dest, data)?; + let buffer = buffer.freeze(); + Ok(Packet { buffer, meta }) + } + + #[cfg(feature = "bincode")] + pub fn deserialize_slice(&self, index: I) -> Result + where + T: serde::de::DeserializeOwned, + I: SliceIndex<[u8], Output = [u8]>, + { + let bytes = self.data(index).ok_or(bincode::ErrorKind::SizeLimit)?; + bincode::options() + .with_limit(PACKET_DATA_SIZE as u64) + .with_fixint_encoding() + .reject_trailing_bytes() + .deserialize(bytes) } +} - /// Returns a mutable reference to the entirety of the underlying buffer to - /// write into. The caller is responsible for updating Packet.meta.size - /// after writing to the buffer. +impl fmt::Debug for Packet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Packet {{ addr: {:?} }}", self.meta.socket_addr()) + } +} + +impl Deref for Packet { + type Target = Bytes; + + fn deref(&self) -> &Self::Target { + &self.buffer + } +} + +impl From for Packet { + fn from(packet_array: PacketArray) -> Self { + let buffer = match packet_array.data(..) { + Some(data) => Bytes::from(data.to_vec()), + None => Bytes::new(), + }; + let PacketArray { meta, .. } = packet_array; + Self { buffer, meta } + } +} + +impl PacketRead for Packet { #[inline] - pub fn buffer_mut(&mut self) -> &mut [u8] { - debug_assert!(!self.meta.discard()); - &mut self.buffer[..] + fn data(&self, index: I) -> Option<&>::Output> + where + I: SliceIndex<[u8]>, + { + data(&self.buffer, &self.meta, index) } #[inline] - pub fn meta(&self) -> &Meta { + fn meta(&self) -> &Meta { &self.meta } #[inline] + fn size(&self) -> usize { + self.buffer.len() + } +} + +impl PartialEq for Packet { + fn eq(&self, other: &Self) -> bool { + self.meta() == other.meta() && self.data(..) == other.data(..) + } +} + +/// Mutable representation of a network packet, consisting of the `buffer` for +/// storing the payload and `meta` for keeping information about socket +/// address, size and flags. +/// +/// The main use case is using `PacketMut` as a buffer for receving messages +/// with syscalls like `recvmmsg`. It's also convenient for writing tests, +/// where we want to construct a packet manually before passing it to a tested +/// function. +/// +/// `PacketMut` is cheaply clonable. Multiple `PacketMut` instances can point +/// to the same underlying memory. Cloning a `PacketMut` copies only metadata. +/// +/// `PacketMut` can be converted into an immutable packet using +/// [`freeze`](Self::freeze) method. +#[derive(Clone, Eq, PartialEq)] +pub struct PacketMut { + buffer: BytesMut, + meta: Meta, +} + +impl PacketMut { + /// Returns a mutable reference to the underlying buffer. + pub fn buffer_mut(&mut self) -> &mut BytesMut { + &mut self.buffer + } + + /// Returns a mutable reference to the metadata. pub fn meta_mut(&mut self) -> &mut Meta { &mut self.meta } #[cfg(feature = "bincode")] - pub fn from_data(dest: Option<&SocketAddr>, data: T) -> Result { - let mut packet = Self::default(); - Self::populate_packet(&mut packet, dest, &data)?; - Ok(packet) + pub fn from_data(dest: Option<&SocketAddr>, data: T) -> Result + where + T: serde::Serialize, + { + let (buffer, meta) = from_data(dest, data)?; + Ok(PacketMut { buffer, meta }) } - #[cfg(feature = "bincode")] - pub fn populate_packet( - &mut self, - dest: Option<&SocketAddr>, - data: &T, - ) -> Result<()> { - debug_assert!(!self.meta.discard()); - let mut wr = std::io::Cursor::new(self.buffer_mut()); - ::encode(data, &mut wr)?; - self.meta.size = wr.position() as usize; - if let Some(dest) = dest { - self.meta.set_socket_addr(dest); + /// Converts `self` into an immutable [`Packet`]. + /// + /// The conversion is zero cost and is used to indicate that the packet + /// buffer referenced by the handle will no longer be mutated. The + /// resulting [`Packet`] can be cloned and shared across threads. + pub fn freeze(self) -> Packet { + let Self { buffer, meta } = self; + let buffer = buffer.freeze(); + Packet { buffer, meta } + } +} + +impl fmt::Debug for PacketMut { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "PacketMut {{ addr: {:?} }}", self.meta.socket_addr()) + } +} + +impl Default for PacketMut { + fn default() -> Self { + let buffer = BytesMut::with_capacity(PACKET_DATA_SIZE); + let meta = Meta::default(); + Self { buffer, meta } + } +} + +impl Deref for PacketMut { + type Target = BytesMut; + + fn deref(&self) -> &Self::Target { + &self.buffer + } +} + +impl DerefMut for PacketMut { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.buffer + } +} + +impl PacketRead for PacketMut { + #[inline] + fn data(&self, index: I) -> Option<&>::Output> + where + I: SliceIndex<[u8]>, + { + data(&self.buffer, &self.meta, index) + } + + fn meta(&self) -> &Meta { + &self.meta + } + + #[inline] + fn size(&self) -> usize { + self.buffer.len() + } +} + +// TODO: Re-evaluate the necessity of `PacketArray`. +// +// `Bytes` contain contiguous memory. We know the size of each bytes. What's +// not contiguous is the `Packet` struct itself. Perhaps if we pass `Bytes` of +// a packet to CUDA separately (just `Bytes`, not the whole `Packet`), it's +// going to work just fine. However, doing so is not going to be trivial. + +/// Representation of a network packet, where the `buffer` is an array. +/// +/// `PacketArray` is expensive to clone and loses all the zero-copy benefits of +/// [`Packet`] and [`PacketMut`]. However, `PacketArray` is a contiguous struct +/// with no pointers, and therefore is very convenient for CUDA. +#[derive(Clone)] +#[repr(C)] +pub struct PacketArray { + buffer: [MaybeUninit; PACKET_DATA_SIZE], + meta: Meta, + size: usize, +} + +impl PacketArray { + pub fn from_packet

(packet: &P) -> Self + where + P: PacketRead, + { + let meta = packet.meta().to_owned(); + let mut new_buffer = [MaybeUninit::uninit(); PACKET_DATA_SIZE]; + if let Some(data) = packet.data(..) { + let mut writer = new_buffer.writer(); + // PANICS: We are writing to a buffer. The only chance of any error + // happening here is if the data is larger than a buffer, but we + // already prevent that by ensuring the constraints in `Packet`. + writer.write_all(data).unwrap(); + } + + Self { + buffer: new_buffer, + meta, + size: packet.size(), } - Ok(()) } #[cfg(feature = "bincode")] - pub fn deserialize_slice(&self, index: I) -> Result + pub fn from_data(dest: Option<&SocketAddr>, data: T) -> Result where - T: serde::de::DeserializeOwned, - I: SliceIndex<[u8], Output = [u8]>, + T: serde::Serialize, { - let bytes = self.data(index).ok_or(bincode::ErrorKind::SizeLimit)?; - bincode::options() - .with_limit(PACKET_DATA_SIZE as u64) - .with_fixint_encoding() - .reject_trailing_bytes() - .deserialize(bytes) + let mut packet = PacketArray { + size: bincode::serialized_size(&data)? as usize, + ..Default::default() + }; + let mut writer = packet.buffer.writer(); + bincode::serialize_into(&mut writer, &data)?; + if let Some(dest) = dest { + packet.meta.set_socket_addr(dest); + } + Ok(packet) } } -impl fmt::Debug for Packet { +impl fmt::Debug for PacketArray { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "Packet {{ size: {:?}, addr: {:?} }}", - self.meta.size, - self.meta.socket_addr() + "Packet {{ addr: {:?}, size: {:?} }}", + self.meta.socket_addr(), + self.size ) } } -#[allow(clippy::uninit_assumed_init)] -impl Default for Packet { +impl Default for PacketArray { fn default() -> Self { - let buffer = std::mem::MaybeUninit::<[u8; PACKET_DATA_SIZE]>::uninit(); Self { - buffer: unsafe { buffer.assume_init() }, + buffer: [MaybeUninit::uninit(); PACKET_DATA_SIZE], meta: Meta::default(), + size: usize::default(), } } } -impl PartialEq for Packet { +impl Eq for PacketArray {} + +impl

From

for PacketArray +where + P: PacketRead, +{ + fn from(packet: P) -> Self { + Self::from_packet(&packet) + } +} + +impl PacketArray { + fn data(&self, index: I) -> Option<&>::Output> + where + I: SliceIndex<[u8]>, + { + if self.meta.discard() { + None + } else { + // SAFETY: We are sure that the elements up to `self.size` are + // initialized. + let data = + unsafe { mem::transmute::<&[MaybeUninit], &[u8]>(&self.buffer[..self.size]) }; + let data = data.get(index)?; + Some(data) + } + } + + #[inline] + pub fn meta(&self) -> &Meta { + &self.meta + } + + #[inline] + pub fn size(&self) -> usize { + self.size + } +} + +impl PartialEq for PacketArray { fn eq(&self, other: &Self) -> bool { - self.meta() == other.meta() && self.data(..) == other.data(..) + self.meta() == other.meta() && self.data(..) == other.data(..) && self.size == other.size } } @@ -305,7 +522,6 @@ impl Meta { impl Default for Meta { fn default() -> Self { Self { - size: 0, addr: IpAddr::V4(Ipv4Addr::UNSPECIFIED), port: 0, flags: PacketFlags::empty(), @@ -315,10 +531,60 @@ impl Default for Meta { #[cfg(test)] mod tests { - use super::*; + use {super::*, bytes::BufMut, std::net::SocketAddrV4}; #[test] - fn test_deserialize_slice() { + fn test_packet_partial_eq() { + let mut p1 = PacketMut::default(); + let mut p2 = PacketMut::default(); + + p1.put_u8(0); + p2.put_u8(0); + + assert!(p1 == p2); + + let fp1 = p1.clone().freeze(); + let fp2 = p2.clone().freeze(); + + assert!(fp1 == fp2); + + p2.buffer_mut()[0] = 4; + assert!(p1 != p2); + } + + #[test] + fn test_freeze() { + let p = PacketMut::from_data(None, u32::MAX).unwrap(); + let p = p.freeze(); + assert_eq!( + p.meta(), + &Meta { + addr: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + port: 0, + flags: PacketFlags::empty(), + } + ); + + let p = PacketMut::from_data( + Some(&SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(10, 0, 0, 1), + 9001, + ))), + u32::MAX, + ) + .unwrap(); + assert_eq!( + p.meta(), + &Meta { + addr: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), + port: 9001, + flags: PacketFlags::empty(), + } + ); + } + + #[test] + fn test_packet_deserialize_slice() { let p = Packet::from_data(None, u32::MAX).unwrap(); assert_eq!(p.deserialize_slice(..).ok(), Some(u32::MAX)); assert_eq!(p.deserialize_slice(0..4).ok(), Some(u32::MAX)); @@ -355,4 +621,28 @@ mod tests { Err("the size limit has been reached".to_string()), ); } + + #[test] + fn test_packet_array_from_owned() { + let p = Packet::from_data(None, u32::MAX).unwrap(); + assert_eq!(p.data(..).unwrap(), u32::MAX.to_ne_bytes()); + + let pa: PacketArray = p.into(); + assert_eq!(pa.data(..).unwrap(), u32::MAX.to_ne_bytes()); + + let p: Packet = pa.into(); + assert_eq!(p.data(..).unwrap(), u32::MAX.to_ne_bytes()); + } + + #[test] + fn test_packet_array_from_ref() { + let p = &Packet::from_data(None, u32::MAX).unwrap(); + assert_eq!(p.data(..).unwrap(), u32::MAX.to_ne_bytes()); + + let pa = PacketArray::from_packet(p); + assert_eq!(pa.data(..).unwrap(), u32::MAX.to_ne_bytes()); + + let p: Packet = pa.into(); + assert_eq!(p.data(..).unwrap(), u32::MAX.to_ne_bytes()); + } }