Skip to content

Commit ef827f1

Browse files
committed
feat: implement EqDelFuture
1 parent 8e90bdd commit ef827f1

File tree

1 file changed

+28
-4
lines changed

1 file changed

+28
-4
lines changed

crates/iceberg/src/arrow/delete_file_manager.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ use std::collections::HashMap;
1919
use std::future::Future;
2020
use std::ops::BitAndAssign;
2121
use std::pin::Pin;
22-
use std::sync::{Arc, RwLock};
22+
use std::sync::{Arc, OnceLock, RwLock};
2323
use std::task::{Context, Poll};
2424

2525
use futures::channel::oneshot;
26+
use futures::future::join_all;
2627
use futures::{StreamExt, TryStreamExt};
2728
use roaring::RoaringTreemap;
2829

@@ -41,19 +42,32 @@ use crate::{Error, ErrorKind, Result};
4142
// state that can be awaited upon to get te EQ deletes. That way we can check to see if
4243
// a load of each Eq delete file is already in progress and avoid starting another one.
4344
#[derive(Debug, Clone)]
44-
struct EqDelFuture {}
45+
struct EqDelFuture {
46+
result: OnceLock<Predicate>,
47+
}
4548

4649
impl EqDelFuture {
4750
pub fn new() -> (oneshot::Sender<Predicate>, Self) {
48-
todo!()
51+
let (tx, rx) = oneshot::channel();
52+
let result = OnceLock::new();
53+
54+
crate::runtime::spawn({
55+
let result = result.clone();
56+
async move { result.set(rx.await.unwrap()) }
57+
});
58+
59+
(tx, Self { result })
4960
}
5061
}
5162

5263
impl Future for EqDelFuture {
5364
type Output = Predicate;
5465

5566
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
56-
todo!()
67+
match self.result.get() {
68+
None => Poll::Pending,
69+
Some(predicate) => Poll::Ready(predicate.clone()),
70+
}
5771
}
5872
}
5973

@@ -189,6 +203,16 @@ impl DeleteFileManager {
189203
.try_collect::<Vec<_>>()
190204
.await?;
191205

206+
// wait for all in-progress EQ deletes from other tasks
207+
let _ = join_all(results.iter().filter_map(|i| {
208+
if let ParsedDeleteFileContext::InProgEqDel(fut) = i {
209+
Some(fut.clone())
210+
} else {
211+
None
212+
}
213+
}))
214+
.await;
215+
192216
let merged_delete_vectors = results
193217
.into_iter()
194218
.fold(HashMap::default(), Self::merge_delete_vectors);

0 commit comments

Comments
 (0)