Skip to content

Commit f4f841e

Browse files
committed
refactor: introduce DeleteFileContext.
Used to encapsulate enough information for DeleteFileIndex to be able to perform the requrired filtering of delete files
1 parent 5a43fc8 commit f4f841e

File tree

2 files changed

+110
-52
lines changed

2 files changed

+110
-52
lines changed

crates/iceberg/src/delete_file_index.rs

Lines changed: 92 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ use std::collections::HashMap;
22
use std::sync::Arc;
33

44
use futures::channel::mpsc;
5-
use futures::{StreamExt, TryStreamExt};
5+
use futures::{StreamExt, TryFutureExt, TryStreamExt};
66
use tokio::sync::watch;
77

8-
use crate::scan::FileScanTaskDeleteFile;
9-
use crate::spec::{DataContentType, DataFile};
8+
use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
9+
use crate::spec::{DataContentType, DataFile, Struct};
1010
use crate::Result;
1111

1212
type DeleteFileIndexRef = Arc<Result<DeleteFileIndex>>;
@@ -16,17 +16,15 @@ pub(crate) type DeleteFileIndexRefReceiver = watch::Receiver<Option<DeleteFileIn
1616
#[derive(Debug)]
1717
pub(crate) struct DeleteFileIndex {
1818
#[allow(dead_code)]
19-
global_deletes: Vec<Arc<FileScanTaskDeleteFile>>,
20-
#[allow(dead_code)]
21-
equality_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>>,
22-
#[allow(dead_code)]
23-
positional_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>>,
24-
positional_deletes_by_path: HashMap<String, Vec<Arc<FileScanTaskDeleteFile>>>,
19+
global_deletes: Vec<Arc<DeleteFileContext>>,
20+
eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
21+
pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
22+
pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
2523
}
2624

