Skip to content

Commit 846b83c

Browse files
authored
refactor(query): split Spiller to provide more scalability (#18691)
* fmt
* spiller
* BroadcastChannel
* move
1 parent 296e072 commit 846b83c

38 files changed

+753
-532
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/pipeline/transforms/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ databend-common-column = { workspace = true }
1515
databend-common-exception = { workspace = true }
1616
databend-common-expression = { workspace = true }
1717
databend-common-pipeline-core = { workspace = true }
18+
databend-storages-common-cache = { workspace = true }
1819
jsonb = { workspace = true }
1920
log = { workspace = true }
2021
match-template = { workspace = true }

src/query/pipeline/transforms/src/processors/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
mod memory_settings;
16+
pub mod traits;
1617
mod transforms;
1718

1819
pub use memory_settings::MemorySettings;
Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,17 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod spill;
16+
17+
pub use spill::*;
Lines changed: 32 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,32 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_exception::Result;
16+
use databend_common_expression::DataBlock;
17+
use databend_storages_common_cache::TempPath;
18+
19+
/// Identifies where a spilled block was written.
///
/// A `Location` is what `DataBlockSpill::spill` and
/// `DataBlockSpill::merge_and_spill` return, and what
/// `DataBlockSpill::restore` takes to read the data back.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
20+
pub enum Location {
21+
/// Data spilled remotely. NOTE(review): the `String` is presumably the
/// object path/key in the spill storage — confirm against the implementor.
Remote(String),
22+
/// Data spilled locally, identified by a `TempPath` from
/// `databend_storages_common_cache`.
Local(TempPath),
23+
}
24+
25+
/// Abstraction over a backend that can spill `DataBlock`s and restore them.
///
/// Implementors must be `Clone + Send + Sync + 'static`. Only
/// `merge_and_spill` and `restore` have to be provided; `spill` has a
/// default implementation.
#[async_trait::async_trait]
26+
pub trait DataBlockSpill: Clone + Send + Sync + 'static {
27+
/// Spills a single block and returns the `Location` it was written to.
///
/// The default implementation wraps the block in a one-element `Vec`
/// and delegates to `merge_and_spill`.
async fn spill(&self, data_block: DataBlock) -> Result<Location> {
28+
self.merge_and_spill(vec![data_block]).await
29+
}
30+
/// Spills a batch of blocks and returns one `Location` for the whole batch.
///
/// NOTE(review): judging by the name, implementors are expected to merge
/// the blocks into one before writing — confirm against the implementor.
async fn merge_and_spill(&self, data_block: Vec<DataBlock>) -> Result<Location>;
31+
/// Reads the data stored at `location` back as a single `DataBlock`.
async fn restore(&self, location: &Location) -> Result<DataBlock>;
32+
}

src/query/service/src/physical_plans/physical_sort.rs

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,8 +49,11 @@ use crate::physical_plans::WindowPartition;
4949
use crate::physical_plans::WindowPartitionTopN;
5050
use crate::physical_plans::WindowPartitionTopNFunc;
5151
use crate::pipelines::builders::SortPipelineBuilder;
52-
use crate::pipelines::processors::transforms::TransformSortBuilder;
5352
use crate::pipelines::PipelineBuilder;
53+
use crate::spillers::SortSpiller;
54+
55+
type TransformSortBuilder =
56+
crate::pipelines::processors::transforms::TransformSortBuilder<SortSpiller>;
5457

5558
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
5659
pub struct Sort {

src/query/service/src/pipelines/builders/builder_sort.rs

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -35,13 +35,15 @@ use databend_common_storages_fuse::TableContext;
3535
use databend_storages_common_cache::TempDirManager;
3636

3737
use crate::pipelines::memory_settings::MemorySettingsExt;
38-
use crate::pipelines::processors::transforms::TransformSortBuilder;
3938
use crate::sessions::QueryContext;
40-
use crate::spillers::Spiller;
39+
use crate::spillers::SortSpiller;
4140
use crate::spillers::SpillerConfig;
4241
use crate::spillers::SpillerDiskConfig;
4342
use crate::spillers::SpillerType;
4443

44+
type TransformSortBuilder =
45+
crate::pipelines::processors::transforms::TransformSortBuilder<SortSpiller>;
46+
4547
pub struct SortPipelineBuilder {
4648
ctx: Arc<QueryContext>,
4749
output_schema: DataSchemaRef,
@@ -145,7 +147,7 @@ impl SortPipelineBuilder {
145147
use_parquet: settings.get_spilling_file_format()?.is_parquet(),
146148
};
147149
let op = DataOperator::instance().spill_operator();
148-
Arc::new(Spiller::create(self.ctx.clone(), op, config)?)
150+
SortSpiller::new(self.ctx.clone(), op, config)?
149151
};
150152

151153
pipeline.add_transform(|input, output| {
@@ -231,7 +233,7 @@ impl SortPipelineBuilder {
231233
use_parquet: settings.get_spilling_file_format()?.is_parquet(),
232234
};
233235
let op = DataOperator::instance().spill_operator();
234-
Arc::new(Spiller::create(self.ctx.clone(), op, config)?)
236+
SortSpiller::new(self.ctx.clone(), op, config)?
235237
};
236238

237239
let memory_settings = MemorySettings::from_sort_settings(&self.ctx)?;

src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -162,7 +162,6 @@ impl PipelinePullingExecutor {
162162
})
163163
}
164164

