Skip to content

Commit 1d646b8

Browse files
skyzh and wangrunji0408 authored
feat(storage): support vector block (#871)
We added a new block type that stores fixed-size chunk data. The in-memory array type is added in #869. #864 --------- Signed-off-by: Alex Chi Z <[email protected]> Signed-off-by: Runji Wang <[email protected]> Signed-off-by: Alex Chi <[email protected]> Co-authored-by: Runji Wang <[email protected]>
1 parent 210c770 commit 1d646b8

15 files changed

+632
-17
lines changed

src/array/var_array.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
use std::borrow::Borrow;
44
use std::fmt::{Display, Write};
5+
use std::hash::Hash;
56
use std::marker::PhantomData;
67
use std::mem;
78

@@ -20,7 +21,10 @@ pub struct VarArray<T: ValueRef<U> + ?Sized, U: PrimitiveValueType = u8> {
2021
_type: PhantomData<T>,
2122
}
2223

23-
pub trait PrimitiveValueType: Send + Sync + 'static + Copy + Clone + Default {}
24+
/// Marker trait for the primitive element type used as the second type parameter of
/// [`VarArray`]. Implementors must be cheap to copy, comparable, hashable and shareable
/// across threads.
pub trait PrimitiveValueType:
    Copy + Clone + Default + PartialEq + Eq + Hash + Send + Sync + 'static
{
}
2428

2529
impl PrimitiveValueType for u8 {}
2630
impl PrimitiveValueType for F64 {}

src/storage/secondary/block.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ mod primitive_block_builder;
1616
mod primitive_block_iterator;
1717
mod rle_block_builder;
1818
mod rle_block_iterator;
19+
mod vector_block_builder;
20+
mod vector_block_iterator;
1921

2022
use bitvec::prelude::{BitVec, Lsb0};
2123
pub use blob_block_builder::*;
@@ -38,6 +40,8 @@ pub use block_index_builder::*;
3840
use bytes::{Buf, BufMut, Bytes};
3941
use risinglight_proto::rowset::block_checksum::ChecksumType;
4042
use risinglight_proto::rowset::block_index::BlockType;
43+
pub use vector_block_builder::*;
44+
pub use vector_block_iterator::*;
4145

4246
use super::StorageResult;
4347
use crate::array::Array;
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0.
2+
3+
use bitvec::prelude::{BitVec, Lsb0};
4+
use bytes::BufMut;
5+
use risinglight_proto::rowset::BlockStatistics;
6+
7+
use super::super::statistics::StatisticsBuilder;
8+
use super::{BlockBuilder, NonNullableBlockBuilder};
9+
use crate::array::VectorArray;
10+
use crate::types::VectorRef;
11+
12+
/// Builds a block out of fixed-length vectors. The encoded layout is
/// ```plain
/// | vector | vector | vector | element_size |
/// ```
/// `element_size` is the number of values in each vector, so every vector occupies
/// `element_size * std::mem::size_of::<f64>()` bytes. The `element_size` footer is appended
/// as a `u32` after the vector data when the block is finished.
pub struct PlainVectorBlockBuilder {
    /// Encoded bytes of every vector appended so far.
    data: Vec<u8>,
    /// Number of values per vector; `None` until the first vector has been appended.
    element_size: Option<usize>,
    /// Size threshold (in bytes) used to decide when the block should be finished.
    target_size: usize,
}
23+
24+
impl PlainVectorBlockBuilder {
25+
pub fn new(target_size: usize) -> Self {
26+
let data = Vec::with_capacity(target_size);
27+
Self {
28+
data,
29+
element_size: None,
30+
target_size,
31+
}
32+
}
33+
}
34+
35+
impl PlainVectorBlockBuilder {
36+
fn update_element_size(&mut self, new_element_size: usize) {
37+
if let Some(element_size) = self.element_size {
38+
assert_eq!(element_size, new_element_size);
39+
}
40+
self.element_size = Some(new_element_size);
41+
}
42+
}
43+
44+
impl NonNullableBlockBuilder<VectorArray> for PlainVectorBlockBuilder {
45+
fn append_value(&mut self, item: &VectorRef) {
46+
for i in item.iter() {
47+
self.data.extend_from_slice(&i.to_le_bytes());
48+
}
49+
self.update_element_size(item.len());
50+
}
51+
52+
fn append_default(&mut self) {
53+
panic!("PlainVectorBlockBuilder does not support append_default");
54+
}
55+
56+
fn get_statistics_with_bitmap(&self, selection: &BitVec<u8, Lsb0>) -> Vec<BlockStatistics> {
57+
let selection_empty = selection.is_empty();
58+
let mut stats_builder = StatisticsBuilder::new();
59+
let element_size = self.element_size.unwrap();
60+
let item_cnt = self.data.len() / element_size / std::mem::size_of::<f64>();
61+
for idx in 0..item_cnt {
62+
let begin_pos = idx * element_size * std::mem::size_of::<f64>();
63+
let end_pos = begin_pos + element_size * std::mem::size_of::<f64>();
64+
65+
if selection_empty || selection[idx] {
66+
stats_builder.add_item(Some(&self.data[begin_pos..end_pos]));
67+
}
68+
}
69+
stats_builder.get_statistics()
70+
}
71+
72+
fn estimated_size_with_next_item(&self, next_item: &Option<&VectorRef>) -> usize {
73+
self.estimated_size()
74+
+ next_item
75+
.map(|x| x.len() * std::mem::size_of::<f64>())
76+
.unwrap_or(0)
77+
+ std::mem::size_of::<u32>()
78+
}
79+
80+
fn is_empty(&self) -> bool {
81+
self.data.is_empty()
82+
}
83+
}
84+
85+
impl BlockBuilder<VectorArray> for PlainVectorBlockBuilder {
86+
fn append(&mut self, item: Option<&VectorRef>) {
87+
match item {
88+
Some(item) => {
89+
self.append_value(item);
90+
}
91+
None => {
92+
self.append_default();
93+
}
94+
}
95+
}
96+
97+
fn estimated_size(&self) -> usize {
98+
self.data.len() + std::mem::size_of::<u32>() // element_size
99+
}
100+
101+
fn should_finish(&self, next_item: &Option<&VectorRef>) -> bool {
102+
!self.is_empty() && self.estimated_size_with_next_item(next_item) > self.target_size
103+
}
104+
105+
fn get_statistics(&self) -> Vec<BlockStatistics> {
106+
self.get_statistics_with_bitmap(&BitVec::new())
107+
}
108+
109+
fn finish(self) -> Vec<u8> {
110+
let mut encoded_data = vec![];
111+
encoded_data.extend(self.data);
112+
encoded_data.put_u32(self.element_size.unwrap() as u32); // so that we can likely get vectors aligned
113+
encoded_data
114+
}
115+
116+
fn get_target_size(&self) -> usize {
117+
self.target_size
118+
}
119+
}
120+
121+
#[cfg(test)]
mod tests {
    use super::*;

    /// Appends three 3-value vectors and checks the size accounting of the builder.
    #[test]
    fn test_build_vector() {
        let mut builder = PlainVectorBlockBuilder::new(128);

        // Two vectors through the nullable entry point, one through the non-null one.
        builder.append(Some(VectorRef::new(&[1.0.into(), 2.0.into(), 3.0.into()])));
        builder.append(Some(VectorRef::new(&[4.0.into(), 5.0.into(), 6.0.into()])));
        builder.append_value(VectorRef::new(&[7.0.into(), 8.0.into(), 9.0.into()]));

        // 3 vectors * 3 values * 8 bytes, plus the 4-byte footer.
        let expected_size = 3 * 3 * 8 + 4;
        assert_eq!(builder.estimated_size(), expected_size);

        // A fourth vector still fits into the 128-byte target.
        let should_finish =
            builder.should_finish(&Some(VectorRef::new(&[10.0.into(), 11.0.into(), 12.0.into()])));
        assert!(!should_finish);

        builder.finish();
    }
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0.
2+
3+
use bytes::Buf;
4+
5+
use super::{Block, BlockIterator, NonNullableBlockIterator};
6+
use crate::array::{ArrayBuilder, VectorArray, VectorArrayBuilder};
7+
use crate::types::{VectorRef, F64};
8+
9+
/// Scans one or several arrays from the block content.
10+
pub struct PlainVectorBlockIterator {
11+
/// Block content
12+
block: Block,
13+
14+
/// Total count of elements in block
15+
row_count: usize,
16+
17+
/// Indicates the beginning row of the next batch
18+
next_row: usize,
19+
20+
/// Fixed-size buffer for vector data
21+
vec_buffer: Vec<F64>,
22+
23+
/// The size for each vector element
24+
element_size: usize,
25+
}
26+
27+
impl PlainVectorBlockIterator {
28+
pub fn new(block: Block, row_count: usize) -> Self {
29+
let element_size =
30+
(&block[block.len() - std::mem::size_of::<u32>()..block.len()]).get_u32() as usize;
31+
32+
Self {
33+
block,
34+
row_count,
35+
next_row: 0,
36+
vec_buffer: Vec::new(),
37+
element_size,
38+
}
39+
}
40+
}
41+
42+
impl NonNullableBlockIterator<VectorArray> for PlainVectorBlockIterator {
43+
fn next_batch_non_null(
44+
&mut self,
45+
expected_size: Option<usize>,
46+
builder: &mut VectorArrayBuilder,
47+
) -> usize {
48+
if self.next_row >= self.row_count {
49+
return 0;
50+
}
51+
52+
// TODO(chi): error handling on corrupted block
53+
54+
let mut cnt = 0;
55+
let data_buffer = &self.block[..];
56+
57+
loop {
58+
if let Some(expected_size) = expected_size {
59+
assert!(expected_size > 0);
60+
if cnt >= expected_size {
61+
break;
62+
}
63+
}
64+
65+
if self.next_row >= self.row_count {
66+
break;
67+
}
68+
69+
let from = self.next_row * self.element_size * std::mem::size_of::<f64>();
70+
let to = from + self.element_size * std::mem::size_of::<f64>();
71+
assert!((to - from) % std::mem::size_of::<f64>() == 0);
72+
self.vec_buffer.clear();
73+
self.vec_buffer
74+
.reserve(self.element_size * std::mem::size_of::<f64>());
75+
let mut buf = &data_buffer[from..to];
76+
for _ in 0..self.element_size {
77+
self.vec_buffer.push(F64::from(buf.get_f64_le()));
78+
}
79+
builder.push(Some(VectorRef::new(&self.vec_buffer)));
80+
81+
cnt += 1;
82+
self.next_row += 1;
83+
}
84+
85+
cnt
86+
}
87+
}
88+
89+
impl BlockIterator<VectorArray> for PlainVectorBlockIterator {
90+
fn next_batch(
91+
&mut self,
92+
expected_size: Option<usize>,
93+
builder: &mut VectorArrayBuilder,
94+
) -> usize {
95+
self.next_batch_non_null(expected_size, builder)
96+
}
97+
98+
fn skip(&mut self, cnt: usize) {
99+
self.next_row += cnt;
100+
}
101+
102+
fn remaining_items(&self) -> usize {
103+
self.row_count - self.next_row
104+
}
105+
}
106+
107+
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::*;
    use crate::array::{ArrayBuilder, ArrayToVecExt, VectorArrayBuilder};
    use crate::storage::secondary::block::{BlockBuilder, PlainVectorBlockBuilder};
    use crate::storage::secondary::BlockIterator;
    use crate::types::Vector;

    /// Builds a block of three vectors, then checks skipping, bounded batches and the
    /// exhausted state of the iterator.
    #[test]
    fn test_scan_vector() {
        let input = [
            Some(Vector::new(vec![1.0, 2.0, 3.0])),
            Some(Vector::new(vec![4.0, 5.0, 6.0])),
            Some(Vector::new(vec![7.0, 8.0, 9.0])),
        ];

        let mut block_builder = PlainVectorBlockBuilder::new(128);
        for vector in &input {
            block_builder.append(vector.as_ref().map(|v| v.as_ref()));
        }
        let block = Bytes::from(block_builder.finish());

        let mut scanner = PlainVectorBlockIterator::new(block, 3);

        // Skipping the first row leaves two rows to read.
        scanner.skip(1);
        assert_eq!(scanner.remaining_items(), 2);

        // A batch limited to one row yields the second vector.
        let mut builder = VectorArrayBuilder::new();
        let read = scanner.next_batch(Some(1), &mut builder);
        assert_eq!(read, 1);
        assert_eq!(
            builder.finish().to_vec(),
            vec![Some(
                VectorRef::new(&[F64::from(4.0), F64::from(5.0), F64::from(6.0)]).to_vector()
            )]
        );

        // Asking for two rows returns only the one that is left.
        let mut builder = VectorArrayBuilder::new();
        let read = scanner.next_batch(Some(2), &mut builder);
        assert_eq!(read, 1);
        assert_eq!(
            builder.finish().to_vec(),
            vec![Some(
                VectorRef::new(&[F64::from(7.0), F64::from(8.0), F64::from(9.0)]).to_vector()
            )]
        );

        // The block is exhausted now.
        let mut builder = VectorArrayBuilder::new();
        assert_eq!(scanner.next_batch(None, &mut builder), 0);
    }
}

src/storage/secondary/column.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,31 +10,34 @@
1010
mod blob_column_builder;
1111
mod blob_column_factory;
1212
mod char_column_builder;
13+
mod char_column_factory;
1314
mod column_builder;
1415
mod column_iterator;
1516
mod concrete_column_iterator;
1617
mod primitive_column_builder;
1718
mod primitive_column_factory;
1819
mod row_handler_column_iterator;
20+
mod vector_column_builder;
21+
mod vector_column_factory;
1922

2023
use std::future::Future;
2124
use std::io::{Read, Seek, SeekFrom};
25+
use std::os::unix::fs::FileExt;
26+
use std::sync::{Arc, Mutex};
2227

2328
pub use blob_column_factory::*;
29+
use bytes::Bytes;
30+
pub use char_column_factory::*;
2431
pub use column_builder::*;
2532
pub use column_iterator::*;
2633
pub use concrete_column_iterator::*;
34+
use moka::future::Cache;
2735
pub use primitive_column_builder::*;
2836
pub use primitive_column_factory::*;
2937
use risinglight_proto::rowset::BlockIndex;
3038
pub use row_handler_column_iterator::*;
31-
mod char_column_factory;
32-
use std::os::unix::fs::FileExt;
33-
use std::sync::{Arc, Mutex};
34-
35-
use bytes::Bytes;
36-
pub use char_column_factory::*;
37-
use moka::future::Cache;
39+
pub use vector_column_builder::*;
40+
pub use vector_column_factory::*;
3841

3942
use super::block::BLOCK_META_CHECKSUM_SIZE;
4043
use super::{Block, BlockCacheKey, BlockMeta, ColumnIndex, BLOCK_META_SIZE};

0 commit comments

Comments
 (0)