Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ bloom-filters.workspace = true
ckb-spawn.workspace = true
bitflags.workspace = true
p2p = { workspace = true, default-features = false }
lz4 = "1.28.1"

[target.'cfg(not(target_family = "wasm"))'.dependencies]
p2p = { workspace = true, default-features = false, features = [
Expand Down
162 changes: 128 additions & 34 deletions network/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use std::io;

pub(crate) const COMPRESSION_SIZE_THRESHOLD: usize = 1024;
const UNCOMPRESS_FLAG: u8 = 0b0000_0000;
const COMPRESS_FLAG: u8 = 0b1000_0000;
const SNAPPY_FLAG: u8 = 0b1000_0000;
const LZ4_FLAG: u8 = 0b0100_0000;
const MAX_UNCOMPRESSED_LEN: usize = 1 << 23; // 8MB

/// Compressed decompression structure
Expand All @@ -26,6 +27,31 @@ const MAX_UNCOMPRESSED_LEN: usize = 1 << 23; // 8MB
/// +-------+------+------------------------------------------------+
/// | 1~ | | Payload (Serialized Data with Compress) |
/// +-------+------+------------------------------------------------+
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CompressionType {
None,
Snappy,
Lz4,
}

impl CompressionType {
fn from_flag(flag: u8) -> Self {
match flag & 0b1100_0000 {
SNAPPY_FLAG => CompressionType::Snappy,
LZ4_FLAG => CompressionType::Lz4,
_ => CompressionType::None,
}
}

fn to_flag(self) -> u8 {
match self {
CompressionType::None => UNCOMPRESS_FLAG,
CompressionType::Snappy => SNAPPY_FLAG,
CompressionType::Lz4 => LZ4_FLAG,
}
}
}

#[derive(Clone, Debug)]
pub(crate) struct Message {
inner: BytesMut,
Expand All @@ -45,65 +71,128 @@ impl Message {
Self { inner: data }
}

/// Compress message
pub(crate) fn compress(mut self) -> Bytes {
if self.inner.len() > COMPRESSION_SIZE_THRESHOLD {
/// Compress message with specified compression type
pub(crate) fn compress_with(mut self, compression_type: CompressionType) -> Bytes {
if self.inner.len() > COMPRESSION_SIZE_THRESHOLD && compression_type != CompressionType::None {
let input = self.inner.split_off(1);
match SnapEncoder::new().compress_vec(&input) {
let compress_result = match compression_type {
CompressionType::Snappy => self.compress_snappy(&input),
CompressionType::Lz4 => self.compress_lz4(&input),
CompressionType::None => return self.inner.freeze(),
};

match compress_result {
Ok(res) => {
self.inner.extend_from_slice(&res);
self.set_compress_flag();
self.set_compression_flag(compression_type);
}
Err(e) => {
debug!("snappy compress error: {}", e);
debug!("{:?} compress error: {}", compression_type, e);
self.inner.unsplit(input);
}
}
}
self.inner.freeze()
}

/// Compress message with default snappy compression
pub(crate) fn compress(self) -> Bytes {
self.compress_with(CompressionType::Snappy)
}

/// Compress message in snappy format
fn compress_snappy(&self, input: &BytesMut) -> Result<Vec<u8>, io::Error> {
SnapEncoder::new()
.compress_vec(input)
.map_err(|e| {
io::Error::new(io::ErrorKind::InvalidData, e)
})
}

/// Compress message in lz4 format
fn compress_lz4(&self, input: &BytesMut) -> Result<Vec<u8>, io::Error> {
lz4::block::compress(input, None, false)
.map_err(|e| {
io::Error::new(io::ErrorKind::InvalidData, e)
})
}

/// Decompress message
pub(crate) fn decompress(mut self) -> Result<Bytes, io::Error> {
if self.inner.is_empty() {
Err(io::ErrorKind::InvalidData.into())
} else if self.compress_flag() {
match decompress_len(&self.inner[1..]) {
Ok(decompressed_bytes_len) => {
if decompressed_bytes_len > MAX_UNCOMPRESSED_LEN {
debug!(
"The limit for uncompressed bytes len is exceeded. limit: {}, len: {}",
MAX_UNCOMPRESSED_LEN, decompressed_bytes_len
);
return Err(io::ErrorKind::InvalidData.into());
}

let compression_type = self.compression_type();

match compression_type {
CompressionType::None => {
let _ = self.inner.split_to(1);
Ok(self.inner.freeze())
},
CompressionType::Snappy => self.decompress_snappy(),
CompressionType::Lz4 => self.decompress_lz4(),
}
}

/// Decompress message in snappy format
fn decompress_snappy(&mut self) -> Result<Bytes, io::Error> {
match decompress_len(&self.inner[1..]) {
Ok(decompressed_bytes_len) => {
if decompressed_bytes_len > MAX_UNCOMPRESSED_LEN {
debug!(
"The limit for uncompressed bytes len is exceeded. limit: {}, len: {}",
MAX_UNCOMPRESSED_LEN, decompressed_bytes_len
);
return Err(io::ErrorKind::InvalidData.into());
}

let mut buf = vec![0; decompressed_bytes_len];
match SnapDecoder::new().decompress(&self.inner[1..], &mut buf) {
Ok(_) => Ok(buf.into()),
Err(e) => {
debug!("snappy decompress error: {:?}", e);
Err(io::ErrorKind::InvalidData.into())
} else {
let mut buf = vec![0; decompressed_bytes_len];
match SnapDecoder::new().decompress(&self.inner[1..], &mut buf) {
Ok(_) => Ok(buf.into()),
Err(e) => {
debug!("snappy decompress error: {:?}", e);
Err(io::ErrorKind::InvalidData.into())
}
}
}
}
Err(e) => {
debug!("snappy decompress_len error: {:?}", e);
Err(io::ErrorKind::InvalidData.into())
}
Err(e) => {
debug!("snappy decompress_len error: {:?}", e);
Err(io::ErrorKind::InvalidData.into())
}
}
}

/// Decompress message in lz4 format
fn decompress_lz4(&mut self) -> Result<Bytes, io::Error> {
match lz4::block::decompress(&self.inner[1..], Some(MAX_UNCOMPRESSED_LEN as i32)) {
Ok(decompressed_data) => {
if decompressed_data.len() > MAX_UNCOMPRESSED_LEN {
debug!(
"The limit for uncompressed bytes len is exceeded. limit: {}, len: {}",
MAX_UNCOMPRESSED_LEN, decompressed_data.len()
);
return Err(io::ErrorKind::InvalidData.into());
}
Ok(decompressed_data.into())
}
Err(e) => {
debug!("lz4 decompress error: {:?}", e);
Err(io::ErrorKind::InvalidData.into())
}
} else {
let _ = self.inner.split_to(1);
Ok(self.inner.freeze())
}
}

pub(crate) fn set_compress_flag(&mut self) {
self.inner[0] = COMPRESS_FLAG;
fn set_compression_flag(&mut self, compression_type: CompressionType) {
self.inner[0] = compression_type.to_flag();
}

pub(crate) fn compression_type(&self) -> CompressionType {
CompressionType::from_flag(self.inner[0])
}

pub(crate) fn compress_flag(&self) -> bool {
(self.inner[0] & COMPRESS_FLAG) != 0
self.compression_type() != CompressionType::None
}
}

Expand All @@ -116,3 +205,8 @@ pub fn compress(src: Bytes) -> Bytes {
pub fn decompress(src: BytesMut) -> Result<Bytes, io::Error> {
Message::from_compressed(src).decompress()
}

/// Compress data with specified compression type
pub fn compress_with(src: Bytes, compression_type: CompressionType) -> Bytes {
Message::from_raw(src).compress_with(compression_type)
}
21 changes: 20 additions & 1 deletion network/src/tests/compress.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use p2p::bytes::{Bytes, BytesMut};

use crate::compress::{COMPRESSION_SIZE_THRESHOLD, Message, compress, decompress};
use crate::compress::{COMPRESSION_SIZE_THRESHOLD, Message, compress, decompress, CompressionType, compress_with};

#[test]
fn test_no_need_compress() {
Expand Down Expand Up @@ -46,3 +46,22 @@ fn test_invalid_data() {
assert!(decompress(BytesMut::from(&cmp_data.as_ref()[1..])).is_err());
assert!(decompress(BytesMut::new()).is_err());
}

#[test]
fn test_compression_types() {
let data = Bytes::from(vec![1u8; 2000]);

let snappy_compressed = compress_with(data.clone(), CompressionType::Snappy);
let snappy_msg = Message::from_compressed(BytesMut::from(snappy_compressed.as_ref()));
assert_eq!(snappy_msg.compression_type(), CompressionType::Snappy);

let lz4_compressed = compress_with(data.clone(), CompressionType::Lz4);
let lz4_msg = Message::from_compressed(BytesMut::from(lz4_compressed.as_ref()));
assert_eq!(lz4_msg.compression_type(), CompressionType::Lz4);

let decompressed_snappy = decompress(BytesMut::from(snappy_compressed.as_ref())).unwrap();
let decompressed_lz4 = decompress(BytesMut::from(lz4_compressed.as_ref())).unwrap();

assert_eq!(decompressed_snappy, data);
assert_eq!(decompressed_lz4, data);
}
Loading