Skip to content

Commit 200c8f1

Browse files
committed
Optimize handling of tail fragments and block serialization
1 parent 362aedd commit 200c8f1

File tree

1 file changed

+58
-28
lines changed

1 file changed

+58
-28
lines changed

src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs

+58-28
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ use crate::FUSE_OPT_KEY_ROW_PER_BLOCK;
4343
#[allow(clippy::large_enum_variant)]
4444
enum State {
4545
Consume,
46-
Serialize(DataBlock),
46+
Collect(DataBlock),
47+
Serialize,
48+
Finalize,
4749
Flush,
4850
Write(BlockSerialization),
4951
}
@@ -56,6 +58,10 @@ pub struct TransformBlockWriter {
5658
properties: Arc<StreamBlockProperties>,
5759

5860
builder: Option<StreamBlockBuilder>,
61+
need_flush: bool,
62+
input_data_size: usize,
63+
input_num_rows: usize,
64+
5965
dal: Operator,
6066
// Only used in multi table insert
6167
table_id: Option<u64>,
@@ -86,8 +92,11 @@ impl TransformBlockWriter {
8692
properties,
8793
builder: None,
8894
dal: table.get_operator(),
95+
need_flush: false,
8996
table_id: if with_tid { Some(table.get_id()) } else { None },
9097
input_data: VecDeque::new(),
98+
input_data_size: 0,
99+
input_num_rows: 0,
91100
output_data: None,
92101
max_block_size,
93102
})))
@@ -126,12 +135,12 @@ impl Processor for TransformBlockWriter {
126135
}
127136

128137
fn event(&mut self) -> Result<Event> {
129-
if matches!(self.state, State::Serialize { .. } | State::Flush) {
130-
return Ok(Event::Sync);
131-
}
132-
133-
if matches!(self.state, State::Write { .. }) {
134-
return Ok(Event::Async);
138+
match &self.state {
139+
State::Collect(_) | State::Serialize | State::Flush | State::Finalize => {
140+
return Ok(Event::Sync)
141+
}
142+
State::Write(_) => return Ok(Event::Async),
143+
_ => {}
135144
}
136145

137146
if self.output.is_finished() {
@@ -147,59 +156,80 @@ impl Processor for TransformBlockWriter {
147156
return Ok(Event::NeedConsume);
148157
}
149158

150-
if let Some(block) = self.input_data.pop_front() {
151-
self.state = State::Serialize(block);
159+
// To avoid tail fragments, flush only when the input is large enough.
160+
if self.need_flush
161+
&& self
162+
.properties
163+
.block_thresholds
164+
.check_large_enough(self.input_num_rows, self.input_data_size)
165+
{
166+
self.state = State::Flush;
167+
return Ok(Event::Sync);
168+
}
169+
170+
if !self.need_flush && !self.input_data.is_empty() {
171+
self.state = State::Serialize;
172+
return Ok(Event::Sync);
173+
}
174+
175+
if self.input.has_data() {
176+
let input_data = self.input.pull_data().unwrap()?;
177+
self.state = State::Collect(input_data);
152178
return Ok(Event::Sync);
153179
}
154180

155181
if self.input.is_finished() {
156-
if self.builder.is_some() {
157-
self.state = State::Flush;
182+
if !self.input_data.is_empty() || self.builder.is_some() {
183+
self.state = State::Finalize;
158184
return Ok(Event::Sync);
159185
}
160186
self.output.finish();
161187
return Ok(Event::Finished);
162188
}
163189

164-
if !self.input.has_data() {
165-
self.input.set_need_data();
166-
return Ok(Event::NeedData);
167-
}
168-
169-
let input_data = self.input.pull_data().unwrap()?;
170-
self.state = State::Serialize(input_data);
171-
Ok(Event::Sync)
190+
self.input.set_need_data();
191+
Ok(Event::NeedData)
172192
}
173193

174194
fn process(&mut self) -> Result<()> {
175195
match std::mem::replace(&mut self.state, State::Consume) {
176-
State::Serialize(block) => {
196+
State::Collect(block) => {
177197
// Check if the datablock is valid, this is needed to ensure data is correct
178198
block.check_valid()?;
199+
self.input_data_size += block.estimate_block_size();
200+
self.input_num_rows += block.num_rows();
179201
let max_rows_per_block = self.calc_max_block_size(&block);
180202
let blocks = block.split_by_rows_if_needed_no_tail(max_rows_per_block);
181-
let mut blocks = VecDeque::from(blocks);
203+
self.input_data.extend(blocks);
204+
}
205+
State::Serialize => {
206+
while let Some(b) = self.input_data.pop_front() {
207+
self.input_data_size -= b.estimate_block_size();
208+
self.input_num_rows -= b.num_rows();
182209

183-
let builder = self.get_or_create_builder()?;
184-
while let Some(b) = blocks.pop_front() {
210+
let builder = self.get_or_create_builder()?;
185211
builder.write(b)?;
186212

187213
if builder.need_flush() {
188-
self.state = State::Flush;
189-
190-
for left in blocks {
191-
self.input_data.push_back(left);
192-
}
214+
self.need_flush = true;
193215
return Ok(());
194216
}
195217
}
196218
}
219+
State::Finalize => {
220+
while let Some(b) = self.input_data.pop_front() {
221+
let builder = self.get_or_create_builder()?;
222+
builder.write(b)?;
223+
}
224+
self.state = State::Flush;
225+
}
197226
State::Flush => {
198227
let builder = self.builder.take().unwrap();
199228
if !builder.is_empty() {
200229
let serialized = builder.finish()?;
201230
self.state = State::Write(serialized);
202231
}
232+
self.need_flush = false;
203233
}
204234
_ => return Err(ErrorCode::Internal("It's a bug.")),
205235
}

0 commit comments

Comments
 (0)