
Commit 908ad09

Fokko authored and sungwy committed
fix: Reading a table with positional deletes should fail (#826)
* A table with positional deletes should fail
* Add possible fix
* Comment and refactor
1 parent 74a85e7 commit 908ad09

4 files changed, +44 −11 lines


Diff for: crates/iceberg/src/scan.rs

+14-8
@@ -364,9 +364,8 @@ impl TableScan {
 
         let manifest_list = self.plan_context.get_manifest_list().await?;
 
-        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
-        // whose content type is not Data or whose partitions cannot match this
-        // scan's filter
+        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out
+        // any whose partitions cannot match the scan's filter
         let manifest_file_contexts = self
             .plan_context
             .build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?;
@@ -619,15 +618,22 @@ impl PlanContext {
         manifest_list: Arc<ManifestList>,
         sender: Sender<ManifestEntryContext>,
     ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
-        let filtered_entries = manifest_list
-            .entries()
+        let entries = manifest_list.entries();
+
+        if entries
             .iter()
-            .filter(|manifest_file| manifest_file.content == ManifestContentType::Data);
+            .any(|e| e.content != ManifestContentType::Data)
+        {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Merge-on-read is not yet supported",
+            ));
+        }
 
         // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
         let mut filtered_mfcs = vec![];
         if self.predicate.is_some() {
-            for manifest_file in filtered_entries {
+            for manifest_file in entries {
                 let partition_bound_predicate = self.get_partition_filter(manifest_file)?;
 
                 // evaluate the ManifestFile against the partition filter. Skip
@@ -649,7 +655,7 @@ impl PlanContext {
                 }
             }
         } else {
-            for manifest_file in filtered_entries {
+            for manifest_file in entries {
                 let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone());
                 filtered_mfcs.push(Ok(mfc));
             }
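
The substance of the fix is in the second hunk: manifests whose content type was not Data used to be silently filtered out of the scan plan, so delete manifests (and the positional deletes they reference) were ignored and already-deleted rows still came back in results. Now the presence of any non-Data manifest aborts planning with a FeatureUnsupported error. Below is a minimal standalone sketch of that guard, assuming ManifestFile and ManifestContentType are importable from iceberg::spec and that the content field is accessible as in the hunk above:

use iceberg::spec::{ManifestContentType, ManifestFile};
use iceberg::{Error, ErrorKind, Result};

// Sketch only: mirrors the guard added to build_manifest_file_contexts.
// Reject the scan as soon as the manifest list references anything other
// than data manifests, instead of silently dropping delete manifests.
fn reject_delete_manifests(entries: &[ManifestFile]) -> Result<()> {
    if entries
        .iter()
        .any(|e| e.content != ManifestContentType::Data)
    {
        return Err(Error::new(
            ErrorKind::FeatureUnsupported,
            "Merge-on-read is not yet supported",
        ));
    }
    Ok(())
}

Failing loudly is the safer default here, because skipping a Deletes manifest is not just a missed optimization: it silently changes query results.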

Diff for: crates/integration_tests/testdata/spark/entrypoint.sh

+2
@@ -18,6 +18,8 @@
 # under the License.
 #
 
+set -e
+
 start-master.sh -p 7077
 start-worker.sh spark://spark-iceberg:7077
 start-history-server.sh

Diff for: crates/integration_tests/testdata/spark/provision.py

+11-1
@@ -18,7 +18,17 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import current_date, date_add, expr
 
-spark = SparkSession.builder.getOrCreate()
+# The configuration is important, otherwise we get many small
+# parquet files with a single row. When a positional delete
+# hits the Parquet file with one row, the parquet file gets
+# dropped instead of having a merge-on-read delete file.
+spark = (
+    SparkSession
+    .builder
+    .config("spark.sql.shuffle.partitions", "1")
+    .config("spark.default.parallelism", "1")
+    .getOrCreate()
+)
 
 spark.sql(
     f"""

Diff for: crates/integration_tests/tests/read_positional_deletes.rs

+17-2
@@ -17,6 +17,7 @@
 
 //! Integration tests for rest catalog.
 
+use iceberg::ErrorKind::FeatureUnsupported;
 use iceberg::{Catalog, TableIdent};
 use iceberg_integration_tests::set_test_fixture;
 
@@ -34,6 +35,20 @@ async fn test_read_table_with_positional_deletes() {
         .await
         .unwrap();
 
-    // 😱 If we don't support positional deletes, we should not be able to plan them
-    println!("{:?}", table.scan().build().unwrap());
+    let scan = table.scan().build().unwrap();
+    println!("{:?}", scan);
+
+    assert!(scan
+        .to_arrow()
+        .await
+        .is_err_and(|e| e.kind() == FeatureUnsupported));
+
+    // 😱 If we don't support positional deletes, we should fail when we try to read a table that
+    // has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py
+
+    // When we get support for it:
+    // let batch_stream = scan.to_arrow().await.unwrap();
+    // let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+    // let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
+    // assert_eq!(num_rows, 10);
 }
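
For callers, the visible effect is that to_arrow() now returns an error for any snapshot that carries delete files, as the test above asserts. The following is a hypothetical sketch of how a downstream user might surface that case with a clearer message; read_all, its error text, and the arrow_array/module paths are illustrative assumptions, not part of this commit:

use arrow_array::RecordBatch;
use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::{Error, ErrorKind, Result};

// Hypothetical helper: collect a table scan into Arrow batches, translating
// the FeatureUnsupported error into a hint that the table uses
// merge-on-read deletes, which iceberg-rust cannot apply yet.
async fn read_all(table: &Table) -> Result<Vec<RecordBatch>> {
    let scan = table.scan().build()?;
    match scan.to_arrow().await {
        Ok(stream) => stream.try_collect().await,
        Err(e) if e.kind() == ErrorKind::FeatureUnsupported => Err(Error::new(
            ErrorKind::FeatureUnsupported,
            "table has delete files; reading them is not supported yet",
        )),
        Err(e) => Err(e),
    }
}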

0 commit comments

Comments
 (0)