diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 3f25bbda3..3e59cbb2e 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -16,30 +16,22 @@ // under the License. use std::collections::HashMap; -use std::future::Future; -use std::ops::Deref; -use std::pin::Pin; -use std::sync::{Arc, RwLock}; -use std::task::{Context, Poll}; +use std::sync::{Arc, OnceLock}; use futures::StreamExt; use futures::channel::mpsc::{Sender, channel}; +use tokio::sync::Notify; +use crate::Result; use crate::runtime::spawn; use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; use crate::spec::{DataContentType, DataFile, Struct}; -use crate::{Error, ErrorKind, Result}; /// Index of delete files #[derive(Clone, Debug)] pub(crate) struct DeleteFileIndex { - state: Arc>, -} - -#[derive(Debug)] -enum DeleteFileIndexState { - Populating, - Populated(PopulatedDeleteFileIndex), + index: Arc>, + ready_notify: Arc, } #[derive(Debug)] @@ -59,36 +51,50 @@ impl DeleteFileIndex { pub(crate) fn new() -> (DeleteFileIndex, Sender) { // TODO: what should the channel limit be? let (tx, rx) = channel(10); - let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating)); + let index = Arc::new(OnceLock::new()); + let ready_notify = Arc::new(Notify::new()); let delete_file_stream = rx.boxed(); spawn({ - let state = state.clone(); + let index = index.clone(); + let ready_notify = ready_notify.clone(); async move { let delete_files = delete_file_stream.collect::>().await; let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files); - let mut guard = state.write().unwrap(); - *guard = DeleteFileIndexState::Populated(populated_delete_file_index); + index + .set(populated_delete_file_index) + .expect("delete file index should not be written by another thread"); + ready_notify.notify_waiters(); } }); - (DeleteFileIndex { state }, tx) + ( + DeleteFileIndex { + index, + ready_notify, + }, + tx, + ) } /// Gets all the delete files that apply to the specified data file. - /// - /// Returns a future that resolves to a Result> - pub(crate) fn get_deletes_for_data_file<'a>( + pub(crate) async fn get_deletes_for_data_file( &self, - data_file: &'a DataFile, + data_file: &DataFile, seq_num: Option, - ) -> DeletesForDataFile<'a> { - DeletesForDataFile { - state: self.state.clone(), - data_file, - seq_num, + ) -> Result> { + match self.index.get() { + Some(idx) => Ok(idx.get_deletes_for_data_file(data_file, seq_num)), + None => { + self.ready_notify.notified().await; + Ok(self + .index + .get() + .unwrap() + .get_deletes_for_data_file(data_file, seq_num)) + } } } } @@ -193,26 +199,3 @@ impl PopulatedDeleteFileIndex { results } } - -/// Future for the `DeleteFileIndex::get_deletes_for_data_file` method -pub(crate) struct DeletesForDataFile<'a> { - state: Arc>, - data_file: &'a DataFile, - seq_num: Option, -} - -impl Future for DeletesForDataFile<'_> { - type Output = Result>; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - match self.state.try_read() { - Ok(guard) => match guard.deref() { - DeleteFileIndexState::Populated(idx) => Poll::Ready(Ok( - idx.get_deletes_for_data_file(self.data_file, self.seq_num) - )), - _ => Poll::Pending, - }, - Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Unexpected, err.to_string()))), - } - } -}