Skip to content

Commit ea9211b

Browse files
authored
Merge pull request #40 from marshallpierce/deser-varint
Deserialize optimizations
2 parents 15e4654 + 69a37a5 commit ea9211b

File tree

5 files changed

+453
-127
lines changed

5 files changed

+453
-127
lines changed

src/lib.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ extern crate num;
155155
use std::borrow::Borrow;
156156
use std::cmp;
157157
use std::ops::{Index, IndexMut, AddAssign, SubAssign};
158+
use num::ToPrimitive;
158159

159160
use iterators::HistogramIterator;
160161

@@ -736,9 +737,9 @@ impl<T: Counter> Histogram<T> {
736737
};
737738

738739
// determine exponent range needed to support the trackable value with no overflow:
739-
let len = h.cover(high);
740-
741-
h.alloc(len as usize);
740+
// TODO usize safety
741+
let len = h.cover(high).to_usize().unwrap();
742+
h.counts.resize(len, T::zero());
742743
Ok(h)
743744
}
744745

@@ -748,20 +749,15 @@ impl<T: Counter> Histogram<T> {
748749
let mut h = Self::new_with_bounds(source.lowest_discernible_value,
749750
source.highest_trackable_value,
750751
source.significant_value_digits)
751-
.unwrap();
752+
.expect("Using another histogram's parameters failed");
752753

753754
// h.start_time = source.start_time;
754755
// h.end_time = source.end_time;
755756
h.auto_resize = source.auto_resize;
756-
h.alloc(source.len());
757+
h.counts.resize(source.len(), T::zero());
757758
h
758759
}
759760

760-
/// Allocate a counts array of the given size.
761-
fn alloc(&mut self, len: usize) {
762-
self.counts = std::iter::repeat(T::zero()).take(len).collect();
763-
}
764-
765761
// ********************************************************************************************
766762
// Recording samples.
767763
// ********************************************************************************************

src/serialization/benchmarks.rs

Lines changed: 65 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,64 @@ extern crate rand;
22
extern crate test;
33

44
use super::v2_serializer::varint_write;
5-
use super::deserializer::varint_read;
5+
use super::deserializer::{varint_read, varint_read_slice};
66
use std::io::Cursor;
7-
use self::rand::Rng;
7+
use self::rand::distributions::range::Range;
8+
use self::rand::distributions::IndependentSample;
89
use self::test::Bencher;
910

