Skip to content

Commit

Permalink
perf(query): improve SELECTs performance
Browse files Browse the repository at this point in the history
Also, add `RowCursor::received_bytes()` and `RowCursor::decoded_bytes()`
  • Loading branch information
loyd committed Oct 18, 2024
1 parent 7b599ef commit dd5f3e5
Show file tree
Hide file tree
Showing 17 changed files with 411 additions and 454 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ url = "2.1.1"
futures = "0.3.5"
futures-channel = "0.3.30"
static_assertions = "1.1"
sealed = "0.5"
sealed = "0.6"
sha-1 = { version = "0.10", optional = true }
serde_json = { version = "1.0.68", optional = true }
lz4_flex = { version = "0.11.3", default-features = false, features = [
Expand Down
15 changes: 10 additions & 5 deletions benches/select_numbers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ struct Data {

async fn bench(name: &str, compression: Compression) {
let start = std::time::Instant::now();
let sum = tokio::spawn(do_bench(compression)).await.unwrap();
let (sum, dec_mbytes, rec_mbytes) = tokio::spawn(do_bench(compression)).await.unwrap();
assert_eq!(sum, 124999999750000000);
let elapsed = start.elapsed();
println!("{name:>8} {elapsed:>7.3?}");
let throughput = dec_mbytes / elapsed.as_secs_f64();
println!("{name:>8} {elapsed:>7.3?} {throughput:>4.0} MiB/s {rec_mbytes:>4.0} MiB");
}

async fn do_bench(compression: Compression) -> u64 {
async fn do_bench(compression: Compression) -> (u64, f64, f64) {
let client = Client::default()
.with_compression(compression)
.with_url("http://localhost:8123");
Expand All @@ -30,12 +31,16 @@ async fn do_bench(compression: Compression) -> u64 {
sum += row.no;
}

sum
let dec_bytes = cursor.decoded_bytes();
let dec_mbytes = dec_bytes as f64 / 1024.0 / 1024.0;
let recv_bytes = cursor.received_bytes();
let recv_mbytes = recv_bytes as f64 / 1024.0 / 1024.0;
(sum, dec_mbytes, recv_mbytes)
}

#[tokio::main]
async fn main() {
println!("compress elapsed");
println!("compress elapsed throughput received");
bench("none", Compression::None).await;
#[cfg(feature = "lz4")]
bench("lz4", Compression::Lz4).await;
Expand Down
167 changes: 0 additions & 167 deletions src/buflist.rs

This file was deleted.

80 changes: 80 additions & 0 deletions src/bytes_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use bytes::{Bytes, BytesMut};

#[derive(Default)]
pub(crate) struct BytesExt {
bytes: Bytes,
cursor: usize,
}

impl BytesExt {
#[inline(always)]
pub(crate) fn slice(&self) -> &[u8] {
&self.bytes[self.cursor..]
}

#[inline(always)]
pub(crate) fn remaining(&self) -> usize {
self.bytes.len() - self.cursor
}

#[inline(always)]
pub(crate) fn set_remaining(&mut self, n: usize) {
// We can use `bytes.advance()` here, but it's slower.
self.cursor = self.bytes.len() - n;
}

#[cfg(any(feature = "lz4", feature = "watch"))]
#[inline(always)]
pub(crate) fn advance(&mut self, n: usize) {
// We can use `bytes.advance()` here, but it's slower.
self.cursor += n;
}

#[inline(always)]
pub(crate) fn extend(&mut self, chunk: Bytes) {
if self.cursor == self.bytes.len() {
// Most of the time, we read the next chunk after consuming the previous one.
self.bytes = chunk;
self.cursor = 0;
} else {
// Some bytes are left in the buffer, we need to merge them with the next chunk.
self.extend_slow(chunk);
}
}

#[cold]
#[inline(never)]
fn extend_slow(&mut self, chunk: Bytes) {
let total = self.remaining() + chunk.len();
let mut new_bytes = BytesMut::with_capacity(total);
let capacity = new_bytes.capacity();
new_bytes.extend_from_slice(self.slice());
new_bytes.extend_from_slice(&chunk);
debug_assert_eq!(new_bytes.capacity(), capacity);
self.bytes = new_bytes.freeze();
self.cursor = 0;
}
}

#[test]
fn it_works() {
let mut bytes = BytesExt::default();
assert!(bytes.slice().is_empty());
assert_eq!(bytes.remaining(), 0);

bytes.extend(Bytes::from_static(b"hello"));
assert_eq!(bytes.slice(), b"hello");
assert_eq!(bytes.remaining(), 5);

bytes.advance(3);
assert_eq!(bytes.slice(), b"lo");
assert_eq!(bytes.remaining(), 2);

bytes.extend(Bytes::from_static(b"l"));
assert_eq!(bytes.slice(), b"lol");
assert_eq!(bytes.remaining(), 3);

bytes.set_remaining(1);
assert_eq!(bytes.slice(), b"l");
assert_eq!(bytes.remaining(), 1);
}
Loading

0 comments on commit dd5f3e5

Please sign in to comment.