Skip to content

fix: delete file lost wake #1323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 33 additions & 50 deletions crates/iceberg/src/delete_file_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,22 @@
// under the License.

use std::collections::HashMap;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::sync::{Arc, OnceLock};

use futures::StreamExt;
use futures::channel::mpsc::{Sender, channel};
use tokio::sync::Notify;

use crate::Result;
use crate::runtime::spawn;
use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
use crate::spec::{DataContentType, DataFile, Struct};
use crate::{Error, ErrorKind, Result};

/// Index of delete files
#[derive(Clone, Debug)]
pub(crate) struct DeleteFileIndex {
state: Arc<RwLock<DeleteFileIndexState>>,
}

#[derive(Debug)]
enum DeleteFileIndexState {
Populating,
Populated(PopulatedDeleteFileIndex),
index: Arc<OnceLock<PopulatedDeleteFileIndex>>,
ready_notify: Arc<Notify>,
}

#[derive(Debug)]
Expand All @@ -59,36 +51,50 @@ impl DeleteFileIndex {
pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
// TODO: what should the channel limit be?
let (tx, rx) = channel(10);
let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating));
let index = Arc::new(OnceLock::new());
let ready_notify = Arc::new(Notify::new());
let delete_file_stream = rx.boxed();

spawn({
let state = state.clone();
let index = index.clone();
let ready_notify = ready_notify.clone();
async move {
let delete_files = delete_file_stream.collect::<Vec<_>>().await;

let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);

let mut guard = state.write().unwrap();
*guard = DeleteFileIndexState::Populated(populated_delete_file_index);
index
.set(populated_delete_file_index)
.expect("delete file index should not be written by another thread");
ready_notify.notify_waiters();
}
});

(DeleteFileIndex { state }, tx)
(
DeleteFileIndex {
index,
ready_notify,
},
tx,
)
}

/// Gets all the delete files that apply to the specified data file.
///
/// Returns a future that resolves to a Result<Vec<FileScanTaskDeleteFile>>
pub(crate) fn get_deletes_for_data_file<'a>(
pub(crate) async fn get_deletes_for_data_file(
&self,
data_file: &'a DataFile,
data_file: &DataFile,
seq_num: Option<i64>,
) -> DeletesForDataFile<'a> {
DeletesForDataFile {
state: self.state.clone(),
data_file,
seq_num,
) -> Result<Vec<FileScanTaskDeleteFile>> {
match self.index.get() {
Some(idx) => Ok(idx.get_deletes_for_data_file(data_file, seq_num)),
None => {
self.ready_notify.notified().await;
Ok(self
.index
.get()
.unwrap()
.get_deletes_for_data_file(data_file, seq_num))
}
}
}
}
Expand Down Expand Up @@ -193,26 +199,3 @@ impl PopulatedDeleteFileIndex {
results
}
}

/// Future for the `DeleteFileIndex::get_deletes_for_data_file` method
pub(crate) struct DeletesForDataFile<'a> {
state: Arc<RwLock<DeleteFileIndexState>>,
data_file: &'a DataFile,
seq_num: Option<i64>,
}

impl Future for DeletesForDataFile<'_> {
type Output = Result<Vec<FileScanTaskDeleteFile>>;

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.state.try_read() {
Ok(guard) => match guard.deref() {
DeleteFileIndexState::Populated(idx) => Poll::Ready(Ok(
idx.get_deletes_for_data_file(self.data_file, self.seq_num)
)),
_ => Poll::Pending,
},
Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Unexpected, err.to_string()))),
Comment on lines -208 to -215
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have 2 possible problems here:

  1. try_read returns TryLockError::WouldBlock, which should not be an error, but Pending.
  2. More severely, we don't have any Waker set, so once Pending, the future will sleep forever.

Therefore, I think it's better to replace manual Future implementation with other sync primitives.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've addressed the missing Waker in my open PR in my open PR: 5f0b073

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which PR?

BTW, it seems in this commit the TryLock problem is not resolved

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I think manual Future implementation should be discouraged in favor of async fn with synchronisation primitives.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, it's in #982

}
}
}
Loading