Skip to content

Commit b6b2b41

Browse files
committed
fix
1 parent 220d082 commit b6b2b41

File tree

3 files changed

+100
-33
lines changed

3 files changed

+100
-33
lines changed

Diff for: src/query/storages/fuse/src/io/write/stream/block_builder.rs

+85 -4
Original file line number | Diff line number | Diff line change
@@ -14,22 +14,31 @@
1414

1515
use std::collections::BTreeMap;
1616
use std::collections::HashMap;
17+
use std::collections::HashSet;
1718
use std::mem;
1819
use std::sync::Arc;
1920

2021
use chrono::Utc;
22+
use databend_common_catalog::table::Table;
2123
use databend_common_catalog::table_context::TableContext;
2224
use databend_common_exception::Result;
25+
use databend_common_expression::types::DataType;
2326
use databend_common_expression::Column;
2427
use databend_common_expression::ColumnId;
28+
use databend_common_expression::ComputedExpr;
2529
use databend_common_expression::DataBlock;
2630
use databend_common_expression::FieldIndex;
2731
use databend_common_expression::TableField;
2832
use databend_common_expression::TableSchema;
2933
use databend_common_expression::TableSchemaRef;
34+
use databend_common_expression::Value;
35+
use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COLUMN_ID;
3036
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
3137
use databend_common_native::write::NativeWriter;
38+
use databend_storages_common_index::BloomIndex;
3239
use databend_storages_common_index::BloomIndexBuilder;
40+
use databend_storages_common_index::Index;
41+
use databend_storages_common_index::RangeIndex;
3342
use databend_storages_common_table_meta::meta::BlockMeta;
3443
use databend_storages_common_table_meta::meta::ColumnMeta;
3544
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
@@ -39,6 +48,7 @@ use parquet::basic::Encoding;
3948
use parquet::file::properties::EnabledStatistics;
4049
use parquet::file::properties::WriterProperties;
4150

51+
use crate::io::create_inverted_index_builders;
4252
use crate::io::write::stream::cluster_statistics::ClusterStatisticsBuilder;
4353
use crate::io::write::stream::cluster_statistics::ClusterStatisticsState;
4454
use crate::io::write::stream::column_statistics::ColumnStatisticsState;
@@ -51,6 +61,7 @@ use crate::io::TableMetaLocationGenerator;
5161
use crate::io::WriteSettings;
5262
use crate::operations::column_parquet_metas;
5363
use crate::FuseStorageFormat;
64+
use crate::FuseTable;
5465

5566
pub enum BlockWriterImpl {
5667
Arrow(ArrowWriter<Vec<u8>>),
@@ -215,7 +226,7 @@ impl StreamBlockBuilder {
215226
})
216227
}
217228