2725
impl DeleteFileIndex {
28-
pub(crate) fn from_receiver(
29-
receiver: mpsc::Receiver<Result<FileScanTaskDeleteFile>>,
26+
pub(crate) fn from_del_file_chan(
27+
receiver: mpsc::Receiver<Result<DeleteFileContext>>,
3028
) -> watch::Receiver<Option<DeleteFileIndexRef>> {
3129
let (tx, rx) = watch::channel(None);
3230

@@ -41,69 +39,126 @@ impl DeleteFileIndex {
4139
rx
4240
}
4341

44-
fn from_delete_files(files: Vec<FileScanTaskDeleteFile>) -> Self {
45-
let mut equality_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>> =
42+
fn from_delete_files(files: Vec<DeleteFileContext>) -> Self {
43+
let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
4644
HashMap::default();
47-
let mut positional_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>> =
45+
let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
4846
HashMap::default();
49-
let mut positional_deletes_by_path: HashMap<String, Vec<Arc<FileScanTaskDeleteFile>>> =
47+
let mut pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>> =
5048
HashMap::default();
5149

52-
files.into_iter().for_each(|file| {
53-
let arc_file = Arc::new(file);
54-
match arc_file.file_type {
50+
files.into_iter().for_each(|del_file_ctx| {
51+
let arc_del_file_ctx = Arc::new(del_file_ctx);
52+
match arc_del_file_ctx.manifest_entry.content_type() {
5553
DataContentType::PositionDeletes => {
5654
// TODO: implement logic from ContentFileUtil.referencedDataFile
5755
// see https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L54
5856
let referenced_data_file_path = "TODO".to_string();
5957

60-
positional_deletes_by_path
58+
pos_deletes_by_path
6159
.entry(referenced_data_file_path)
6260
.and_modify(|entry| {
63-
entry.push(arc_file.clone());
61+
entry.push(arc_del_file_ctx.clone());
6462
})
65-
.or_insert(vec![arc_file.clone()]);
66-
67-
positional_deletes_by_partition
68-
.entry(arc_file.partition_spec_id)
63+
.or_insert(vec![arc_del_file_ctx.clone()]);
64+
65+
pos_deletes_by_partition
66+
.entry(
67+
arc_del_file_ctx
68+
.manifest_entry
69+
.data_file()
70+
.partition()
71+
.clone(),
72+
)
6973
.and_modify(|entry| {
70-
entry.push(arc_file.clone());
74+
entry.push(arc_del_file_ctx.clone());
7175
})
72-
.or_insert(vec![arc_file.clone()]);
76+
.or_insert(vec![arc_del_file_ctx.clone()]);
7377
}
7478
DataContentType::EqualityDeletes => {
75-
equality_deletes_by_partition
76-
.entry(arc_file.partition_spec_id)
79+
eq_deletes_by_partition
80+
.entry(
81+
arc_del_file_ctx
82+
.manifest_entry
83+
.data_file()
84+
.partition()
85+
.clone(),
86+
)
7787
.and_modify(|entry| {
78-
entry.push(arc_file.clone());
88+
entry.push(arc_del_file_ctx.clone());
7989
})
80-
.or_insert(vec![arc_file.clone()]);
90+
.or_insert(vec![arc_del_file_ctx.clone()]);
8191
}
8292
_ => unreachable!(),
8393
}
8494
});
8595

8696
DeleteFileIndex {
8797
global_deletes: vec![],
88-
equality_deletes_by_partition,
89-
positional_deletes_by_partition,
90-
positional_deletes_by_path,
98+
eq_deletes_by_partition,
99+
pos_deletes_by_partition,
100+
pos_deletes_by_path,
91101
}
92102
}
93103

94104
/// Determine all the delete files that apply to the provided `DataFile`.
95105
pub(crate) fn get_deletes_for_data_file(
96106
&self,
97107
data_file: &DataFile,
108+
seq_num: Option<i64>,
98109
) -> Vec<FileScanTaskDeleteFile> {
99110
let mut results = vec![];
100111

101-
if let Some(positional_deletes) = self.positional_deletes_by_path.get(data_file.file_path())
102-
{
103-
results.extend(positional_deletes.iter().map(|i| i.as_ref().clone()))
112+
if let Some(deletes) = self.pos_deletes_by_path.get(data_file.file_path()) {
113+
deletes
114+
.iter()
115+
.filter(|&delete| {
116+
seq_num
117+
.map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
118+
.unwrap_or_else(|| true)
119+
})
120+
.for_each(|delete| {
121+
results.push(FileScanTaskDeleteFile {
122+
file_path: delete.manifest_entry.file_path().to_string(),
123+
file_type: delete.manifest_entry.content_type(),
124+
partition_spec_id: delete.partition_spec_id,
125+
})
126+
});
104127
}
105128

106-
// TODO: equality deletes
129+
if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) {
130+
deletes
131+
.iter()
132+
.filter(|&delete| {
133+
seq_num
134+
.map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
135+
.unwrap_or_else(|| true)
136+
})
137+
.for_each(|delete| {
138+
results.push(FileScanTaskDeleteFile {
139+
file_path: delete.manifest_entry.file_path().to_string(),
140+
file_type: delete.manifest_entry.content_type(),
141+
partition_spec_id: delete.partition_spec_id,
142+
})
143+
});
144+
}
145+
146+
if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) {
147+
deletes
148+
.iter()
149+
.filter(|&delete| {
150+
seq_num
151+
.map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
152+
.unwrap_or_else(|| true)
153+
})
154+
.for_each(|delete| {
155+
results.push(FileScanTaskDeleteFile {
156+
file_path: delete.manifest_entry.file_path().to_string(),
157+
file_type: delete.manifest_entry.content_type(),
158+
partition_spec_id: delete.partition_spec_id,
159+
})
160+
});
161+
}
107162

108163
results
109164
}

crates/iceberg/src/scan.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -385,14 +385,14 @@ impl TableScan {
385385
// used to stream the results back to the caller
386386
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
387387

388+
// DeleteFileIndexRefReceiver is a watch channel receiver that will
389+
// be notified when the DeleteFileIndex is ready.
388390
let delete_file_idx_and_tx: Option<(
389391
DeleteFileIndexRefReceiver,
390-
Sender<Result<FileScanTaskDeleteFile>>,
392+
Sender<Result<DeleteFileContext>>,
391393
)> = if self.delete_file_processing_enabled {
392-
// used to stream delete files into the DeleteFileIndex
393394
let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries);
394-
395-
let delete_file_index_rx = DeleteFileIndex::from_receiver(delete_file_rx);
395+
let delete_file_index_rx = DeleteFileIndex::from_del_file_chan(delete_file_rx);
396396
Some((delete_file_index_rx, delete_file_tx))
397397
} else {
398398
None
@@ -565,7 +565,7 @@ impl TableScan {
565565

566566
async fn process_delete_manifest_entry(
567567
manifest_entry_context: ManifestEntryContext,
568-
mut file_scan_task_delete_file_tx: Sender<Result<FileScanTaskDeleteFile>>,
568+
mut delete_file_ctx_tx: Sender<Result<DeleteFileContext>>,
569569
) -> Result<()> {
570570
// skip processing this manifest entry if it has been marked as deleted
571571
if !manifest_entry_context.manifest_entry.is_alive() {
@@ -596,13 +596,9 @@ impl TableScan {
596596
}
597597
}
598598

599-
file_scan_task_delete_file_tx
600-
.send(Ok(FileScanTaskDeleteFile {
601-
file_path: manifest_entry_context
602-
.manifest_entry
603-
.file_path()
604-
.to_string(),
605-
file_type: manifest_entry_context.manifest_entry.content_type(),
599+
delete_file_ctx_tx
600+
.send(Ok(DeleteFileContext {
601+
manifest_entry: manifest_entry_context.manifest_entry.clone(),
606602
partition_spec_id: manifest_entry_context.partition_spec_id,
607603
}))
608604
.await?;
@@ -698,9 +694,10 @@ impl ManifestEntryContext {
698694

699695
match del_file_idx_opt.as_ref() {
700696
Some(del_file_idx) => match del_file_idx.as_ref() {
701-
Ok(delete_file_idx) => {
702-
delete_file_idx.get_deletes_for_data_file(self.manifest_entry.data_file())
703-
}
697+
Ok(delete_file_idx) => delete_file_idx.get_deletes_for_data_file(
698+
self.manifest_entry.data_file(),
699+
self.manifest_entry.sequence_number(),
700+
),
704701
Err(err) => {
705702
return Err(Error::new(ErrorKind::Unexpected, err.message()));
706703
}
@@ -1107,6 +1104,12 @@ pub struct FileScanTaskDeleteFile {
11071104
pub partition_spec_id: i32,
11081105
}
11091106

1107+
#[derive(Debug)]
1108+
pub(crate) struct DeleteFileContext {
1109+
pub(crate) manifest_entry: ManifestEntryRef,
1110+
pub(crate) partition_spec_id: i32,
1111+
}
1112+
11101113
impl FileScanTask {
11111114
/// Returns the data file path of this file scan task.
11121115
pub fn data_file_path(&self) -> &str {

0 commit comments

Comments
 (0)