feat: introduce pluggable SpillFile trait and TempFileFactory for custom spill backends#21882
feat: introduce pluggable SpillFile trait and TempFileFactory for custom spill backends#21882pantShrey wants to merge 13 commits into
Conversation
|
@alamb I opened this draft PR to get early feedback on the architecture.
I might be missing something here, so would really appreciate your guidance. |
|
Thanks -- will try and look at this shortly |
Kind of, though it seems like accumulating technical debt as we'll have APIs that will not be needed once we complete the work for SortMergeJoin What do you think about making a first PR to migrate SortMergeJoin to use the spill abstraction?
Makes sense to me |
alamb
left a comment
There was a problem hiding this comment.
Thanks @pantShrey - I reviewed this and the basic idea looks good to me. I do think it would be nice to have a unified (async) IO abstraction rather than leaving some hook around for sync IO and making this API more complicated
| used_disk_space: Arc<AtomicU64>, | ||
| /// Number of active temporary files created by this disk manager | ||
| active_files_count: Arc<AtomicUsize>, | ||
| /// Custom Backend |
There was a problem hiding this comment.
A small nit: I think "custom" is a somewhat unecessary term here . Perhaps this
factory: Option<Arc<dyn TempFileFactory>>,or
temp_file_factory: Option<Arc<dyn TempFileFactory>>,would be more consistent with the rest of the codebase
| .collect() | ||
| } | ||
|
|
||
| pub struct OsSpillWriter { |
| /// Writer for spill file backends. | ||
| /// Receives zero-copy `Bytes` payloads from the IPCStreamWriter adapter. | ||
| pub trait SpillWriter: Send { | ||
| fn write(&mut self, data: Bytes) -> Result<()>; |
There was a problem hiding this comment.
This is pretty similar to https://doc.rust-lang.org/std/io/trait.Write.html 🤔
There was a problem hiding this comment.
Yes, you are right. The reason I didn't use Write trait which uses &[u8] was for ownership reasons. Some backends might queue chunks to a background task (e.g., S3 multipart via a channel) and need to hold the data past the write() call's return. &[u8] can't express that, and it would force a second copy between the SpillWriteAdapter and the SpillWriter.
Also, the custom SpillWriter trait contains finish(), which maps perfectly to complete_multipart_upload for S3 and resource owner cleanup for Postgres.
There was a problem hiding this comment.
This is all true -- however, I think that since the underlying IPC writer takes a std::io::Write, forcing all backends to use Bytes will likely require an extra unecessary copy (see comments below on SpillWriterAdapter) anways.
If you use a std::io::write like interface here, backends that want to queue chunks can do so (by copying into Bytes buffers themselves)
Thus what i suggest is:
- Change this to look more like std::io::wrote:
fn write(&mut self, data: &[u8]) -> Result<()>;Which will allow you to get rid of the write adapter
|
@alamb Thank you so much for the review! I scoped out the SortMergeJoin migration today, specifically looking at bitwise_stream.rs and process_key_match_with_filter, to see what it would take. Because SortMergeJoin currently reads from the spill file via a synchronous for loop inside a hand-rolled poll state machine, making the read path truly async requires a major rewrite. We can't just .await the stream, so we may need to store the SendableRecordBatchStream in the execution state and manually persist variables like matched_count across Poll::Pending yields. Because ParadeDB is hoping to unblock their Postgres integration next week, I'm worried a state machine rewrite of this scale will stall them. Would you be open to merging this core abstraction first (with open_sync_reader marked as #[deprecated])? I can open a dedicated tracking issue for the SortMergeJoin async migration and tackle it as a fast follow-up PR. I am happy to defer to your judgment if you feel the tech debt must be addressed first! |
How about we try it in parallel? |
@alamb sure, i have already started to work on that locally while waiting for the response also i am actually still stuck on the The issue stems from the fact that RepartitionMerge now requires more memory than a RepartitionExec node, this greedily allocates memory to RepartitionExec which could have spilled instead of RepartitionMerge which cannot spill. I would really appreciate any guidance on this, am I missing something obvious here? |
Sadly I am not familar with this test so I don't have a lot to offer you Maybe you can look at git history and see who introduced the test and maybe they might have some ideas |
|
Hey @adriangb, Andrew suggested I reach out to you since you originally authored The test is currently stuck in a memory-accounting deadlock. Here’s what is happening:
I was able to trigger a spill once by setting the test memory limit to 608 B, but even that was not sufficient for the test to pass reliably. Is there a correct or idiomatic way to configure this test (batch sizes, data volume, memory pool limits, etc.) to reliably force a I would really appreciate any guidance you could provide. |
|
IIRC that test was added when we added spilling to RepartitionExec. Conceptually the test is simple: if RepartitionExec is configured to preserve order and it spills we need to make sure that spilling did not shuffle the data. The orchestration however is difficult: forcing a RepartitionExec to spill usually requires skewed upstream partition consumption rates. You could try to change the test to eg use a GroupBy or maybe we can use a RepartitionExec in isolation if we pull from the streams in the right way. I think the structure can be changed quite a bit as long as we preserve the semantic meaning of the test, I am not surprised that it is pretty fragile to changes. |
2971e41 to
de6697f
Compare
|
@alamb I’ve addressed the nits and force-pushed the updates. Could you please trigger the CI and take another look when you have a moment? In the meantime, I am working on migrating |
|
@adriangb Thank you so much for the guidance! I updated the test to simply assert that a spill does occur |
That makes sense to me. |
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
|
|
e31bff4 to
086632a
Compare
|
Hey @alamb, quick update! While working on the I believe the PR is now ready for review, so I've marked it as such. I'd appreciate another look whenever you have the time. Thank you! |
Grerat-- can you please make a PR for just the SMJ refactor and then stack this PR on it? |
|
That will make it easier / faster to review (I am not a SMJ expert so I can't really review that part effiicently) |
915532b to
6954b55
Compare
|
Hey @alamb, quick update! I've reworked both PRs to make them easier to review independently:
The plan is for #22230 to merge first, then I'll rebase this on top of it. Would be grateful if you could take a look whenever you get the chance! |
7632429 to
420d69d
Compare
|
@alamb, extremely sorry for the delay in pushing the latest changes. While waiting on your guidance regarding the RecordBatch abstraction, I started working through the other review comments, but I ended up getting fairly stuck on a design decision around the writer adapter and spent more time on it than I expected. I did make the writer-side changes you suggested: However, removing the adapter exposed two issues.
On the read path, I'd also like to gently push back on returning You're absolutely right that this introduces an extra buffering step compared to I've also started experimenting locally with the RecordBatch-level abstraction you suggested, but I didn't want to go too far without first getting your direction. Given the above, would you prefer that I continue down the RecordBatch route and revisit these APIs as part of that refactor, or should I finish the current approach first and keep the RecordBatch abstraction as a follow-up? |
This makes sense to me |
i suppose we could also just define something that looked like std::io::write but had a different error type 🤮 |
|
run benchmark external_aggr smj sort_tpch spill_io |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing abstract-spill-file (024ec98) to 5fcc550 (merge-base) diff using: external_aggr File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing abstract-spill-file (024ec98) to 5fcc550 (merge-base) diff using: smj File an issue against this benchmark runner |
alamb
left a comment
There was a problem hiding this comment.
I am sorry for the long turnaround time on this one @pantShrey -- but it is a fairly fundamental low level building block and I am trying to make sure it doesn't have any unforseen negative impacts on others downstream.
As this change will require other users to update their code, we should add an upgrade guide entry. I took the liberty of pushing a commit with one
As long as the benchmarks don't show any slowdown I think we are good to go
cc @2010YOUY01 and @mbutrovich -- perhaps you can have a final look at this API to see if it looks reasonable to you
| use datafusion_execution::SendableRecordBatchStream; | ||
| use datafusion_execution::disk_manager::RefCountedTempFile; | ||
| use datafusion_execution::runtime_env::RuntimeEnv; | ||
| use datafusion_execution::spill_file::SpillFile; |
There was a problem hiding this comment.
it is nice that this function basically just changesRefCountedTempFile to Arc<dyn SpillFile> 👍
| let batch = batch.slice(offset, length); | ||
| offset += batch.num_rows(); | ||
| writer.write(&batch)?; | ||
| /// A wrapper that counts the exact compressed IPC bytes written by Arrow. |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing abstract-spill-file (4904cd7) to 5fcc550 (merge-base) diff using: spill_io File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing abstract-spill-file (4904cd7) to 5fcc550 (merge-base) diff using: sort_tpch File an issue against this benchmark runner |
|
The other thing I think that would be helpful in crafting this API is an example of how to provide a user defined spill manager. For example it would be interesting to try and implement one that is backed by an ObjectStore to show how a user could use a remote source as a spill location I will try and make such an example using this API and report back Edit: Update, the PR is here |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagesort_tpch — base (merge-base)
sort_tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagesmj — base (merge-base)
smj — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagespill_io — base (merge-base)
spill_io — branch
File an issue against this benchmark runner |
|
🤔 hmm the spill_io benchmark looks pretty bad: #21882 (comment) |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageexternal_aggr — base (merge-base)
external_aggr — branch
File an issue against this benchmark runner |
|
|
||
| /// Factory for creating spill files. | ||
| pub trait TempFileFactory: | ||
| Send + Sync + std::panic::UnwindSafe + std::panic::RefUnwindSafe |
There was a problem hiding this comment.
Why does this need to be std::panic::UnwindSafe + std::panic::RefUnwindSafe? If there is a good reason it should be documented on the trait as well I think
I hit this while working on an example of using this API in
There was a problem hiding this comment.
IIRC, it was a semver failure that forced me to add it.
Because DiskManager and DiskManagerBuilder now hold an Arc<dyn TempFileFactory>, they lost their auto-implemented UnwindSafe and RefUnwindSafe traits. This triggered a cargo-semver-checks failure because existing downstream code relies on DiskManager being unwind-safe:
Checking datafusion-execution v53.1.0 -> v53.1.0 (no change; assume patch)
--- failure auto_trait_impl_removed: auto trait no longer implemented ---
type DiskManagerBuilder is no longer UnwindSafe
type DiskManagerBuilder is no longer RefUnwindSafe
type DiskManager is no longer UnwindSafe
...
I didn't have any other architectural reason for adding the bounds beyond satisfying the checker. Since this PR already requires an upgrade guide, would you prefer I remove these bounds and we just accept the semver break, or should I keep them and add a doc comment explaining why they are there?
There was a problem hiding this comment.
I think we should accept the "semver" break as this is a breaking API change anyways
There was a problem hiding this comment.
sure, I have removed the trait bounds
| } | ||
|
|
||
| /// Writer for spill file backends. | ||
| pub trait SpillWriter: std::io::Write + Send { |
There was a problem hiding this comment.
It was also strange that the SpillWriter is a sync API, but the read stream API is async
fn read_stream(&self) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>>;I found this while working on an example showing how to write to a remote object store
|
@alamb I spent some time looking into the From what I can tell, increasing the My current understanding is that the previous tokio_util::io::ReaderStream::with_capacity(file, 128 * 1024)I've pushed the change mainly so the CI benchmarks can run and to get your thoughts. If this direction makes sense, or should I make it configurable instead? |
Which issue does this PR close?
Rationale for this change
DataFusion’s spill infrastructure is tightly coupled to OS-level files, with no extension points for alternative storage backends.
DiskManagercannot be customized for file creation, andIPCStreamWriterdepends on OS file paths.This prevents integration in environments where temporary storage must be managed by the host system. For example, Postgres extensions (e.g., ParadeDB) require spill files to go through
BufFileAPIs to respecttemp_tablespaces, enforcetemp_file_limit, and integrate with transaction-scoped cleanup. SinceBufFilehas no OS-visible path, it cannot work with the current design.A secondary motivation raised by @alamb is supporting object storage backends (S3, GCS) for spilling, which require async IO and cannot use
std::io::Writeorstd::io::Read.What changes are included in this PR?
SpillFile,SpillWriter, andTempFileFactorytraits to abstract spill file handlingDiskManagerMode::Customto allow pluggable backendsDiskManagerto returnArc<dyn SpillFile>instead of OS-bound typesSpillWriteAdapterto bridge sync Arrow writers with backend-agnostic writersStream<Item = Result<Bytes>>) instead of blocking state machinesArc<dyn SpillFile>Are these changes tested?
Yes. Existing spill tests cover the full read/write flow.
test_disk_usage_decreases_as_files_consumedby correcting a pre-existing off-by-one assumption in file rotationtest_preserve_order_with_spillingby just asserting spilling occurs (spill_count>0) and output batches are sortedAre there any user-facing changes?
Yes this introduces API changes:
Arc<dyn SpillFile>instead ofRefCountedTempFileSpillFile,SpillWriter,TempFileFactoryDiskManagerMode::Customfor custom backendsCustom spill backends can now be implemented and plugged in via
DiskManager.