Skip to content

Commit dd2447c

Browse files
authored
feat: block stream write (#17744)
* add block serializer
* fix test
* add settings enable_block_stream_write
* fix test
* optimize handling of tail fragments and block serialization
* make lint
1 parent 9b7282a commit dd2447c

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

42 files changed

+1994
-743
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scripts/build/build-pkg.sh

+6-9
Original file line number | Diff line number | Diff line change
@@ -3,13 +3,11 @@
33
set -e
44

55
ARCH=x86_64
6-
glibc=`getconf GNU_LIBC_VERSION|awk '{print $2}'`
6+
glibc=$(getconf GNU_LIBC_VERSION | awk '{print $2}')
77
TARGET=${ARCH}-linux-glib$glibc
88
MUSL_RUST_TARGET=${ARCH}-unknown-linux-musl
99
BENSQL_VERSION=0.26.1
1010

11-
12-
1311
RUSTFLAGS="-C target-feature=+sse4.2 -C link-arg=-Wl,--compress-debug-sections=zlib"
1412

1513
echo "==> https_proxy=${https_proxy}"
@@ -21,14 +19,13 @@ mkdir -p distro/{pkg,bin,configs,systemd,scripts}
2119
BENSQL_PKG=bendsql-${MUSL_RUST_TARGET}.tar.gz
2220

2321
if [ ! -f distro/pkg/${BENSQL_PKG} ]; then
24-
echo "==> downloading ${BENSQL_PKG} ..."
25-
curl -L https://github.com/databendlabs/bendsql/releases/download/v${BENSQL_VERSION}/${BENSQL_PKG} -o distro/pkg/${BENSQL_PKG}
22+
echo "==> downloading ${BENSQL_PKG} ..."
23+
curl -L https://github.com/databendlabs/bendsql/releases/download/v${BENSQL_VERSION}/${BENSQL_PKG} -o distro/pkg/${BENSQL_PKG}
2624
fi
2725

2826
tar -xzvf distro/pkg/${BENSQL_PKG} -C distro/bin
2927
distro/bin/bendsql --version
3028

31-
3229
rustup show
3330

3431
DATABEND_VERSION=$(git describe --tags --abbrev=0)
@@ -38,9 +35,9 @@ echo "==> building databend ${DATABEND_VERSION} for ${TARGET} ..."
3835
#cargo clean
3936

4037
cargo build --release \
41-
--bin=databend-query \
42-
--bin=databend-meta \
43-
--bin=databend-metactl
38+
--bin=databend-query \
39+
--bin=databend-meta \
40+
--bin=databend-metactl
4441

4542
# --bin=table-meta-inspector
4643

src/common/native/src/write/writer.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -76,8 +76,8 @@ impl<W: Write> NativeWriter<W> {
7676
}
7777

7878
/// Consumes itself into the inner writer
79-
pub fn into_inner(self) -> W {
80-
self.writer.w
79+
pub fn inner_mut(&mut self) -> &mut W {
80+
&mut self.writer.w
8181
}
8282

8383
/// Writes the header and first (schema) message to the file.

src/query/catalog/src/table_args.rs

-10
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::cmp::Ordering;
1615
use std::collections::HashMap;
1716

1817
use databend_common_exception::ErrorCode;
@@ -132,15 +131,6 @@ pub fn u64_literal(val: u64) -> Scalar {
132131
Scalar::Number(NumberScalar::UInt64(val))
133132
}
134133

135-
pub fn cmp_with_null(v1: &Scalar, v2: &Scalar) -> Ordering {
136-
match (v1.is_null(), v2.is_null()) {
137-
(true, true) => Ordering::Equal,
138-
(true, false) => Ordering::Greater,
139-
(false, true) => Ordering::Less,
140-
(false, false) => v1.cmp(v2),
141-
}
142-
}
143-
144134
pub fn parse_sequence_args(table_args: &TableArgs, func_name: &str) -> Result<String> {
145135
let args = table_args.expect_all_positioned(func_name, Some(1))?;
146136
let sequence = string_value(&args[0])?;

src/query/expression/src/utils/block_thresholds.rs

+4-4
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ impl Default for BlockThresholds {
3636
BlockThresholds {
3737
max_rows_per_block: DEFAULT_BLOCK_ROW_COUNT,
3838
min_rows_per_block: (DEFAULT_BLOCK_ROW_COUNT * 4).div_ceil(5),
39-
max_bytes_per_block: DEFAULT_BLOCK_BUFFER_SIZE,
39+
max_bytes_per_block: DEFAULT_BLOCK_BUFFER_SIZE * 2,
4040
min_bytes_per_block: (DEFAULT_BLOCK_BUFFER_SIZE * 4).div_ceil(5),
4141
max_compressed_per_block: DEFAULT_BLOCK_COMPRESSED_SIZE,
4242
min_compressed_per_block: (DEFAULT_BLOCK_COMPRESSED_SIZE * 4).div_ceil(5),
@@ -48,15 +48,15 @@ impl Default for BlockThresholds {
4848
impl BlockThresholds {
4949
pub fn new(
5050
max_rows_per_block: usize,
51-
max_bytes_per_block: usize,
51+
bytes_per_block: usize,
5252
max_compressed_per_block: usize,
5353
block_per_segment: usize,
5454
) -> Self {
5555
BlockThresholds {
5656
max_rows_per_block,
5757
min_rows_per_block: (max_rows_per_block * 4).div_ceil(5),
58-
max_bytes_per_block,
59-
min_bytes_per_block: (max_bytes_per_block * 4).div_ceil(5),
58+
max_bytes_per_block: bytes_per_block * 2,
59+
min_bytes_per_block: (bytes_per_block * 4).div_ceil(5),
6060
max_compressed_per_block,
6161
min_compressed_per_block: (max_compressed_per_block * 4).div_ceil(5),
6262
block_per_segment,

src/query/service/src/pipelines/builders/builder_recluster.rs

+2-1
Original file line number | Diff line number | Diff line change
@@ -159,7 +159,8 @@ impl PipelineBuilder {
159159
SortPipelineBuilder::create(self.ctx.clone(), schema, sort_descs.into())?
160160
.with_block_size_hit(sort_block_size)
161161
.remove_order_col_at_last();
162-
sort_pipeline_builder.build_merge_sort_pipeline(&mut self.main_pipeline, false)?;
162+
// Todo(zhyass): Recluster will no longer perform sort in the near future.
163+
sort_pipeline_builder.build_full_sort_pipeline(&mut self.main_pipeline)?;
163164

164165
// Compact after merge sort.
165166
let max_threads = self.ctx.get_settings().get_max_threads()? as usize;

src/query/service/src/test_kits/block_writer.rs

+4-6
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ use databend_common_storages_fuse::io::WriteSettings;
2626
use databend_common_storages_fuse::FuseStorageFormat;
2727
use databend_storages_common_blocks::blocks_to_parquet;
2828
use databend_storages_common_index::BloomIndex;
29+
use databend_storages_common_index::BloomIndexBuilder;
2930
use databend_storages_common_table_meta::meta::BlockMeta;
3031
use databend_storages_common_table_meta::meta::ClusterStatistics;
3132
use databend_storages_common_table_meta::meta::Compression;
@@ -128,12 +129,9 @@ impl<'a> BlockWriter<'a> {
128129
let bloom_index_cols = BloomIndexColumns::All;
129130
let bloom_columns_map =
130131
bloom_index_cols.bloom_index_fields(schema.clone(), BloomIndex::supported_type)?;
131-
let maybe_bloom_index = BloomIndex::try_create(
132-
FunctionContext::default(),
133-
location.1,
134-
block,
135-
bloom_columns_map,
136-
)?;
132+
let mut builder = BloomIndexBuilder::create(FunctionContext::default(), bloom_columns_map);
133+
builder.add_block(block)?;
134+
let maybe_bloom_index = builder.finalize()?;
137135
if let Some(bloom_index) = maybe_bloom_index {
138136
let index_block = bloom_index.serialize_to_data_block()?;
139137
let filter_schema = bloom_index.filter_schema;

src/query/service/tests/it/storages/fuse/statistics.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -84,8 +84,8 @@ fn test_ft_stats_block_stats() -> databend_common_exception::Result<()> {
8484
#[test]
8585
fn test_ft_stats_block_stats_with_column_distinct_count() -> databend_common_exception::Result<()> {
8686
let schema = Arc::new(TableSchema::new(vec![
87-
TableField::new("a", TableDataType::Number(NumberDataType::Int32)),
88-
TableField::new("b", TableDataType::String),
87+
TableField::new_from_column_id("a", TableDataType::Number(NumberDataType::Int32), 0),
88+
TableField::new_from_column_id("b", TableDataType::String, 1),
8989
]));
9090

9191
let block = DataBlock::new_from_columns(vec![

src/query/settings/src/settings_default.rs

+14-8
Original file line number | Diff line number | Diff line change
@@ -1250,14 +1250,20 @@ impl DefaultSettings {
12501250
range: Some(SettingRange::Numeric(0..=1)),
12511251
}),
12521252
// Add this to the HashMap in DefaultSettings::instance()
1253-
("optimizer_skip_list", DefaultSettingValue {
1254-
value: UserSettingValue::String(String::new()),
1255-
desc: "Comma-separated(,) list of optimizer names to skip during query optimization",
1256-
mode: SettingMode::Both,
1257-
scope: SettingScope::Both,
1258-
range: None,
1259-
}),
1260-
1253+
("optimizer_skip_list", DefaultSettingValue {
1254+
value: UserSettingValue::String(String::new()),
1255+
desc: "Comma-separated(,) list of optimizer names to skip during query optimization",
1256+
mode: SettingMode::Both,
1257+
scope: SettingScope::Both,
1258+
range: None,
1259+
}),
1260+
("enable_block_stream_write", DefaultSettingValue {
1261+
value: UserSettingValue::UInt64(0),
1262+
desc: "Enables block stream write",
1263+
mode: SettingMode::Both,
1264+
scope: SettingScope::Both,
1265+
range: Some(SettingRange::Numeric(0..=1)),
1266+
}),
12611267
]);
12621268

12631269
Ok(Arc::new(DefaultSettings {

src/query/settings/src/settings_getter_setter.rs

+4
Original file line number | Diff line number | Diff line change
@@ -932,4 +932,8 @@ impl Settings {
932932
pub fn get_optimizer_skip_list(&self) -> Result<String> {
933933
self.try_get_string("optimizer_skip_list")
934934
}
935+
936+
pub fn get_enable_block_stream_write(&self) -> Result<bool> {
937+
Ok(self.try_get_u64("enable_block_stream_write")? == 1)
938+
}
935939
}

0 commit comments

Comments (0)