165-
#[fastrace::trace]
166165
pub fn start(&mut self) {
167166
let _guard = ThreadTracker::tracking(self.tracking_payload.clone());
168167

src/query/service/src/pipelines/processors/transforms/sort/merge_sort.rs

Lines changed: 11 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@ use databend_common_pipeline_core::processors::Processor;
3131
use databend_common_pipeline_transforms::processors::sort::algorithm::SortAlgorithm;
3232
use databend_common_pipeline_transforms::sort::RowConverter;
3333
use databend_common_pipeline_transforms::sort::Rows;
34+
use databend_common_pipeline_transforms::traits::DataBlockSpill;
3435
use databend_common_pipeline_transforms::MemorySettings;
3536
use databend_common_pipeline_transforms::MergeSort;
3637
use databend_common_pipeline_transforms::SortSpillParams;
@@ -42,7 +43,6 @@ use super::sort_spill::OutputData;
4243
use super::sort_spill::SortSpill;
4344
use super::Base;
4445
use super::RowsStat;
45-
use crate::spillers::Spiller;
4646

4747
#[derive(Debug)]
4848
enum State {
@@ -54,14 +54,14 @@ enum State {
5454
Finish,
5555
}
5656

57-
enum Inner<A: SortAlgorithm> {
57+
enum Inner<A: SortAlgorithm, S: DataBlockSpill> {
5858
Collect(Vec<DataBlock>),
5959
Limit(TransformSortMergeLimit<A::Rows>),
6060
Memory(MemoryMerger<A>),
61-
Spill(Vec<DataBlock>, SortSpill<A>),
61+
Spill(Vec<DataBlock>, SortSpill<A, S>),
6262
}
6363

64-
pub struct TransformSort<A: SortAlgorithm, C> {
64+
pub struct TransformSort<A: SortAlgorithm, C, S: DataBlockSpill> {
6565
name: &'static str,
6666
input: Arc<InputPort>,
6767
output: Arc<OutputPort>,
@@ -79,19 +79,20 @@ pub struct TransformSort<A: SortAlgorithm, C> {
7979
/// so we don't need to generate the order column again.
8080
order_col_generated: bool,
8181

82-
base: Base,
83-
inner: Inner<A>,
82+
base: Base<S>,
83+
inner: Inner<A, S>,
8484

8585
aborting: AtomicBool,
8686

8787
max_block_size: usize,
8888
memory_settings: MemorySettings,
8989
}
9090

91-
impl<A, C> TransformSort<A, C>
91+
impl<A, C, S> TransformSort<A, C, S>
9292
where
9393
A: SortAlgorithm,
9494
C: RowConverter<A::Rows>,
95+
S: DataBlockSpill,
9596
{
9697
pub(super) fn new(
9798
input: Arc<InputPort>,
@@ -100,7 +101,7 @@ where
100101
sort_desc: Arc<[SortColumnDescription]>,
101102
max_block_size: usize,
102103
limit: Option<(usize, bool)>,
103-
spiller: Arc<Spiller>,
104+
spiller: S,
104105
output_order_col: bool,
105106
order_col_generated: bool,
106107
memory_settings: MemorySettings,
@@ -303,11 +304,12 @@ where
303304
}
304305

305306
#[async_trait::async_trait]
306-
impl<A, C> Processor for TransformSort<A, C>
307+
impl<A, C, S> Processor for TransformSort<A, C, S>
307308
where
308309
A: SortAlgorithm + 'static,
309310
A::Rows: 'static,
310311
C: RowConverter<A::Rows> + Send + 'static,
312+
S: DataBlockSpill,
311313
{
312314
fn name(&self) -> String {
313315
self.name.to_string()

src/query/service/src/pipelines/processors/transforms/sort/mod.rs

Lines changed: 2 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::sync::Arc;
16-
1715
use bounds::Bounds;
1816
use databend_common_expression::local_block_meta_serde;
1917
use databend_common_expression::BlockMetaInfo;
@@ -24,8 +22,6 @@ use databend_common_pipeline_transforms::SortSpillParams;
2422
use enum_as_inner::EnumAsInner;
2523
use sort_spill::SpillableBlock;
2624

27-
use crate::spillers::Spiller;
28-
2925
mod bounds;
3026
mod merge_sort;
3127
mod sort_broadcast;
@@ -36,8 +32,6 @@ mod sort_merge_stream;
3632
mod sort_restore;
3733
mod sort_route;
3834
mod sort_spill;
39-
#[cfg(test)]
40-
mod test_memory;
4135

4236
pub use merge_sort::*;
4337
pub use sort_broadcast::*;
@@ -49,9 +43,9 @@ pub use sort_restore::*;
4943
pub use sort_route::*;
5044

5145
#[derive(Clone)]
52-
struct Base {
46+
struct Base<S: Send + Clone> {
5347
schema: DataSchemaRef,
54-
spiller: Arc<Spiller>,
48+
spiller: S,
5549
sort_row_offset: usize,
5650
limit: Option<usize>,
5751
}

0 commit comments

Comments
 (0)