feat(iceberg): stream batch iceberg #23527
base: main
…utor for Iceberg source handling
- Introduced `BatchIcebergListExecutor` to manage listing Iceberg files and handle barriers.
- Added `BatchIcebergFetchExecutor` for fetching data from Iceberg sources.
- Updated module exports to include new executors for Iceberg integration.

…tExecutor
- Added `execute` method to `BatchIcebergListExecutor` for stream processing.
- Implemented `Debug` trait for better logging and inspection of `BatchIcebergListExecutor` state.

- Introduced `SourceRefreshMode` to define refresh strategies for sources.
- Updated `Source` message in `catalog.proto` and `StreamSource` in `stream_plan.proto` to include `refresh_mode`.
- Implemented utility functions to resolve refresh mode from options.
- Added migration for the new `refresh_mode` column in the source table.
- Updated relevant data structures and handlers to support the new refresh mode functionality.

- Added a metrics field to `BatchIcebergListExecutor` for monitoring purposes.
- Updated the constructor to accept and initialize the metrics parameter.
- Adjusted the source executor builder to utilize `BatchIcebergListExecutor` when manual trigger refresh mode is detected.

- Added `refresh_mode` field to `StreamFsFetch` message in `stream_plan.proto`.
- Updated `StreamFsFetch` implementation to include `refresh_mode` in the node properties.
- Integrated `is_manual_trigger_refresh` utility function to determine refresh behavior in `FsFetchExecutorBuilder`.
- Adjusted logic in `SourceExecutorBuilder` to utilize the new refresh mode functionality.

- Made `ChunksWithState` struct fields public for better accessibility.
- Implemented the `BatchIcebergFetchExecutor` to manage fetching data from Iceberg sources, including handling barriers and stream processing.
- Added logic for managing file reading and state transitions during data fetching.
- Integrated error handling and logging for improved debugging and monitoring.

… executors
- Added support for handling delete files in `scan_task_to_chunk_with_deletes` for Iceberg data processing.
- Introduced `handle_delete_files` option in `IcebergScanOpts` to control delete file processing.
- Updated `BatchIcebergFetchExecutor` to enable delete file handling for streaming sources.
- Maintained backward compatibility with a legacy scan function that delegates to the enhanced version.
edebdd2 to 16a7f3d
Pull Request Overview
This PR adds support for stream batch Iceberg sources by introducing a refresh_mode field to the source catalog. The refresh mode determines whether a source operates in streaming mode or manual trigger (batch) mode.
Key changes:
- Added `refresh_mode` field to source catalog and protobuf definitions
- Implemented batch Iceberg list and fetch executors for manual trigger refresh mode
- Enhanced Iceberg scan to handle delete files (position and equality deletes)
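
As a rough illustration of how a refresh mode might be resolved from a `refresh_mode` WITH option, here is a minimal sketch. The enum `RefreshMode`, the function `resolve_refresh_mode`, the `STREAMING` spelling, and the default for a missing option are all assumptions made for this example; they are not taken from the PR's code.

```rust
/// Hypothetical mirror of the two refresh strategies named in the overview.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RefreshMode {
    Streaming,
    ManualTrigger,
}

/// Resolve the refresh mode from the raw value of the `refresh_mode` option.
/// Assumption: a missing option falls back to streaming mode.
pub fn resolve_refresh_mode(raw: Option<&str>) -> Result<RefreshMode, String> {
    let Some(raw) = raw else {
        return Ok(RefreshMode::Streaming);
    };
    // Compare case-insensitively so 'manual_trigger' and 'MANUAL_TRIGGER' agree.
    match raw.to_ascii_uppercase().as_str() {
        "STREAMING" => Ok(RefreshMode::Streaming),
        "MANUAL_TRIGGER" => Ok(RefreshMode::ManualTrigger),
        other => Err(format!(
            "invalid refresh_mode `{other}`, expected STREAMING or MANUAL_TRIGGER"
        )),
    }
}
```

Rejecting unknown values up front, rather than silently defaulting, keeps a typo in the option from turning a batch table into a streaming one.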
Reviewed Changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `src/stream/src/from_proto/source/trad_source.rs` | Added logic to select BatchIcebergListExecutor based on manual trigger refresh mode |
| `src/stream/src/from_proto/source/mod.rs` | Added helper function to check if source uses manual trigger refresh |
| `src/stream/src/from_proto/source/fs_fetch.rs` | Added placeholder for batch Iceberg fetch executor in manual trigger mode |
| `src/stream/src/executor/source/iceberg_fetch_executor.rs` | Made internal structs public and added delete file handling flag |
| `src/stream/src/executor/source/batch_source/mod.rs` | Exported new batch Iceberg list and fetch modules |
| `src/stream/src/executor/source/batch_source/batch_posix_fs_list.rs` | Fixed error handling to return error instead of just logging |
| `src/stream/src/executor/source/batch_source/batch_iceberg_list.rs` | New executor for listing Iceberg files in batch/manual trigger mode |
| `src/stream/src/executor/source/batch_source/batch_iceberg_fetch.rs` | New executor for fetching Iceberg data in batch/manual trigger mode |
| `src/prost/build.rs` | Added derive attributes for SourceRefreshMode protobuf types |
| `src/meta/src/controller/mod.rs` | Added refresh_mode field to source protobuf conversion |
| `src/meta/model/src/source.rs` | Added refresh_mode field to source model |
| `src/meta/model/src/lib.rs` | Added SourceRefreshMode blob type derivation |
| `src/meta/model/migration/src/m20251022_294610_source_refresh_mode.rs` | Database migration to add refresh_mode column to source table |
| `src/meta/model/migration/src/lib.rs` | Registered new migration |
| `src/frontend/src/utils/with_options.rs` | Added function to parse and validate refresh_mode from WITH options |
| `src/frontend/src/optimizer/plan_node/stream_source.rs` | Added refresh_mode to source node protobuf |
| `src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs` | Added refresh_mode to fetch node protobuf |
| `src/frontend/src/handler/create_source.rs` | Integrated refresh_mode resolution during source creation |
| `src/frontend/src/catalog/source_catalog.rs` | Added refresh_mode field to source catalog |
| `src/connector/src/source/iceberg/mod.rs` | Enhanced scan function to handle position and equality delete files |
| `src/batch/executors/src/executor/iceberg_scan.rs` | Set delete file handling flag to false for batch executor |
| `proto/stream_plan.proto` | Added refresh_mode field to StreamSource and StreamFsFetch messages |
| `proto/plan_common.proto` | Defined SourceRefreshMode message with streaming and manual_trigger variants |
| `proto/catalog.proto` | Added refresh_mode field to Source message |
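
The executor selection described for `trad_source.rs` and `mod.rs` can be pictured with a small sketch. Only the helper name `is_manual_trigger_refresh` comes from the commit messages; its signature here, the `RefreshMode` and `ListExecutorKind` enums, and `choose_list_executor` are hypothetical stand-ins, and the real builder logic is not shown on this page.

```rust
/// Hypothetical stand-in for the refresh mode carried on the plan node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RefreshMode {
    Streaming,
    ManualTrigger,
}

/// Hypothetical label for which list executor a builder would construct.
#[derive(Debug, PartialEq, Eq)]
pub enum ListExecutorKind {
    StreamingList,
    BatchIcebergList,
}

/// True only when a refresh mode is present and it is the manual-trigger one.
pub fn is_manual_trigger_refresh(mode: Option<RefreshMode>) -> bool {
    matches!(mode, Some(RefreshMode::ManualTrigger))
}

/// Assumption: the batch list executor is chosen only for Iceberg sources
/// in manual-trigger mode; everything else keeps the streaming path.
pub fn choose_list_executor(connector: &str, mode: Option<RefreshMode>) -> ListExecutorKind {
    if connector == "iceberg" && is_manual_trigger_refresh(mode) {
        ListExecutorKind::BatchIcebergList
    } else {
        ListExecutorKind::StreamingList
    }
}
```

Treating an absent mode as "not manual trigger" means sources created before the `refresh_mode` column existed would keep their streaming behavior under this sketch.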
- Refactored error messages in `bind_connector_props` to use `to_owned()` for better clarity.
- Enhanced column pruning logic in `LogicalSource` to handle refreshable Iceberg sources more effectively.
- Removed unused row ID handling in `BatchIcebergFetchExecutor` to streamline the code.
- Added logging for file assignments and batch reader construction in `BatchIcebergFetchExecutor` for improved monitoring.

- Updated `create_iceberg_engine_table` to return refresh mode alongside properties from `bind_connector_props`.
- Removed outdated comments regarding row ID handling for refreshable Iceberg tables in `LogicalSource` to improve clarity.

…ch executors
- Improved handling of delete files in `scan_task_to_chunk_with_deletes`, ensuring original task integrity while processing deletes.
- Introduced detailed logging for delete file processing, including position and equality deletes.
- Updated `BatchIcebergListExecutor` to enable delete file processing in snapshot scans.
- Refactored `BatchIcebergFetchExecutor` to use chunk capacity for insert operations, ensuring accurate chunk handling.

…essing
- Updated logging statements in `scan_task_to_chunk_with_deletes` and related functions to use `tracing::debug!` instead of `tracing::info!` for improved log verbosity control.
- Cleaned up unused imports and comments in various files to enhance code clarity and maintainability.

…on that handles delete files
Updated the Iceberg scan functionality to utilize `scan_task_to_chunk_with_deletes` instead of the deprecated `scan_task_to_chunk`. This change enhances the handling of delete files during the scan process across multiple executors.

Introduced a new test case to validate the refresh functionality of Iceberg tables. The test covers creating a table, inserting data, performing deletes, and ensuring the refresh operation correctly updates the data in the batch table. This enhances the test coverage for Iceberg operations.
#23601 will help resolve the issue of load-finish being triggered multiple times.
    let matched = file_path == data_file_path
        || data_file_path.ends_with(file_path)
        || file_path.ends_with(&data_file_path);
I think we just need file_path == data_file_path
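
To see why exact equality may be the safer comparison, consider the sketch below. The function names `loose_match` and `exact_match` and the paths are illustrative only. The Iceberg table spec describes the `file_path` column of a position delete file as the full URI of the data file, so under that assumption a suffix match should never be needed, and it can produce false positives.

```rust
/// The comparison from the reviewed snippet: equality, or a suffix match
/// in either direction.
pub fn loose_match(file_path: &str, data_file_path: &str) -> bool {
    file_path == data_file_path
        || data_file_path.ends_with(file_path)
        || file_path.ends_with(data_file_path)
}

/// The comparison suggested in the review: plain equality.
pub fn exact_match(file_path: &str, data_file_path: &str) -> bool {
    file_path == data_file_path
}
```

With a contrived delete entry `0.parquet`, `loose_match` also accepts the unrelated data file `s3://bucket/tbl/data/10.parquet`, because the longer path happens to end with the shorter one; `exact_match` rejects it.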
    use std::collections::BTreeSet;

    use risingwave_common::array::arrow::arrow_array_iceberg::Array;
Move these `use` statements out of this method?
            if matched_positions <= 5 {
                tracing::debug!(
                    "Position delete match: file_path='{}' matches data_file='{}', pos={}",
                    file_path,
                    data_file_path,
                    pos
                );
            }
        } else if idx < 3 {
            tracing::debug!(
                "Position delete NO match [row {}]: file_path='{}' vs data_file='{}'",
                idx,
                file_path,
                data_file_path
            );
        }
What's this?
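
For context on what the surrounding code is doing, position deletes can be applied as a visibility mask over the rows of one data file. The following is a simplified sketch under assumptions: `apply_position_deletes` and its tuple-based input are hypothetical, paths are compared by exact equality, and the real `scan_task_to_chunk_with_deletes` works on Arrow arrays rather than slices.

```rust
/// Build a visibility mask for one data file from position-delete rows.
/// Each delete row is a `(file_path, pos)` pair; `pos` is a 0-based row
/// position within the file named by `file_path`.
pub fn apply_position_deletes(
    data_file_path: &str,
    num_rows: usize,
    deletes: &[(&str, u64)],
) -> Vec<bool> {
    let mut visibility = vec![true; num_rows];
    for &(file_path, pos) in deletes {
        // Skip delete rows that target a different data file.
        if file_path != data_file_path {
            continue;
        }
        // Ignore positions outside the file instead of panicking.
        if pos < num_rows as u64 {
            visibility[pos as usize] = false;
        }
    }
    visibility
}
```

For example, with a four-row file `a.parquet` and delete rows `("a.parquet", 1)` and `("b.parquet", 2)`, only row 1 is hidden.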
                if row_idx < 3 {
                    // Log first 3 data row keys for comparison
                    tracing::debug!(
                        "Data row key example [row {}]: {:?}",
                        row_idx,
                        row_key
                    );
                }

                if delete_key_set.contains(&row_key) {
                    *item = false;
                    deleted_count += 1;
                    if deleted_count <= 5 {
                        // Log first 5 matched deletions
                        tracing::debug!(
                            "Row {} matched delete key: {:?}",
                            row_idx,
                            row_key
                        );
                    }
                }
            }
        }
The log is weird 🥵
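
One way the equality-delete loop could look without per-row sampling logs is sketched below: the function returns the number of hidden rows so the caller can emit a single summary line per chunk. This is an assumption-laden illustration, not the PR's code. `apply_equality_deletes` is a hypothetical name, keys are modeled as `Vec<i64>`, and details such as sequence-number checks are ignored.

```rust
use std::collections::BTreeSet;

/// Hide every visible row whose key appears among the equality-delete keys.
/// Returns how many rows were newly hidden, so the caller can log one
/// summary line instead of sampling individual rows.
pub fn apply_equality_deletes(
    row_keys: &[Vec<i64>],
    delete_keys: &[Vec<i64>],
    visibility: &mut [bool],
) -> usize {
    // Collect delete keys into a set for O(log n) membership checks.
    let delete_key_set: BTreeSet<&Vec<i64>> = delete_keys.iter().collect();
    let mut deleted_count = 0;
    for (row_key, item) in row_keys.iter().zip(visibility.iter_mut()) {
        // Rows already hidden (for example by position deletes) are not recounted.
        if *item && delete_key_set.contains(&row_key) {
            *item = false;
            deleted_count += 1;
        }
    }
    deleted_count
}
```

A caller could then log something like "equality deletes hid N of M rows" once per chunk, which keeps debug output bounded without the `row_idx < 3` and `deleted_count <= 5` guards.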
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Following #23491, change the source catalog accordingly.
Checklist
Documentation
Release note
RisingWave Release Notes — Iceberg Manual Refresh (User-facing)
On-demand refresh for Iceberg batch tables
`refresh_mode = 'MANUAL_TRIGGER'` for `connector = 'iceberg'` tables.
`REFRESH TABLE <table_name>;` to pull the latest Iceberg snapshot on demand.
Accurate delete handling
`rw_iceberg_files` system catalog.
Great for
Quick start
`refresh_mode` is set.