Conversation

@nameexhaustion
Collaborator

@nameexhaustion nameexhaustion commented Dec 4, 2025

This PR introduces a new streaming partitioned IO sink pipeline. It is enabled by default when sink_parquet is used with pl.PartitionByKey or pl.PartitionMaxSize, except when certain not-yet-supported parameters are set (see Compatibility).

Benchmarks

  • Up to 3.7x runtime speedup, 1.57% peak memory usage

Test script

import polars as pl

# Data generation (run once to create the input file)
# pl.select(
#     int=pl.int_range(0, 3 * 1024 * 1024 * 1024, dtype=pl.Int64) // (1024 * 1024)
# ).write_parquet("/Users/nxs/git/polars/.env/data.parquet")

pl.scan_parquet("/Users/nxs/git/polars/.env/data.parquet").sink_parquet(
    pl.PartitionByKey("/Users/nxs/git/polars/.env/_data_out/partitioned/", by="int"),
    mkdir=True,
)

Benchmark comparison

image

Logs - Before

[partition[by-key]]: Start on new file '/Users/nxs/git/polars/.env/_data_out/partitioned/int=127/0.parquet'
[partition[by-key]]: Reached maximum open partitions. Buffering the rest to memory before writing.

Logs - After

io-sink[partition-keyed[parquet]]: PartitionDistributor: Join tasks
io-sink[partition-keyed[parquet]]: Statistics:
    num_partitions: 3072,
    total_size: RowCountAndSize { num_rows: 3221225472, num_bytes: 25769803776 },
    finalize_flush_size: RowCountAndSize { num_rows: 201326592, num_bytes: 1610612736 } (6.250% total rows, 6.250% total bytes),
    total_sink_opens: 6016,
    forced_sink_closes: 2944 (48.936% total, 100.000% max)

*finalize_flush_size: How much data was flushed from memory during finalize (lower is better)
*total_sink_opens: Total number of files opened.
*forced_sink_closes: Number of file closes performed to reclaim a file permit for opening a new file
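
The figures in the "After" statistics can be cross-checked against the test script with a few lines of arithmetic. The sketch below only uses numbers copied from the log; the variable names are illustrative. The last check, which reads "100.000% max" as "every file open beyond the first one per partition followed a forced close", is an interpretation that happens to match the numbers and is not confirmed by the log itself.

```python
# Figures copied from the "Logs - After" statistics.
num_partitions = 3072
total_rows = 3_221_225_472
total_bytes = 25_769_803_776
flush_rows = 201_326_592
flush_bytes = 1_610_612_736
total_sink_opens = 6016
forced_sink_closes = 2944

# The test script generates 3 * 1024**3 Int64 values (8 bytes each) and
# floor-divides them by 1024 * 1024, giving 3 * 1024 distinct keys.
expected_rows = 3 * 1024**3
expected_bytes = expected_rows * 8
expected_partitions = expected_rows // (1024 * 1024)

# Percentages reported next to each statistic.
flush_row_pct = 100 * flush_rows / total_rows
flush_byte_pct = 100 * flush_bytes / total_bytes
forced_close_pct = 100 * forced_sink_closes / total_sink_opens

# Assumed reading of "100.000% max": opens beyond one per partition.
reopens = total_sink_opens - num_partitions

print(f"rows match: {expected_rows == total_rows}")
print(f"bytes match: {expected_bytes == total_bytes}")
print(f"partitions match: {expected_partitions == num_partitions}")
print(f"finalize flush: {flush_row_pct:.3f}% rows, {flush_byte_pct:.3f}% bytes")
print(f"forced closes: {forced_close_pct:.3f}% of opens, reopens = {reopens}")
```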

Implementation overview

Files under components/:

  • partition_distributor.rs:
    • Receives partitioned morsels from partitioner_pipeline.rs and stores state for each partition:
      • Buffered rows
      • File sink state (open file for that partition)
    • Sends rows (morsels) to the file sink associated with a partition once enough rows are buffered (via partition_morsel_sender)
    • Opens / closes file sinks for partitions:
      • Adapts to newly discovered partitions in high-cardinality keys by closing existing files to reclaim permits
  • partition_morsel_sender.rs:
    • Splits buffered rows and sends them to the file sink for a partition
    • Closes and opens a new file (within the partition) where necessary if a file size limit is hit
  • partition_sink_starter.rs
  • partitioner_pipeline.rs:
    • Partitions incoming morsels by key using partitioner.rs (can also pass through in the case of PartitionStrategy::FileSize)

pipeline_initialization/partition_by.rs contains the logic for initializing and connecting the above components.
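
To make the buffering and permit-reclaiming behaviour described for partition_distributor.rs more concrete, here is a toy Python model. It is not the Rust implementation: the class name, the `max_open_files` and `flush_rows` parameters, and the choice to close the least recently used file are all assumptions made for illustration. The PR only states that existing files are closed to reclaim permits, not which file is chosen.

```python
from collections import OrderedDict, defaultdict


class ToyDistributor:
    """Toy model: per-partition buffers plus a bounded number of open files."""

    def __init__(self, max_open_files, flush_rows):
        self.max_open_files = max_open_files  # hypothetical permit count
        self.flush_rows = flush_rows          # hypothetical flush threshold
        self.buffers = defaultdict(list)      # key -> rows buffered in memory
        self.open_files = OrderedDict()       # key -> rows written to open file
        self.written = defaultdict(int)       # key -> rows written overall
        self.total_sink_opens = 0
        self.forced_sink_closes = 0

    def push(self, key, rows):
        """Buffer rows for a partition; flush once enough are buffered."""
        buf = self.buffers[key]
        buf.extend(rows)
        if len(buf) >= self.flush_rows:
            self._flush(key)

    def _flush(self, key):
        rows = self.buffers.pop(key, [])
        if not rows:
            return
        if key not in self.open_files:
            if len(self.open_files) >= self.max_open_files:
                # Assumed policy: close the least recently used file
                # to reclaim a permit for the new one.
                self.open_files.popitem(last=False)
                self.forced_sink_closes += 1
            self.open_files[key] = 0
            self.total_sink_opens += 1
        self.open_files[key] += len(rows)
        self.open_files.move_to_end(key)
        self.written[key] += len(rows)

    def finalize(self):
        """Flush remaining buffers, close all files, return rows flushed."""
        flushed = sum(len(buf) for buf in self.buffers.values())
        for key in list(self.buffers):
            self._flush(key)
        self.open_files.clear()
        return flushed


dist = ToyDistributor(max_open_files=2, flush_rows=4)
for key in [0, 1, 2, 0]:
    dist.push(key, [10, 11, 12, 13])
dist.push(1, [14, 15])  # below the threshold, stays buffered until finalize
finalize_flush = dist.finalize()
print(dist.total_sink_opens, dist.forced_sink_closes, finalize_flush)
```

With only two permits and three keys, partitions 0 and 1 each end up spread over two files. The real pipeline reports the same kind of trade-off through total_sink_opens and forced_sink_closes.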

Compatibility

Note that some parameters are not yet supported and will fall back to the existing sink pipeline:

  • file_path_cb
    • Will be supported soon with an updated interface
  • per_partition_sort_by
    • Will be supported soon
  • finish_callback
    • Will be removed soon
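
The fallback rule above can be pictured as a simple check on which parameters are set. The sketch below is purely illustrative: `select_pipeline` and the dictionary of parameters are hypothetical and do not correspond to any function in Polars; only the three parameter names come from the list above.

```python
# Parameter names listed above as not yet supported by the new pipeline.
UNSUPPORTED_PARAMS = ("file_path_cb", "per_partition_sort_by", "finish_callback")


def select_pipeline(partition_params):
    """Hypothetical helper: pick "new" unless an unsupported parameter is set."""
    for name in UNSUPPORTED_PARAMS:
        if partition_params.get(name) is not None:
            return "existing"
    return "new"


plain = select_pipeline({"by": "int"})
with_sort = select_pipeline({"by": "int", "per_partition_sort_by": "int"})
print(plain, with_sort)
```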

Todo

  • Test coverage for the adaptive file closing logic (see partition_distributor.rs L156 on codecov). This can be tested once we have the updated pl.PartitionBy API, which will let us specify max_rows_per_file for a keyed partition strategy.

@github-actions github-actions bot added the A-io-parquet (Area: reading/writing Parquet files), enhancement (New feature or an improvement of an existing feature), python (Related to Python Polars), and rust (Related to Rust Polars) labels Dec 4, 2025
@codecov

codecov bot commented Dec 4, 2025

Codecov Report

❌ Patch coverage is 86.12303% with 97 lines in your changes missing coverage. Please review.
✅ Project coverage is 79.63%. Comparing base (b7c28d8) to head (11b0e5c).

Files with missing lines | Patch % | Lines
...odes/io_sinks2/components/partition_distributor.rs | 78.07% | 50 Missing ⚠️
...es/io_sinks2/components/partition_morsel_sender.rs | 87.59% | 16 Missing ⚠️
...stream/src/nodes/io_sinks2/components/par_utils.rs | 0.00% | 12 Missing ⚠️
...-stream/src/nodes/io_sinks2/components/arg_sort.rs | 0.00% | 6 Missing ⚠️
.../io_sinks2/pipeline_initialization/partition_by.rs | 95.83% | 5 Missing ⚠️
crates/polars-stream/src/physical_plan/to_graph.rs | 71.42% | 4 Missing ⚠️
...ream/src/nodes/io_sinks2/components/partitioner.rs | 98.03% | 2 Missing ⚠️
...nodes/io_sinks2/components/partitioner_pipeline.rs | 96.00% | 1 Missing ⚠️
crates/polars-stream/src/nodes/io_sinks2/mod.rs | 93.75% | 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #25629      +/-   ##
==========================================
- Coverage   79.67%   79.63%   -0.04%     
==========================================
  Files        1743     1751       +8     
  Lines      240288   240969     +681     
  Branches     3038     3038              
==========================================
+ Hits       191442   191890     +448     
- Misses      48063    48296     +233     
  Partials      783      783              

@nameexhaustion nameexhaustion force-pushed the nxs/io-partitioned-sink branch 3 times, most recently from 5adb8af to ed62325 Compare December 4, 2025 19:36
@nameexhaustion nameexhaustion changed the title from "feat: New IO partitioned sink pipeline enabled for sink_parquet" to "feat: New partitioned IO sink pipeline enabled for sink_parquet" Dec 4, 2025
@nameexhaustion nameexhaustion force-pushed the nxs/io-partitioned-sink branch from ed62325 to acc8390 Compare December 4, 2025 21:33
@nameexhaustion nameexhaustion marked this pull request as ready for review December 4, 2025 21:48
@nameexhaustion nameexhaustion force-pushed the nxs/io-partitioned-sink branch from acc8390 to 0b6f7c4 Compare December 4, 2025 22:08
@nameexhaustion nameexhaustion force-pushed the nxs/io-partitioned-sink branch from 0b6f7c4 to 11b0e5c Compare December 4, 2025 22:21
@ritchie46 ritchie46 changed the title from "feat: New partitioned IO sink pipeline enabled for sink_parquet" to "perf: New partitioned IO sink pipeline enabled for sink_parquet" Dec 5, 2025
@github-actions github-actions bot added the performance (Performance issues or improvements) label Dec 5, 2025
serde = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
slotmap = { workspace = true }
smallvec = { workspace = true }
Member

Was this dependency worth it?

Collaborator Author

I think we can do better actually

@nameexhaustion nameexhaustion marked this pull request as draft December 5, 2025 14:52
Labels

A-io-parquet (Area: reading/writing Parquet files), enhancement (New feature or an improvement of an existing feature), performance (Performance issues or improvements), python (Related to Python Polars), rust (Related to Rust Polars)

2 participants