Skip to content

Commit aff1c80

Browse files
authored
feat: table function that dumps snapshot information (#17763)
* feat: fuse_dump_snapshot table function * Tweak logic test
1 parent 33dd91f commit aff1c80

File tree

6 files changed

+210
-1
lines changed

6 files changed

+210
-1
lines changed

src/query/service/src/table_functions/table_function_factory.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use databend_common_storages_fuse::table_functions::ClusteringStatisticsFunc;
2323
use databend_common_storages_fuse::table_functions::FuseAmendTable;
2424
use databend_common_storages_fuse::table_functions::FuseBlockFunc;
2525
use databend_common_storages_fuse::table_functions::FuseColumnFunc;
26+
use databend_common_storages_fuse::table_functions::FuseDumpSnapshotsFunc;
2627
use databend_common_storages_fuse::table_functions::FuseEncodingFunc;
2728
use databend_common_storages_fuse::table_functions::FuseStatisticsFunc;
2829
use databend_common_storages_fuse::table_functions::FuseTimeTravelSizeFunc;
@@ -133,6 +134,14 @@ impl TableFunctionFactory {
133134
),
134135
);
135136

137+
creators.insert(
138+
"fuse_dump_snapshots".to_string(),
139+
(
140+
next_id(),
141+
Arc::new(TableFunctionTemplate::<FuseDumpSnapshotsFunc>::create),
142+
),
143+
);
144+
136145
creators.insert(
137146
"fuse_amend".to_string(),
138147
(

src/query/storages/fuse/src/table_functions/clustering_information.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ use databend_common_expression::Value;
4242
use databend_common_functions::BUILTIN_FUNCTIONS;
4343
use databend_common_sql::analyze_cluster_keys;
4444
use databend_storages_common_index::statistics_to_domain;
45-
// use databend_storages_common_table_meta::meta::AbstractBlockMeta;
4645
use databend_storages_common_table_meta::meta::BlockMeta;
4746
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
4847
use databend_storages_common_table_meta::meta::SegmentInfo;
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use databend_common_catalog::catalog_kind::CATALOG_DEFAULT;
18+
use databend_common_catalog::plan::DataSourcePlan;
19+
use databend_common_catalog::table_args::TableArgs;
20+
use databend_common_exception::ErrorCode;
21+
use databend_common_exception::Result;
22+
use databend_common_expression::types::StringType;
23+
use databend_common_expression::types::VariantType;
24+
use databend_common_expression::DataBlock;
25+
use databend_common_expression::FromData;
26+
use databend_common_expression::TableDataType;
27+
use databend_common_expression::TableField;
28+
use databend_common_expression::TableSchema;
29+
use databend_common_expression::TableSchemaRefExt;
30+
31+
use crate::io::MetaReaders;
32+
use crate::io::TableMetaLocationGenerator;
33+
use crate::sessions::TableContext;
34+
use crate::table_functions::parse_db_tb_args;
35+
use crate::table_functions::string_literal;
36+
use crate::table_functions::SimpleTableFunc;
37+
use crate::FuseTable;
38+
39+
pub struct FuseDumpSnapshotsArgs {
40+
database_name: String,
41+
table_name: String,
42+
}
43+
44+
const DEFAULT_SNAPSHOT_LIMIT: usize = 1;
45+
46+
pub struct FuseDumpSnapshotsFunc {
47+
args: FuseDumpSnapshotsArgs,
48+
}
49+
50+
impl From<&FuseDumpSnapshotsArgs> for TableArgs {
51+
fn from(args: &FuseDumpSnapshotsArgs) -> Self {
52+
TableArgs::new_positioned(vec![
53+
string_literal(args.database_name.as_str()),
54+
string_literal(args.table_name.as_str()),
55+
])
56+
}
57+
}
58+
59+
#[async_trait::async_trait]
60+
impl SimpleTableFunc for FuseDumpSnapshotsFunc {
61+
fn table_args(&self) -> Option<TableArgs> {
62+
Some((&self.args).into())
63+
}
64+
65+
fn schema(&self) -> Arc<TableSchema> {
66+
TableSchemaRefExt::create(vec![
67+
TableField::new("snapshot_id", TableDataType::String),
68+
TableField::new("snapshot", TableDataType::Variant),
69+
])
70+
}
71+
72+
async fn apply(
73+
&self,
74+
ctx: &Arc<dyn TableContext>,
75+
plan: &DataSourcePlan,
76+
) -> Result<Option<DataBlock>> {
77+
let tenant_id = ctx.get_tenant();
78+
let tbl = ctx
79+
.get_catalog(CATALOG_DEFAULT)
80+
.await?
81+
.get_table(
82+
&tenant_id,
83+
self.args.database_name.as_str(),
84+
self.args.table_name.as_str(),
85+
)
86+
.await?;
87+
88+
let table = FuseTable::try_from_table(tbl.as_ref()).map_err(|_| {
89+
ErrorCode::StorageOther(
90+
"Invalid table engine, only FUSE table supports fuse_dump_snapshots",
91+
)
92+
})?;
93+
94+
let meta_location_generator = table.meta_location_generator.clone();
95+
let snapshot_location = table.snapshot_loc();
96+
if let Some(snapshot_location) = snapshot_location {
97+
let limit = plan
98+
.push_downs
99+
.as_ref()
100+
.and_then(|v| v.limit)
101+
.unwrap_or(DEFAULT_SNAPSHOT_LIMIT);
102+
103+
let table_snapshot_reader = MetaReaders::table_snapshot_reader(table.operator.clone());
104+
let format_version =
105+
TableMetaLocationGenerator::snapshot_version(snapshot_location.as_str());
106+
107+
use crate::io::read::SnapshotHistoryReader;
108+
let lite_snapshot_stream = table_snapshot_reader.snapshot_history(
109+
snapshot_location,
110+
format_version,
111+
meta_location_generator.clone(),
112+
);
113+
114+
let mut snapshot_ids: Vec<String> = Vec::with_capacity(limit);
115+
let mut content: Vec<_> = Vec::with_capacity(limit);
116+
117+
use futures::stream::StreamExt;
118+
let mut stream = lite_snapshot_stream.take(limit);
119+
120+
use jsonb::Value as JsonbValue;
121+
while let Some(s) = stream.next().await {
122+
let (s, _v) = s?;
123+
snapshot_ids.push(s.snapshot_id.simple().to_string());
124+
content.push(JsonbValue::from(serde_json::to_value(s)?).to_vec());
125+
}
126+
127+
let block = DataBlock::new_from_columns(vec![
128+
StringType::from_data(snapshot_ids),
129+
VariantType::from_data(content),
130+
]);
131+
132+
return Ok(Some(block));
133+
}
134+
Ok(Some(DataBlock::empty_with_schema(Arc::new(
135+
self.schema().into(),
136+
))))
137+
}
138+
139+
fn create(func_name: &str, table_args: TableArgs) -> Result<Self>
140+
where Self: Sized {
141+
let (arg_database_name, arg_table_name) = parse_db_tb_args(&table_args, func_name)?;
142+
Ok(Self {
143+
args: FuseDumpSnapshotsArgs {
144+
database_name: arg_database_name,
145+
table_name: arg_table_name,
146+
},
147+
})
148+
}
149+
}

src/query/storages/fuse/src/table_functions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ mod function_template;
1818
mod fuse_amend;
1919
mod fuse_block;
2020
mod fuse_column;
21+
mod fuse_dump_snapshot;
2122
mod fuse_encoding;
2223
mod fuse_segment;
2324
mod fuse_snapshot;
@@ -38,6 +39,7 @@ pub use function_template::*;
3839
pub use fuse_amend::FuseAmendTable;
3940
pub use fuse_block::FuseBlockFunc;
4041
pub use fuse_column::FuseColumnFunc;
42+
pub use fuse_dump_snapshot::FuseDumpSnapshotsFunc;
4143
pub use fuse_encoding::FuseEncodingFunc;
4244
pub use fuse_segment::FuseSegmentFunc;
4345
pub use fuse_snapshot::FuseSnapshotFunc;

tests/sqllogictests/suites/base/06_show/06_0014_show_table_functions.test

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ SHOW TABLE_FUNCTIONS LIKE 'fuse%'
1111
fuse_amend
1212
fuse_block
1313
fuse_column
14+
fuse_dump_snapshots
1415
fuse_encoding
1516
fuse_segment
1617
fuse_snapshot
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
statement ok
2+
CREATE OR REPLACE DATABASE dump_snapshots;
3+
4+
statement ok
5+
USE dump_snapshots;
6+
7+
statement ok
8+
create table t(c int)
9+
10+
query III
11+
select length(snapshot_id), snapshot:summary:block_count from fuse_dump_snapshots('dump_snapshots', 't')
12+
----
13+
14+
statement ok
15+
insert into t values (1)
16+
17+
statement ok
18+
insert into t values (1)
19+
20+
# By default, only the latest snapshot is returned:
21+
# Expects one snapshot which contains 2 blocks
22+
query III
23+
select length(snapshot_id), snapshot:summary:block_count from fuse_dump_snapshots('dump_snapshots', 't')
24+
----
25+
32 2
26+
27+
28+
29+
# If limit is set, more snapshots will be returned up to the specified limit
30+
#
31+
# Expects 2 snapshots:
32+
# - The latest one contains 2 blocks
33+
# - The second to the last one contains 1 block
34+
query III
35+
select length(snapshot_id), snapshot:summary:block_count from fuse_dump_snapshots('dump_snapshots', 't') limit 100
36+
----
37+
32 2
38+
32 1
39+
40+
41+
42+
statement ok
43+
create table m(c int) engine = memory;
44+
45+
statement ok
46+
insert into m values (1)
47+
48+
statement error
49+
select * from fuse_dump_snapshots('dump_snapshots', 'm')

0 commit comments

Comments
 (0)