1011
#[bench]
11-
fn varint_write_rand_1_k(b: &mut Bencher) {
12-
do_varint_write_rand(b, 1000)
12+
fn varint_write_rand(b: &mut Bencher) {
13+
do_varint_write_rand(b, Range::new(0, u64::max_value()))
1314
}
1415

1516
#[bench]
16-
fn varint_write_rand_1_m(b: &mut Bencher) {
17-
do_varint_write_rand(b, 1000_000)
17+
fn varint_write_rand_1_byte(b: &mut Bencher) {
18+
do_varint_write_rand(b, Range::new(0, 128))
1819
}
1920

2021
#[bench]
21-
fn varint_read_rand_1_k(b: &mut Bencher) {
22-
do_varint_read_rand(b, 1000)
22+
fn varint_write_rand_9_bytes(b: &mut Bencher) {
23+
do_varint_write_rand(b, Range::new(1 << 56, u64::max_value()))
2324
}
2425

2526
#[bench]
26-
fn varint_read_rand_1_m(b: &mut Bencher) {
27-
do_varint_read_rand(b, 1000_000)
27+
fn varint_read_rand(b: &mut Bencher) {
28+
do_varint_read_rand(b, Range::new(0, u64::max_value()))
2829
}
2930

30-
fn do_varint_write_rand(b: &mut Bencher, num: usize) {
31-
let mut rng = rand::weak_rng();
31+
#[bench]
32+
fn varint_read_rand_1_byte(b: &mut Bencher) {
33+
do_varint_read_rand(b, Range::new(0, 128))
34+
}
35+
36+
#[bench]
37+
fn varint_read_rand_9_byte(b: &mut Bencher) {
38+
do_varint_read_rand(b, Range::new(1 << 56, u64::max_value()))
39+
}
40+
41+
#[bench]
42+
fn varint_read_slice_rand(b: &mut Bencher) {
43+
do_varint_read_slice_rand(b, Range::new(0, u64::max_value()))
44+
}
45+
46+
#[bench]
47+
fn varint_read_slice_rand_1_byte(b: &mut Bencher) {
48+
do_varint_read_slice_rand(b, Range::new(0, 128))
49+
}
3250

51+
#[bench]
52+
fn varint_read_slice_rand_9_byte(b: &mut Bencher) {
53+
do_varint_read_slice_rand(b, Range::new(1 << 56, u64::max_value()))
54+
}
55+
56+
fn do_varint_write_rand(b: &mut Bencher, range: Range<u64>) {
57+
let mut rng = rand::weak_rng();
58+
let num = 1000_000;
3359
let mut vec: Vec<u64> = Vec::new();
3460

3561
for _ in 0..num {
36-
vec.push(rng.gen());
62+
vec.push(range.ind_sample(&mut rng));
3763
}
3864

3965
let mut buf = [0; 9];
@@ -44,15 +70,15 @@ fn do_varint_write_rand(b: &mut Bencher, num: usize) {
4470
});
4571
}
4672

47-
fn do_varint_read_rand(b: &mut Bencher, num: usize) {
73+
fn do_varint_read_rand(b: &mut Bencher, range: Range<u64>) {
4874
let mut rng = rand::weak_rng();
49-
75+
let num = 1000_000;
5076
let mut vec = Vec::new();
5177
vec.resize(9 * num, 0);
5278
let mut bytes_written = 0;
5379

5480
for _ in 0..num {
55-
bytes_written += varint_write(rng.gen(), &mut vec[bytes_written..]);
81+
bytes_written += varint_write(range.ind_sample(&mut rng), &mut vec[bytes_written..]);
5682
}
5783

5884
b.iter(|| {
@@ -62,3 +88,26 @@ fn do_varint_read_rand(b: &mut Bencher, num: usize) {
6288
}
6389
});
6490
}
91+
92+
fn do_varint_read_slice_rand(b: &mut Bencher, range: Range<u64>) {
93+
let mut rng = rand::weak_rng();
94+
let num = 1000_000;
95+
let mut vec = Vec::new();
96+
97+
vec.resize(9 * num, 0);
98+
let mut bytes_written = 0;
99+
100+
for _ in 0..num {
101+
bytes_written += varint_write(range.ind_sample(&mut rng), &mut vec[bytes_written..]);
102+
}
103+
104+
b.iter(|| {
105+
let mut input_index = 0;
106+
// cheat a little bit: this will skip the last couple numbers, but that's why we do a
107+
// million numbers. Losing the last few won't be measurable.
108+
while input_index < bytes_written - 9 {
109+
let (_, bytes_read) = varint_read_slice(&vec[input_index..(input_index + 9)]);
110+
input_index += bytes_read;
111+
}
112+
});
113+
}

src/serialization/deserializer.rs

Lines changed: 122 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use super::V2_COOKIE;
22
use super::super::{Counter, Histogram, RestatState};
33
use super::super::num::ToPrimitive;
44
use std::io::{self, Cursor, ErrorKind, Read};
5+
use std::marker::PhantomData;
56
use std;
67
use super::byteorder::{BigEndian, ReadBytesExt};
78

@@ -54,8 +55,6 @@ impl Deserializer {
5455
/// bytes already in slice or `Vec` form.
5556
pub fn deserialize<T: Counter, R: Read>(&mut self, reader: &mut R)
5657
-> Result<Histogram<T>, DeserializeError> {
57-
// TODO benchmark minimizing read calls by reading into a fixed-size header buffer
58-
5958
let cookie = reader.read_u32::<BigEndian>()?;
6059

6160
if cookie != V2_COOKIE {
@@ -87,31 +86,30 @@ impl Deserializer {
8786
let mut payload_slice = &mut self.payload_buf[0..payload_len];
8887
reader.read_exact(&mut payload_slice)?;
8988

90-
let mut cursor = Cursor::new(&payload_slice);
91-
let mut dest_index: usize = 0;
89+
let mut payload_index: usize = 0;
9290
let mut restat_state = RestatState::new();
93-
while cursor.position() < payload_slice.len() as u64 {
94-
let num = zig_zag_decode(varint_read(&mut cursor)?);
95-
96-
if num < 0 {
97-
let zero_count = (-num).to_usize()
98-
.ok_or(DeserializeError::UsizeTypeTooSmall)?;
99-
// skip the zeros
100-
dest_index = dest_index.checked_add(zero_count)
101-
.ok_or(DeserializeError::UsizeTypeTooSmall)?;
102-
continue;
103-
} else {
104-
let count: T = T::from_i64(num)
105-
.ok_or(DeserializeError::UnsuitableCounterType)?;
106-
107-
h.set_count_at_index(dest_index, count)
108-
.map_err(|_| DeserializeError::EncodedArrayTooLong)?;
109-
110-
restat_state.on_nonzero_count(dest_index, count);
111-
112-
dest_index = dest_index.checked_add(1)
113-
.ok_or(DeserializeError::UsizeTypeTooSmall)?;
114-
}
91+
let mut decode_state = DecodeLoopState::new();
92+
93+
while payload_index < payload_len.saturating_sub(9) {
94+
// Read with fast loop until we are within 9 of the end. Fast loop can't handle EOF,
95+
// so bail to slow version for the last few bytes.
96+
97+
let (zz_num, bytes_read) = varint_read_slice(
98+
&payload_slice[payload_index..(payload_index + 9)]);
99+
payload_index += bytes_read;
100+
101+
let count_or_zeros = zig_zag_decode(zz_num);
102+
103+
decode_state.on_decoded_num(count_or_zeros, &mut restat_state, &mut h)?;
104+
}
105+
106+
// Now read the leftovers
107+
let leftover_slice = &payload_slice[payload_index..];
108+
let mut cursor = Cursor::new(&leftover_slice);
109+
while cursor.position() < leftover_slice.len() as u64 {
110+
let count_or_zeros = zig_zag_decode(varint_read(&mut cursor)?);
111+
112+
decode_state.on_decoded_num(count_or_zeros, &mut restat_state, &mut h)?;
115113
}
116114

117115
restat_state.update_histogram(&mut h);
@@ -120,6 +118,62 @@ impl Deserializer {
120118
}
121119
}
122120

121+
// Only public for testing.
122+
/// Read from a slice that must be 9 bytes long or longer. Returns the decoded number and how many
123+
/// bytes were consumed.
124+
#[inline]
125+
pub fn varint_read_slice(slice: &[u8]) -> (u64, usize) {
126+
let mut b = slice[0];
127+
128+
// take low 7 bits
129+
let mut value: u64 = low_7_bits(b);
130+
if !is_high_bit_set(b) {
131+
return (value, 1);
132+
}
133+
// high bit set, keep reading
134+
b = slice[1];
135+
value |= low_7_bits(b) << 7;
136+
if !is_high_bit_set(b) {
137+
return (value, 2);
138+
}
139+
b = slice[2];
140+
value |= low_7_bits(b) << 7 * 2;
141+
if !is_high_bit_set(b) {
142+
return (value, 3);
143+
}
144+
b = slice[3];
145+
value |= low_7_bits(b) << 7 * 3;
146+
if !is_high_bit_set(b) {
147+
return (value, 4);
148+
}
149+
b = slice[4];
150+
value |= low_7_bits(b) << 7 * 4;
151+
if !is_high_bit_set(b) {
152+
return (value, 5);
153+
}
154+
b = slice[5];
155+
value |= low_7_bits(b) << 7 * 5;
156+
if !is_high_bit_set(b) {
157+
return (value, 6);
158+
}
159+
b = slice[6];
160+
value |= low_7_bits(b) << 7 * 6;
161+
if !is_high_bit_set(b) {
162+
return (value, 7);
163+
}
164+
b = slice[7];
165+
value |= low_7_bits(b) << 7 * 7;
166+
if !is_high_bit_set(b) {
167+
return (value, 8);
168+
}
169+
170+
b = slice[8];
171+
// special case: use last byte as is
172+
value |= (b as u64) << 7 * 8;
173+
174+
(value, 9)
175+
}
176+
123177
// Only public for testing.
124178
/// Read a LEB128-64b9B varint from the reader
125179
pub fn varint_read<R: Read>(reader: &mut R) -> io::Result<u64> {
@@ -174,7 +228,6 @@ fn low_7_bits(b: u8) -> u64 {
174228

175229
#[inline]
176230
fn is_high_bit_set(b: u8) -> bool {
177-
// TODO benchmark leading zeros rather than masking
178231
(b & 0x80) != 0
179232
}
180233

@@ -183,3 +236,45 @@ fn is_high_bit_set(b: u8) -> bool {
183236
pub fn zig_zag_decode(encoded: u64) -> i64 {
184237
((encoded >> 1) as i64) ^ -((encoded & 1) as i64)
185238
}
239+
240+
/// We need to perform the same logic in two different decode loops while carrying over a modicum
241+
/// of state.
242+
struct DecodeLoopState<T: Counter> {
243+
dest_index:usize,
244+
phantom: PhantomData<T>
245+
}
246+
247+
impl <T: Counter> DecodeLoopState<T> {
248+
249+
fn new() -> DecodeLoopState<T> {
250+
DecodeLoopState {
251+
dest_index: 0,
252+
phantom: PhantomData
253+
}
254+
}
255+
256+
#[inline]
257+
fn on_decoded_num(&mut self, count_or_zeros: i64, restat_state: &mut RestatState<T>,
258+
h: &mut Histogram<T>) -> Result<(), DeserializeError> {
259+
if count_or_zeros < 0 {
260+
let zero_count = (-count_or_zeros).to_usize()
261+
.ok_or(DeserializeError::UsizeTypeTooSmall)?;
262+
// skip the zeros
263+
self.dest_index = self.dest_index.checked_add(zero_count)
264+
.ok_or(DeserializeError::UsizeTypeTooSmall)?;
265+
} else {
266+
let count: T = T::from_i64(count_or_zeros)
267+
.ok_or(DeserializeError::UnsuitableCounterType)?;
268+
269+
h.set_count_at_index(self.dest_index, count)
270+
.map_err(|_| DeserializeError::EncodedArrayTooLong)?;
271+
272+
restat_state.on_nonzero_count(self.dest_index, count);
273+
274+
self.dest_index = self.dest_index.checked_add(1)
275+
.ok_or(DeserializeError::UsizeTypeTooSmall)?;
276+
}
277+
278+
Ok(())
279+
}
280+
}

0 commit comments

Comments
 (0)