Skip to content

Commit abb3e65

Browse files
committed
fix: delete file lost wake
Signed-off-by: xxchan <[email protected]>
1 parent 513376c commit abb3e65

File tree

1 file changed

+33
-50
lines changed

1 file changed

+33
-50
lines changed

crates/iceberg/src/delete_file_index.rs

Lines changed: 33 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,22 @@
1616
// under the License.
1717

1818
use std::collections::HashMap;
19-
use std::future::Future;
20-
use std::ops::Deref;
21-
use std::pin::Pin;
22-
use std::sync::{Arc, RwLock};
23-
use std::task::{Context, Poll};
19+
use std::sync::{Arc, OnceLock};
2420

2521
use futures::channel::mpsc::{channel, Sender};
2622
use futures::StreamExt;
23+
use tokio::sync::Notify;
2724

2825
use crate::runtime::spawn;
2926
use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
3027
use crate::spec::{DataContentType, DataFile, Struct};
31-
use crate::{Error, ErrorKind, Result};
28+
use crate::Result;
3229

3330
/// Index of delete files
3431
#[derive(Clone, Debug)]
3532
pub(crate) struct DeleteFileIndex {
36-
state: Arc<RwLock<DeleteFileIndexState>>,
37-
}
38-
39-
#[derive(Debug)]
40-
enum DeleteFileIndexState {
41-
Populating,
42-
Populated(PopulatedDeleteFileIndex),
33+
index: Arc<OnceLock<PopulatedDeleteFileIndex>>,
34+
ready_notify: Arc<Notify>,
4335
}
4436

4537
#[derive(Debug)]
@@ -59,36 +51,50 @@ impl DeleteFileIndex {
5951
pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
6052
// TODO: what should the channel limit be?
6153
let (tx, rx) = channel(10);
62-
let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating));
54+
let index = Arc::new(OnceLock::new());
55+
let ready_notify = Arc::new(Notify::new());
6356
let delete_file_stream = rx.boxed();
6457

6558
spawn({
66-
let state = state.clone();
59+
let index = index.clone();
60+
let ready_notify = ready_notify.clone();
6761
async move {
6862
let delete_files = delete_file_stream.collect::<Vec<_>>().await;
6963

7064
let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);
7165

72-
let mut guard = state.write().unwrap();
73-
*guard = DeleteFileIndexState::Populated(populated_delete_file_index);
66+
index
67+
.set(populated_delete_file_index)
68+
.expect("delete file index should not be written by another thread");
69+
ready_notify.notify_waiters();
7470
}
7571
});
7672

77-
(DeleteFileIndex { state }, tx)
73+
(
74+
DeleteFileIndex {
75+
index,
76+
ready_notify,
77+
},
78+
tx,
79+
)
7880
}
7981

8082
/// Gets all the delete files that apply to the specified data file.
81-
///
82-
/// Returns a future that resolves to a Result<Vec<FileScanTaskDeleteFile>>
83-
pub(crate) fn get_deletes_for_data_file<'a>(
83+
pub(crate) async fn get_deletes_for_data_file(
8484
&self,
85-
data_file: &'a DataFile,
85+
data_file: &DataFile,
8686
seq_num: Option<i64>,
87-
) -> DeletesForDataFile<'a> {
88-
DeletesForDataFile {
89-
state: self.state.clone(),
90-
data_file,
91-
seq_num,
87+
) -> Result<Vec<FileScanTaskDeleteFile>> {
88+
match self.index.get() {
89+
Some(idx) => Ok(idx.get_deletes_for_data_file(data_file, seq_num)),
90+
None => {
91+
self.ready_notify.notified().await;
92+
Ok(self
93+
.index
94+
.get()
95+
.unwrap()
96+
.get_deletes_for_data_file(data_file, seq_num))
97+
}
9298
}
9399
}
94100
}
@@ -193,26 +199,3 @@ impl PopulatedDeleteFileIndex {
193199
results
194200
}
195201
}
196-
197-
/// Future for the `DeleteFileIndex::get_deletes_for_data_file` method
198-
pub(crate) struct DeletesForDataFile<'a> {
199-
state: Arc<RwLock<DeleteFileIndexState>>,
200-
data_file: &'a DataFile,
201-
seq_num: Option<i64>,
202-
}
203-
204-
impl Future for DeletesForDataFile<'_> {
205-
type Output = Result<Vec<FileScanTaskDeleteFile>>;
206-
207-
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
208-
match self.state.try_read() {
209-
Ok(guard) => match guard.deref() {
210-
DeleteFileIndexState::Populated(idx) => Poll::Ready(Ok(
211-
idx.get_deletes_for_data_file(self.data_file, self.seq_num)
212-
)),
213-
_ => Poll::Pending,
214-
},
215-
Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Unexpected, err.to_string()))),
216-
}
217-
}
218-
}

0 commit comments

Comments
 (0)