Skip to content

Commit 3d33fd6

Browse files
committed
add test
1 parent cab7acd commit 3d33fd6

File tree

15 files changed

+431
-73
lines changed

15 files changed

+431
-73
lines changed

src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs

Lines changed: 8 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,6 @@ use databend_common_storages_fuse::io::TableMetaLocationGenerator;
3939
use databend_common_storages_fuse::operations::ASSUMPTION_MAX_TXN_DURATION;
4040
use databend_common_storages_fuse::FuseTable;
4141
use databend_common_storages_fuse::RetentionPolicy;
42-
use databend_common_storages_fuse::FUSE_TBL_REF_PREFIX;
4342
use databend_storages_common_cache::CacheAccessor;
4443
use databend_storages_common_cache::CacheManager;
4544
use databend_storages_common_io::Files;
@@ -621,6 +620,9 @@ async fn process_snapshot_refs(
621620
) -> Result<RefVacuumInfo> {
622621
let start = std::time::Instant::now();
623622
let op = fuse_table.get_operator();
623+
let ref_snapshot_location_prefix = fuse_table
624+
.meta_location_generator()
625+
.ref_snapshot_location_prefix();
624626
// Refs that expired and should be cleaned up
625627
let mut expired_refs = Vec::new();
626628
// Updated refs map (branch anchors updated)
@@ -652,19 +654,15 @@ async fn process_snapshot_refs(
652654
}
653655
SnapshotRefInfo::Branch { head, anchor } => {
654656
// Branch: need to clean up snapshots, get gc_root and snapshots_to_gc
655-
let ref_prefix = format!(
656-
"{}/{}/{}/",
657-
fuse_table.meta_location_generator().prefix(),
658-
FUSE_TBL_REF_PREFIX,
659-
ref_name
660-
);
657+
let ref_prefix = format!("{}{}/", ref_snapshot_location_prefix, ref_name);
661658

662659
let snapshots_before_retention = fuse_table
663660
.list_files_until_timestamp(&ref_prefix, retention_time, true, None)
664661
.await?;
665662

666663
let (gc_root_location, gc_root_snap) = match process_branch_gc_root(
667664
fuse_table,
665+
ref_name,
668666
head,
669667
&snapshots_before_retention,
670668
&mut ref_snapshots_to_gc,
@@ -729,12 +727,7 @@ async fn process_snapshot_refs(
729727
// Step 3: Cleanup expired ref directories
730728
if !expired_refs.is_empty() {
731729
for ref_name in &expired_refs {
732-
let dir = format!(
733-
"{}/{}/{}/",
734-
fuse_table.meta_location_generator().prefix(),
735-
FUSE_TBL_REF_PREFIX,
736-
ref_name
737-
);
730+
let dir = format!("{}{}/", ref_snapshot_location_prefix, ref_name);
738731
op.remove_all(&dir).await.inspect_err(|err| {
739732
error!("Failed to remove expired ref directory {}: {}", dir, err);
740733
})?;
@@ -762,6 +755,7 @@ async fn process_snapshot_refs(
762755
#[async_backtrace::framed]
763756
async fn process_branch_gc_root(
764757
fuse_table: &FuseTable,
758+
ref_name: &str,
765759
head: &str,
766760
snapshots_before_retention: &[Entry],
767761
ref_snapshots_to_gc: &mut Vec<String>,
@@ -791,7 +785,7 @@ async fn process_branch_gc_root(
791785
};
792786
let gc_root_path = fuse_table
793787
.meta_location_generator()
794-
.snapshot_location_from_uuid(&gc_root_id, gc_root_ver)?;
788+
.ref_snapshot_location_from_uuid(ref_name, &gc_root_id, gc_root_ver)?;
795789

796790
// Try to read gc_root snapshot
797791
match SnapshotsIO::read_snapshot(gc_root_path.clone(), op, false).await {

src/query/ee/src/table_ref/handler.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,11 @@ impl TableRefHandler for RealTableRefHandler {
116116
// write down new snapshot
117117
let new_snapshot_location = fuse_table
118118
.meta_location_generator()
119-
.gen_ref_snapshot_location(&plan.ref_name, &new_snapshot.snapshot_id);
119+
.ref_snapshot_location_from_uuid(
120+
&plan.ref_name,
121+
&new_snapshot.snapshot_id,
122+
new_snapshot.format_version,
123+
)?;
120124
let data = new_snapshot.to_bytes()?;
121125
fuse_table
122126
.get_operator_ref()

src/query/service/src/table_functions/system/table_statistics.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ pub async fn get_fuse_table_snapshot(
219219
snapshot_location,
220220
format_version,
221221
meta_location_generator.clone(),
222+
fuse_table.get_table_branch_name(),
222223
);
223224

224225
if let Some(Ok((snapshot, _v))) = lite_snapshot_stream.take(1).next().await {

src/query/storages/fuse/src/fuse_table.rs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,10 @@ impl FuseTable {
455455
&self.operator
456456
}
457457

458+
/// Returns the name stored in `self.table_branch`, cloned into an owned
/// `String`, or `None` when `table_branch` is not set.
///
/// Only the first element of the `table_branch` tuple is used; the second
/// element is ignored here.
pub fn get_table_branch_name(&self) -> Option<String> {
459+
self.table_branch.as_ref().map(|(name, _)| name.clone())
460+
}
461+
458462
pub fn try_from_table(tbl: &dyn Table) -> Result<&FuseTable> {
459463
tbl.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
460464
ErrorCode::Internal(format!(
@@ -1010,19 +1014,39 @@ impl Table for FuseTable {
10101014
}
10111015
}
10121016
_ => {
1013-
let s = &self.table_info.meta.statistics;
1014-
TableStatistics {
1015-
num_rows: Some(s.number_of_rows),
1016-
data_size: Some(s.data_bytes),
1017-
data_size_compressed: Some(s.compressed_data_bytes),
1018-
index_size: Some(s.index_data_bytes),
1019-
bloom_index_size: s.bloom_index_size,
1020-
ngram_index_size: s.ngram_index_size,
1021-
inverted_index_size: s.inverted_index_size,
1022-
vector_index_size: s.vector_index_size,
1023-
virtual_column_size: s.virtual_column_size,
1024-
number_of_blocks: s.number_of_blocks,
1025-
number_of_segments: s.number_of_segments,
1017+
if self.table_branch.is_some() {
1018+
let Some(ss) = self.read_table_snapshot().await? else {
1019+
return Ok(None);
1020+
};
1021+
let stats = &ss.summary;
1022+
TableStatistics {
1023+
num_rows: Some(stats.row_count),
1024+
data_size: Some(stats.uncompressed_byte_size),
1025+
data_size_compressed: Some(stats.compressed_byte_size),
1026+
index_size: Some(stats.index_size),
1027+
bloom_index_size: stats.bloom_index_size,
1028+
ngram_index_size: stats.ngram_index_size,
1029+
inverted_index_size: stats.inverted_index_size,
1030+
vector_index_size: stats.vector_index_size,
1031+
virtual_column_size: stats.virtual_column_size,
1032+
number_of_blocks: Some(stats.block_count),
1033+
number_of_segments: Some(ss.segments.len() as u64),
1034+
}
1035+
} else {
1036+
let s = &self.table_info.meta.statistics;
1037+
TableStatistics {
1038+
num_rows: Some(s.number_of_rows),
1039+
data_size: Some(s.data_bytes),
1040+
data_size_compressed: Some(s.compressed_data_bytes),
1041+
index_size: Some(s.index_data_bytes),
1042+
bloom_index_size: s.bloom_index_size,
1043+
ngram_index_size: s.ngram_index_size,
1044+
inverted_index_size: s.inverted_index_size,
1045+
vector_index_size: s.vector_index_size,
1046+
virtual_column_size: s.virtual_column_size,
1047+
number_of_blocks: s.number_of_blocks,
1048+
number_of_segments: s.number_of_segments,
1049+
}
10261050
}
10271051
}
10281052
};

src/query/storages/fuse/src/io/locations.rs

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use databend_storages_common_table_meta::meta::SegmentInfo;
2323
use databend_storages_common_table_meta::meta::SegmentStatistics;
2424
use databend_storages_common_table_meta::meta::SnapshotVersion;
2525
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
26-
use databend_storages_common_table_meta::meta::TableSnapshot;
2726
use databend_storages_common_table_meta::meta::TableSnapshotStatisticsVersion;
2827
use databend_storages_common_table_meta::meta::Versioned;
2928
use databend_storages_common_table_meta::meta::VACUUM2_OBJECT_KEY_PREFIX;
@@ -131,6 +130,10 @@ impl TableMetaLocationGenerator {
131130
&self.segment_statistics_location_prefix
132131
}
133132

133+
/// Returns a borrowed view of the stored `ref_snapshot_location_prefix`.
///
/// NOTE(review): callers in this commit build a ref directory with
/// `format!("{}{}/", prefix, ref_name)`, so the stored value presumably
/// already ends with a path separator — confirm against the constructor.
pub fn ref_snapshot_location_prefix(&self) -> &str {
134+
&self.ref_snapshot_location_prefix
135+
}
136+
134137
pub fn gen_block_location(
135138
&self,
136139
table_meta_timestamps: TableMetaTimestamps,
@@ -200,14 +203,14 @@ impl TableMetaLocationGenerator {
200203
Ok(snapshot_version.create(id, &self.prefix))
201204
}
202205

203-
pub fn gen_ref_snapshot_location(&self, name: &str, uuid: &Uuid) -> String {
204-
format!(
205-
"{}/{name}/{}{}_v{}.mpk",
206-
&self.ref_snapshot_location_prefix,
207-
VACUUM2_OBJECT_KEY_PREFIX,
208-
uuid.simple(),
209-
TableSnapshot::VERSION,
210-
)
206+
/// Builds the storage location of a snapshot that belongs to the ref `name`.
///
/// * `name` - ref name used as a path component of the location.
/// * `id` - snapshot id.
/// * `version` - snapshot format version, converted to a `SnapshotVersion`.
///
/// # Errors
///
/// Returns the error from `SnapshotVersion::try_from` when `version` cannot
/// be converted.
pub fn ref_snapshot_location_from_uuid(
207+
&self,
208+
name: &str,
209+
id: &Uuid,
210+
version: u64,
211+
) -> Result<String> {
212+
// The version decides the file suffix; reject unknown versions up front.
let snapshot_version = SnapshotVersion::try_from(version)?;
213+
// The location is rooted at the table prefix (`self.prefix`).
Ok(snapshot_version.create_ref(name, id, &self.prefix))
211214
}
212215

213216
pub fn snapshot_version(location: impl AsRef<str>) -> u64 {
@@ -353,6 +356,7 @@ impl TableMetaLocationGenerator {
353356

354357
/// Builds object locations for versioned snapshot-related files.
trait SnapshotLocationCreator {
355358
/// Builds the location for `id` under `prefix`.
fn create(&self, id: &Uuid, prefix: impl AsRef<str>) -> String;
359+
/// Builds the location for `id` under `prefix`, scoped to the ref `name`.
///
/// NOTE(review): not every implementor supports this; the
/// `TableSnapshotStatisticsVersion` impl is an `unimplemented!()` stub.
fn create_ref(&self, name: &str, id: &Uuid, prefix: impl AsRef<str>) -> String;
356360
/// Returns the version-specific file name suffix appended to a location.
fn suffix(&self) -> String;
357361
}
358362

@@ -376,6 +380,18 @@ impl SnapshotLocationCreator for SnapshotVersion {
376380
)
377381
}
378382

383+
/// Builds the location of a ref snapshot.
///
/// The result has the layout
/// `{prefix}/{FUSE_TBL_REF_PREFIX}/{name}/{VACUUM2_OBJECT_KEY_PREFIX}{id.simple()}{suffix}`,
/// where `suffix` comes from `self.suffix()`.
fn create_ref(&self, name: &str, id: &Uuid, prefix: impl AsRef<str>) -> String {
384+
format!(
385+
"{}/{}/{}/{}{}{}",
386+
prefix.as_ref(),
387+
FUSE_TBL_REF_PREFIX,
388+
name,
389+
VACUUM2_OBJECT_KEY_PREFIX,
390+
id.simple(),
391+
self.suffix(),
392+
)
393+
}
394+
379395
fn suffix(&self) -> String {
380396
match self {
381397
SnapshotVersion::V0(_) => "".to_string(),
@@ -398,6 +414,10 @@ impl SnapshotLocationCreator for TableSnapshotStatisticsVersion {
398414
)
399415
}
400416

417+
/// Not supported for this version type.
///
/// # Panics
///
/// Always panics via `unimplemented!()`; all arguments are ignored.
fn create_ref(&self, _name: &str, _id: &Uuid, _prefix: impl AsRef<str>) -> String {
418+
unimplemented!()
419+
}
420+
401421
fn suffix(&self) -> String {
402422
match self {
403423
TableSnapshotStatisticsVersion::V0(_) => "_ts_v0.json".to_string(),

src/query/storages/fuse/src/io/read/snapshot_history_reader.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,26 @@ pub trait SnapshotHistoryReader {
3939
location: String,
4040
format_version: u64,
4141
location_gen: TableMetaLocationGenerator,
42+
branch_name: Option<String>,
4243
) -> TableSnapshotStream;
4344
}
45+
4446
impl SnapshotHistoryReader for TableSnapshotReader {
4547
fn snapshot_history(
4648
self,
4749
location: String,
4850
format_version: u64,
4951
location_gen: TableMetaLocationGenerator,
52+
branch_name: Option<String>,
5053
) -> TableSnapshotStream {
5154
let stream = stream::try_unfold(
52-
(self, location_gen, Some((location, format_version))),
53-
|(reader, gen, next)| async move {
55+
(
56+
self,
57+
location_gen,
58+
branch_name,
59+
Some((location, format_version)),
60+
),
61+
|(reader, gen, branch_name, next)| async move {
5462
if let Some((loc, ver)) = next {
5563
let load_params = LoadParams {
5664
location: loc,
@@ -77,14 +85,22 @@ impl SnapshotHistoryReader for TableSnapshotReader {
7785
Ok(Some(snapshot)) => {
7886
if let Some((prev_id, prev_version)) = snapshot.prev_snapshot_id {
7987
let new_ver = prev_version;
80-
let new_loc =
81-
gen.snapshot_location_from_uuid(&prev_id, prev_version)?;
88+
// Use branch-specific location if we're on a branch
89+
let new_loc = if let Some(ref name) = branch_name {
90+
gen.ref_snapshot_location_from_uuid(
91+
name,
92+
&prev_id,
93+
prev_version,
94+
)?
95+
} else {
96+
gen.snapshot_location_from_uuid(&prev_id, prev_version)?
97+
};
8298
Ok(Some((
8399
(snapshot, ver),
84-
(reader, gen, Some((new_loc, new_ver))),
100+
(reader, gen, branch_name, Some((new_loc, new_ver))),
85101
)))
86102
} else {
87-
Ok(Some(((snapshot, ver), (reader, gen, None))))
103+
Ok(Some(((snapshot, ver), (reader, gen, branch_name, None))))
88104
}
89105
}
90106
Ok(None) => Ok(None),

src/query/storages/fuse/src/io/snapshots.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,12 +223,13 @@ impl SnapshotsIO {
223223
dal: Operator,
224224
location_generator: TableMetaLocationGenerator,
225225
root_snapshot: String,
226+
ref_name: Option<String>,
226227
limit: Option<usize>,
227228
) -> Result<Vec<TableSnapshotLite>> {
228229
let table_snapshot_reader = MetaReaders::table_snapshot_reader(dal);
229230
let format_version = TableMetaLocationGenerator::snapshot_version(root_snapshot.as_str());
230231
let lite_snapshot_stream = table_snapshot_reader
231-
.snapshot_history(root_snapshot, format_version, location_generator)
232+
.snapshot_history(root_snapshot, format_version, location_generator, ref_name)
232233
.map_ok(|(snapshot, format_version)| {
233234
TableSnapshotLite::from((snapshot.as_ref(), format_version))
234235
});

src/query/storages/fuse/src/operations/gc.rs

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ use databend_storages_common_table_meta::meta::Location;
4242
use databend_storages_common_table_meta::meta::SegmentInfo;
4343
use databend_storages_common_table_meta::meta::TableSnapshot;
4444
use databend_storages_common_table_meta::meta::TableSnapshotStatistics;
45-
use databend_storages_common_table_meta::meta::VACUUM2_OBJECT_KEY_PREFIX;
4645
use log::error;
4746
use log::info;
4847
use log::warn;
@@ -58,7 +57,6 @@ use crate::io::SnapshotsIO;
5857
use crate::io::TableMetaLocationGenerator;
5958
use crate::FuseTable;
6059
use crate::RetentionPolicy;
61-
use crate::FUSE_TBL_REF_PREFIX;
6260
use crate::FUSE_TBL_SNAPSHOT_PREFIX;
6361

6462
impl FuseTable {
@@ -743,7 +741,7 @@ impl FuseTable {
743741
/// Returns gc_root location if found
744742
async fn select_branch_gc_root(
745743
&self,
746-
ref_prefix: &str,
744+
ref_name: &str,
747745
snapshots_before_retention: &[Entry],
748746
ref_protected_segments: &mut HashSet<Location>,
749747
ref_snapshots_to_purge: &mut Vec<String>,
@@ -761,15 +759,9 @@ impl FuseTable {
761759
let Some((gc_root_id, gc_root_ver)) = last_snapshot.prev_snapshot_id else {
762760
return Ok(None);
763761
};
764-
765-
let gc_root_path = format!(
766-
"{}{}{}_v{}.mpk",
767-
ref_prefix,
768-
VACUUM2_OBJECT_KEY_PREFIX,
769-
gc_root_id.simple(),
770-
gc_root_ver,
771-
);
772-
762+
let gc_root_path = self
763+
.meta_location_generator()
764+
.ref_snapshot_location_from_uuid(ref_name, &gc_root_id, gc_root_ver)?;
773765
// Try to read gc_root snapshot
774766
match SnapshotsIO::read_snapshot(gc_root_path.clone(), op.clone(), false).await {
775767
Ok((gc_root_snap, _)) => {
@@ -818,6 +810,9 @@ impl FuseTable {
818810
let now = Utc::now();
819811
let table_info = self.get_table_info();
820812
let retention_policy = self.get_data_retention_policy(ctx.as_ref())?;
813+
let ref_snapshot_location_prefix = self
814+
.meta_location_generator()
815+
.ref_snapshot_location_prefix();
821816

822817
let mut ref_protected_segments = HashSet::new();
823818
let mut ref_snapshots_to_purge = Vec::new();
@@ -847,12 +842,7 @@ impl FuseTable {
847842
}
848843
SnapshotRefInfo::Branch { head, anchor } => {
849844
// Branch: process based on retention policy
850-
let ref_prefix = format!(
851-
"{}/{}/{}/",
852-
self.meta_location_generator().prefix(),
853-
FUSE_TBL_REF_PREFIX,
854-
ref_name
855-
);
845+
let ref_prefix = format!("{}{}/", ref_snapshot_location_prefix, ref_name);
856846

857847
let snapshots_before_lvt = match &retention_policy {
858848
RetentionPolicy::ByTimePeriod(delta_duration) => {
@@ -887,7 +877,7 @@ impl FuseTable {
887877

888878
let new_anchor = match self
889879
.select_branch_gc_root(
890-
&ref_prefix,
880+
ref_name,
891881
&snapshots_before_lvt,
892882
&mut ref_protected_segments,
893883
&mut ref_snapshots_to_purge,
@@ -954,12 +944,7 @@ impl FuseTable {
954944
if !expired_refs.is_empty() {
955945
let operator = self.get_operator();
956946
for ref_name in &expired_refs {
957-
let dir = format!(
958-
"{}/{}/{}/",
959-
self.meta_location_generator().prefix(),
960-
FUSE_TBL_REF_PREFIX,
961-
ref_name
962-
);
947+
let dir = format!("{}{}/", ref_snapshot_location_prefix, ref_name);
963948
operator.remove_all(&dir).await.inspect_err(|err| {
964949
error!("Failed to remove expired ref directory {}: {}", dir, err);
965950
})?;

0 commit comments

Comments
 (0)