Scan Delete Support Part 4: Delete File Loading; Skeleton for Processing #982
This is nice, will look at the parsed records next.
@liurenjie1024, @Xuanwo, @Fokko - this is ready for re-review, if you could take a look that would be great!
Thanks @sdd for this PR. There are some missing points in the current design. Also, I would suggest not putting too much into `DeleteFilterManager`. I'd suggest that `DeleteFilterManager` act more like a delete loader, which manages the IO and caching of record batches. The actual filtering could be delegated to a `DeleteFilter`, WDYT? I think a good reference implementation is Java's DeleteFilter, see https://github.com/apache/iceberg/blob/af8e3f5a40f4f36bbe1d868146749e2341471586/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java#L50
Thanks for the review @liurenjie1024 - much appreciated. Will come back with a revised design.
Sorry for the late reply. I'm fine with deferring the performance improvement, but I have concerns about the correctness problem.
Hi @sdd, I saw you opened a series of PRs for handling the reading of deletions. I have a suggestion: instead of opening a series of PRs, you could have one large draft PR containing all your changes, while picking one component at a time to open as a small PR for review. This way a reviewer can understand your whole design by walking through the large PR, and review the small PR carefully. Also, when a reviewer has comments, you only need to change one large PR instead of several small ones, WDYT?
I've refactored as you suggested and you're right, it is a better design.
I could do, if you think it's worth it - the other two remaining PRs after this one are much smaller, and it feels like just as much work to merge those into a single PR and then break PRs out of that again as it does to simply keep rebasing those two PRs on top of this one. I'll be very glad once this delete file read support is done - it's been a long, hard slog to be honest and I'm struggling to stay motivated with it, but we're not far off now, hopefully.
I had a bug in here that was causing the tests to deadlock in the follow-up PRs: I was missing a waker for my custom futures. That's been rectified now, and this PR plus the two follow-ups are ready for review once more.
@liurenjie1024, @Xuanwo - could do with a review again when you get a chance! Thanks :-)
Thanks @sdd, it looks better now; left some questions to refine.
```rust
CachingDeleteFileLoader::parquet_to_batch_stream(&task.file_path, self.file_io.clone())
    .await?;

Self::evolve_schema(raw_batch_stream, schema).await
```
We need to have a reader API so that we don't need to repeat this part everywhere, but this could be left for later.
```diff
 /// Build the ArrowReader.
 pub fn build(self) -> ArrowReader {
     ArrowReader {
         batch_size: self.batch_size,
         file_io: self.file_io.clone(),
-        delete_file_manager: CachingDeleteFileManager::new(
+        delete_file_loader: CachingDeleteFileLoader::new(
```
We should not store a loader; instead we should store a `DeleteFilter`. What gets called in `ArrowReader` should be something like `deleteFilter.filter(recordBatchStream)`.
We're not making the best use of the features that `ArrowRecordBatchReader` provides us if we do that. We can't implement the delete filter as a simple filter on a `RecordBatchStream` unless we ditch using parquet's `ParquetRecordBatchStream`.

We're using `parquet`'s `ParquetRecordBatchStream` to do the predicate filtering before we even get access to the `RecordBatchStream`. So by the time we have a stream of RecordBatches, we can't apply positional deletes or delete vectors, because we no longer know what row number in the original file a record batch's row corresponds to - some rows may already have been filtered out.
I may be able to update `ArrowReader` to store a `DeleteFileFilter` rather than a `DeleteFileLoader`, but not to change the semantics of how the filter itself is used to match `deleteFilter.filter(recordBatchStream)`. Not without rewriting the `ArrowReader` entirely to not use `ParquetRecordBatchStream`, and as a consequence needing to reimplement all the predicate filtering logic, row selection, projection, page skipping, and row group skipping that it gives us.
I think I got your point, and you are right: maybe Java's interface is not the best fit for the Rust implementation. I think it's fine that we move forward first and refactor later, as long as we don't expose public APIs.
```diff
@@ -207,7 +233,8 @@ impl ArrowReader {
     record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
 }

-let delete_predicate = delete_file_manager.build_delete_predicate(task.schema.clone())?;
+let delete_filter = delete_filter_rx.await.unwrap()?;
+let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
```
We should hide this building of the filter inside `DeleteFilter`, rather than calling it directly in `ArrowReader`.
Please can you explain a bit more? I don't understand what you're asking for here. As per the previous comment, we're using `ParquetRecordBatchStream` to do most of the heavy lifting for us across a number of features. This means that the design of the `DeleteFilter` can't be as simple as just exposing a method that filters a RecordBatch stream.
```rust
impl DeleteFilter {
    /// Retrieve a delete vector for the data file associated with a given file scan task
    pub fn get_delete_vector(
```
It's odd to expose a public API for constructing filters. For `DeleteFilter`, we should only expose one public API: `pub fn filter(input: ArrowRecordBatchStream)`.
I've made all of `DeleteFilter` `pub(crate)` now, in that case. As per my previous replies, `pub fn filter(input: ArrowRecordBatchStream)` does not make sense for usage with `ArrowReader`. If we also want to expose a public API for filtering that does implement that interface, we can add it in a follow-up PR.
It's just a minor suggestion; go with your favourite approach.
@liurenjie1024 back to you. I've addressed the suggestions that I think make sense to change immediately. I look forward to hearing back from you on the other points! :-)
```rust
/// as per the other delete file types - only this time it is accompanied by a one-shot
/// channel sender that we will eventually use to resolve the shared future that we stored
/// in the state.
/// * When this gets updated to add support for delete vectors, the load phase will return
```
Looking forward to the puffin / deletion vector support!
Me too! 😁
Thanks @sdd for this PR! Let's wait a moment and merge it after the 0.5.0 release.
Extends the `DeleteFileManager` introduced in #950 to include loading of delete files, storage and retrieval of parsed delete files from shared state, and the outline for how parsing will connect up to this new work.

Issue: #630