218-
pub fn write(&mut self, block: DataBlock, schema: &TableSchemaRef) -> Result<()> {
229+
pub fn write(&mut self, block: DataBlock) -> Result<()> {
219230
if block.is_empty() {
220231
return Ok(());
221232
}
@@ -225,15 +236,17 @@ impl StreamBlockBuilder {
225236
}
226237

227238
let block = self.cluster_stats_state.add_block(block)?;
228-
self.column_stats_state.add_block(schema, &block)?;
239+
self.column_stats_state
240+
.add_block(&self.properties.source_schema, &block)?;
229241
self.bloom_index_builder.add_block(&block)?;
230242
for writer in self.inverted_index_writers.iter_mut() {
231-
writer.add_block(schema, &block)?;
243+
writer.add_block(&self.properties.source_schema, &block)?;
232244
}
233245

234246
self.row_count += block.num_rows();
235247
self.block_size += block.estimate_block_size();
236-
self.block_writer.write(block, schema)?;
248+
self.block_writer
249+
.write(block, &self.properties.source_schema)?;
237250
Ok(())
238251
}
239252

@@ -325,3 +338,71 @@ pub struct StreamBlockProperties {
325338
inverted_index_builders: Vec<InvertedIndexBuilder>,
326339
table_meta_timestamps: TableMetaTimestamps,
327340
}
341+
342+
impl StreamBlockProperties {
343+
pub fn try_create(
344+
ctx: Arc<dyn TableContext>,
345+
table: &FuseTable,
346+
table_meta_timestamps: TableMetaTimestamps,
347+
do_append: bool,
348+
) -> Result<Arc<Self>> {
349+
// remove virtual computed fields.
350+
let mut fields = table
351+
.schema()
352+
.fields()
353+
.iter()
354+
.filter(|f| !matches!(f.computed_expr(), Some(ComputedExpr::Virtual(_))))
355+
.cloned()
356+
.collect::<Vec<_>>();
357+
if !do_append {
358+
// add stream fields.
359+
for stream_column in table.stream_columns().iter() {
360+
fields.push(stream_column.table_field());
361+
}
362+
}
363+
let source_schema = Arc::new(TableSchema {
364+
fields,
365+
..table.schema().as_ref().clone()
366+
});
367+
368+
let bloom_columns_map = table
369+
.bloom_index_cols
370+
.bloom_index_fields(source_schema.clone(), BloomIndex::supported_type)?;
371+
let bloom_column_ids = bloom_columns_map
372+
.values()
373+
.map(|v| v.column_id())
374+
.collect::<HashSet<_>>();
375+
376+
let inverted_index_builders = create_inverted_index_builders(&table.table_info.meta);
377+
378+
let cluster_stats_builder = ClusterStatisticsBuilder::try_create(table, ctx.clone())?;
379+
380+
let mut stats_columns = vec![];
381+
let mut distinct_columns = vec![];
382+
let leaf_fields = source_schema.leaf_fields();
383+
for field in leaf_fields.iter() {
384+
let column_id = field.column_id();
385+
if RangeIndex::supported_type(&DataType::from(field.data_type()))
386+
&& column_id != ORIGIN_BLOCK_ROW_NUM_COLUMN_ID
387+
{
388+
stats_columns.push(column_id);
389+
if !bloom_column_ids.contains(&column_id) {
390+
distinct_columns.push(column_id);
391+
}
392+
}
393+
}
394+
395+
Ok(Arc::new(StreamBlockProperties {
396+
ctx,
397+
meta_locations: table.meta_location_generator().clone(),
398+
source_schema,
399+
write_settings: table.get_write_settings(),
400+
cluster_stats_builder,
401+
stats_columns,
402+
distinct_columns,
403+
bloom_columns_map,
404+
inverted_index_builders,
405+
table_meta_timestamps,
406+
}))
407+
}
408+
}

Diff for: src/query/storages/fuse/src/io/write/stream/column_statistics.rs

-3
Original file line number | Diff line number | Diff line change
@@ -69,9 +69,6 @@ impl ColumnStatisticsState {
6969
let rows = data_block.num_rows();
7070
let leaves = traverse_values_dfs(data_block.columns(), schema.fields())?;
7171
for (column_id, col, data_type) in leaves {
72-
if !self.col_stats.contains_key(&column_id) {
73-
continue;
74-
}
7572
match col {
7673
Value::Scalar(s) => {
7774
let unset_bits = if s == Scalar::Null { rows } else { 0 };

Diff for: src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs

+15 -26
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@ use parquet::basic::ZstdLevel;
4242
use parquet::file::properties::EnabledStatistics;
4343
use parquet::file::properties::WriterProperties;
4444
use parquet::file::properties::WriterVersion;
45-
45+
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
4646
use crate::io::create_inverted_index_builders;
4747
use crate::io::StreamBlockBuilder;
4848
use crate::io::StreamBlockProperties;
@@ -66,33 +66,22 @@ impl TransformBlockWriter {
6666
output: Arc<OutputPort>,
6767
table: &FuseTable,
6868
kind: MutationKind,
69+
table_meta_timestamps: TableMetaTimestamps,
70+
with_tid: bool,
6971
) -> Result<ProcessorPtr> {
70-
// remove virtual computed fields.
71-
let mut fields = table
72-
.schema()
73-
.fields()
74-
.iter()
75-
.filter(|f| !matches!(f.computed_expr(), Some(ComputedExpr::Virtual(_))))
76-
.cloned()
77-
.collect::<Vec<_>>();
78-
if !matches!(kind, MutationKind::Insert | MutationKind::Replace) {
79-
// add stream fields.
80-
for stream_column in table.stream_columns().iter() {
81-
fields.push(stream_column.table_field());
72+
let do_append = matches!(kind, MutationKind::Insert | MutationKind::Replace);
73+
let properties = StreamBlockProperties::try_create(ctx, table, table_meta_timestamps, do_append)?;
74+
Ok(ProcessorPtr::create(Box::new(
75+
TransformBlockWriter {
76+
input,
77+
output,
78+
properties,
79+
builder: None,
80+
dal: table.get_operator(),
81+
table_id: if with_tid { Some(table.get_id()) } else { None },
82+
kind,
8283
}
83-
}
84-
let source_schema = Arc::new(TableSchema {
85-
fields,
86-
..table.schema().as_ref().clone()
87-
});
88-
89-
let bloom_columns_map = table
90-
.bloom_index_cols
91-
.bloom_index_fields(source_schema.clone(), BloomIndex::supported_type)?;
92-
93-
let inverted_index_builders = create_inverted_index_builders(&table.table_info.meta);
94-
95-
todo!()
84+
)))
9685
}
9786
pub fn reinit_writer(&mut self) -> Result<()> {
9887
Ok(())

0 commit comments

Comments
 (0)