diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index 439358435c..e49697e599 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -673,7 +673,9 @@ mod test { use crate::arrow::record_batch_transformer::{ RecordBatchTransformer, RecordBatchTransformerBuilder, }; - use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct, Type}; + use crate::spec::{ + ListType, Literal, MapType, NestedField, PrimitiveType, Schema, Struct, Type, + }; /// Helper to extract string values from either StringArray or RunEndEncoded /// Returns empty string for null values @@ -921,6 +923,150 @@ mod test { assert!(struct_column.is_null(2)); } + /// Evolved table schema for the #2618 regression test: `id` plus three + /// later-added optional nested columns — a list, a map, and a struct that + /// itself contains a nested list (`ys`). The nested-in-struct list is the + /// case a per-type NULL-fill would miss. + fn schema_with_added_nested_columns() -> Schema { + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional( + 2, + "xs", + Type::List(ListType { + element_field: NestedField::list_element( + 3, + Type::Primitive(PrimitiveType::Int), + false, + ) + .into(), + }), + ) + .into(), + NestedField::optional( + 4, + "props", + Type::Map(MapType { + key_field: NestedField::map_key_element( + 5, + Type::Primitive(PrimitiveType::String), + ) + .into(), + value_field: NestedField::map_value_element( + 6, + Type::Primitive(PrimitiveType::Int), + false, + ) + .into(), + }), + ) + .into(), + NestedField::optional( + 7, + "s", + Type::Struct(crate::spec::StructType::new(vec![ + NestedField::optional(8, "a", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::optional( + 9, + "ys", + Type::List(ListType { + element_field: NestedField::list_element( + 10, + Type::Primitive(PrimitiveType::Long), + false, + ) + .into(), + }), + ) + .into(), + ])), + ) + .into(), + ]) + .build() + .unwrap() + } + + #[test] + fn schema_evolution_adds_list_map_and_nested_struct_columns_with_nulls() { + // Regression test for https://github.com/apache/iceberg-rust/issues/2618. + // + // The story the test tells, in order: + // 1. An old data file was written with only the `id` column. + // 2. The table schema has since evolved, adding optional list / map / + // struct columns (see `schema_with_added_nested_columns`). + // 3. Reading the old file against the evolved schema must fill those + // absent columns with typed all-NULL arrays — previously this errored + // with "unexpected target column type" for the nested types. + + // (1) The old data file: just `id`. + let file_schema = Arc::new(ArrowSchema::new(vec![simple_field( + "id", + DataType::Int32, + false, + "1", + )])); + let file_batch = + RecordBatch::try_new(file_schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]) + .unwrap(); + + // (2) Read it against the evolved schema, projecting id + the three added columns. + let snapshot_schema = Arc::new(schema_with_added_nested_columns()); + let projected_iceberg_field_ids = [1, 2, 4, 7]; + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids) + .build(); + let result = transformer.process_record_batch(file_batch).unwrap(); + + // (3a) `id` survives unchanged. + assert_eq!(result.num_columns(), 4); + assert_eq!(result.num_rows(), 3); + let id_column = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_column.values(), &[1, 2, 3]); + + // (3b) The added columns carry the evolved schema's Arrow types and are all-NULL. + assert!(matches!( + result.schema().field(1).data_type(), + DataType::List(_) + )); + assert!(matches!( + result.schema().field(2).data_type(), + DataType::Map(_, _) + )); + for (idx, name) in [(1, "xs"), (2, "props"), (3, "s")] { + assert_eq!( + result.column(idx).null_count(), + 3, + "added nested column `{name}` should be all-NULL" + ); + } + + // (3c) The all-NULL struct still carries its full nested shape (`a` plus the + // nested list `ys`), not a degenerate empty struct — this is what the + // type-preserving NULL fill guarantees over an enumerate-each-type fix. + let result_schema = result.schema(); + let DataType::Struct(struct_fields) = result_schema.field(3).data_type() else { + panic!("field `s` should be a struct"); + }; + let child_names: Vec<&str> = struct_fields.iter().map(|f| f.name().as_str()).collect(); + assert_eq!(child_names, vec!["a", "ys"]); + assert!(matches!( + struct_fields + .iter() + .find(|f| f.name() == "ys") + .unwrap() + .data_type(), + DataType::List(_) + )); + } + pub fn source_record_batch() -> RecordBatch { RecordBatch::try_new( arrow_schema_promotion_addition_and_renaming_required(), diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index d07233c420..2ca326f8e0 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -23,7 +23,6 @@ use arrow_array::{ LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, }; -use arrow_buffer::NullBuffer; use arrow_schema::{DataType, FieldRef, TimeUnit}; use uuid::Uuid; @@ -628,23 +627,24 @@ pub(crate) fn create_primitive_array_single_element( data_type: &DataType, prim_lit: &Option, ) -> Result { + // No value: a single NULL of any (possibly nested) type (#2618). The `1` is + // `new_null_array`'s row count. + if prim_lit.is_none() { + return Ok(arrow_array::new_null_array(data_type, 1)); + } match (data_type, prim_lit) { (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => { Ok(Arc::new(BooleanArray::from(vec![*v]))) } - (DataType::Boolean, None) => Ok(Arc::new(BooleanArray::from(vec![Option::::None]))), (DataType::Int32, Some(PrimitiveLiteral::Int(v))) => { Ok(Arc::new(Int32Array::from(vec![*v]))) } - (DataType::Int32, None) => Ok(Arc::new(Int32Array::from(vec![Option::::None]))), (DataType::Date32, Some(PrimitiveLiteral::Int(v))) => { Ok(Arc::new(Date32Array::from(vec![*v]))) } - (DataType::Date32, None) => Ok(Arc::new(Date32Array::from(vec![Option::::None]))), (DataType::Int64, Some(PrimitiveLiteral::Long(v))) => { Ok(Arc::new(Int64Array::from(vec![*v]))) } - (DataType::Int64, None) => Ok(Arc::new(Int64Array::from(vec![Option::::None]))), (DataType::Timestamp(TimeUnit::Microsecond, timezone), Some(PrimitiveLiteral::Long(v))) => { let array = TimestampMicrosecondArray::from(vec![*v]); if let Some(timezone) = timezone { @@ -653,14 +653,6 @@ pub(crate) fn create_primitive_array_single_element( Ok(Arc::new(array)) } } - (DataType::Timestamp(TimeUnit::Microsecond, timezone), None) => { - let array = TimestampMicrosecondArray::from(vec![Option::::None]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone()))) - } else { - Ok(Arc::new(array)) - } - } (DataType::Timestamp(TimeUnit::Nanosecond, timezone), Some(PrimitiveLiteral::Long(v))) => { let array = TimestampNanosecondArray::from(vec![*v]); if let Some(timezone) = timezone { @@ -669,32 +661,18 @@ pub(crate) fn create_primitive_array_single_element( Ok(Arc::new(array)) } } - (DataType::Timestamp(TimeUnit::Nanosecond, timezone), None) => { - let array = TimestampNanosecondArray::from(vec![Option::::None]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone()))) - } else { - Ok(Arc::new(array)) - } - } (DataType::Float32, Some(PrimitiveLiteral::Float(v))) => { Ok(Arc::new(Float32Array::from(vec![v.0]))) } - (DataType::Float32, None) => Ok(Arc::new(Float32Array::from(vec![Option::::None]))), (DataType::Float64, Some(PrimitiveLiteral::Double(v))) => { Ok(Arc::new(Float64Array::from(vec![v.0]))) } - (DataType::Float64, None) => Ok(Arc::new(Float64Array::from(vec![Option::::None]))), (DataType::Utf8, Some(PrimitiveLiteral::String(v))) => { Ok(Arc::new(StringArray::from(vec![v.as_str()]))) } - (DataType::Utf8, None) => Ok(Arc::new(StringArray::from(vec![Option::<&str>::None]))), (DataType::Binary, Some(PrimitiveLiteral::Binary(v))) => { Ok(Arc::new(BinaryArray::from_vec(vec![v.as_slice()]))) } - (DataType::Binary, None) => Ok(Arc::new(BinaryArray::from_opt_vec(vec![ - Option::<&[u8]>::None, - ]))), (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(v))) => { let array = Decimal128Array::from(vec![{ *v }]) .with_precision_and_scale(*precision, *scale) @@ -721,81 +699,6 @@ pub(crate) fn create_primitive_array_single_element( })?; Ok(Arc::new(array)) } - (DataType::Decimal128(precision, scale), None) => { - let array = Decimal128Array::from(vec![Option::::None]) - .with_precision_and_scale(*precision, *scale) - .map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - format!( - "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}" - ), - ) - })?; - Ok(Arc::new(array)) - } - (DataType::Struct(fields), None) => { - // Create a single-element StructArray with nulls - let null_arrays: Vec = fields - .iter() - .map(|f| { - // Recursively create null arrays for struct fields - // For primitive fields in structs, use simple null arrays (not REE within struct) - match f.data_type() { - DataType::Boolean => { - Ok(Arc::new(BooleanArray::from(vec![Option::::None])) - as ArrayRef) - } - DataType::Int32 | DataType::Date32 => { - Ok(Arc::new(Int32Array::from(vec![Option::::None])) as ArrayRef) - } - DataType::Int64 => { - Ok(Arc::new(Int64Array::from(vec![Option::::None])) as ArrayRef) - } - DataType::Timestamp(TimeUnit::Microsecond, timezone) => { - let array = TimestampMicrosecondArray::from(vec![Option::::None]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef) - } else { - Ok(Arc::new(array) as ArrayRef) - } - } - DataType::Timestamp(TimeUnit::Nanosecond, timezone) => { - let array = TimestampNanosecondArray::from(vec![Option::::None]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef) - } else { - Ok(Arc::new(array) as ArrayRef) - } - } - DataType::Float32 => { - Ok(Arc::new(Float32Array::from(vec![Option::::None])) as ArrayRef) - } - DataType::Float64 => { - Ok(Arc::new(Float64Array::from(vec![Option::::None])) as ArrayRef) - } - DataType::Utf8 => { - Ok(Arc::new(StringArray::from(vec![Option::<&str>::None])) as ArrayRef) - } - DataType::Binary => { - Ok( - Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None])) - as ArrayRef, - ) - } - _ => Err(Error::new( - ErrorKind::Unexpected, - format!("Unsupported struct field type: {:?}", f.data_type()), - )), - } - }) - .collect::>>()?; - Ok(Arc::new(arrow_array::StructArray::new( - fields.clone(), - null_arrays, - Some(arrow_buffer::NullBuffer::new_null(1)), - ))) - } _ => Err(Error::new( ErrorKind::Unexpected, format!("Unsupported constant type combination: {data_type:?} with {prim_lit:?}"), @@ -812,35 +715,23 @@ pub(crate) fn create_primitive_array_repeated( prim_lit: &Option, num_rows: usize, ) -> Result { + // No value to repeat: an all-NULL column of any (possibly nested) type (#2618). + if prim_lit.is_none() { + return Ok(arrow_array::new_null_array(data_type, num_rows)); + } Ok(match (data_type, prim_lit) { (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => { Arc::new(BooleanArray::from(vec![*value; num_rows])) } - (DataType::Boolean, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(BooleanArray::from(vals)) - } (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => { Arc::new(Int32Array::from(vec![*value; num_rows])) } - (DataType::Int32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Int32Array::from(vals)) - } (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => { Arc::new(Date32Array::from(vec![*value; num_rows])) } - (DataType::Date32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Date32Array::from(vals)) - } (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => { Arc::new(Int64Array::from(vec![*value; num_rows])) } - (DataType::Int64, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Int64Array::from(vals)) - } ( DataType::Timestamp(TimeUnit::Microsecond, timezone), Some(PrimitiveLiteral::Long(value)), @@ -852,15 +743,6 @@ pub(crate) fn create_primitive_array_repeated( Arc::new(array) } } - (DataType::Timestamp(TimeUnit::Microsecond, timezone), None) => { - let vals: Vec> = vec![None; num_rows]; - let array = TimestampMicrosecondArray::from(vals); - if let Some(timezone) = timezone { - Arc::new(array.with_timezone(timezone.clone())) - } else { - Arc::new(array) - } - } ( DataType::Timestamp(TimeUnit::Nanosecond, timezone), Some(PrimitiveLiteral::Long(value)), @@ -872,43 +754,18 @@ pub(crate) fn create_primitive_array_repeated( Arc::new(array) } } - (DataType::Timestamp(TimeUnit::Nanosecond, timezone), None) => { - let vals: Vec> = vec![None; num_rows]; - let array = TimestampNanosecondArray::from(vals); - if let Some(timezone) = timezone { - Arc::new(array.with_timezone(timezone.clone())) - } else { - Arc::new(array) - } - } (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => { Arc::new(Float32Array::from(vec![value.0; num_rows])) } - (DataType::Float32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Float32Array::from(vals)) - } (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => { Arc::new(Float64Array::from(vec![value.0; num_rows])) } - (DataType::Float64, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Float64Array::from(vals)) - } (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => { Arc::new(StringArray::from(vec![value.clone(); num_rows])) } - (DataType::Utf8, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(StringArray::from(vals)) - } (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => { Arc::new(BinaryArray::from_vec(vec![value; num_rows])) } - (DataType::Binary, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(BinaryArray::from_opt_vec(vals)) - } (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(value))) => { Arc::new( Decimal128Array::from(vec![*value; num_rows]) @@ -937,34 +794,6 @@ pub(crate) fn create_primitive_array_repeated( })?, ) } - (DataType::Decimal128(precision, scale), None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new( - Decimal128Array::from(vals) - .with_precision_and_scale(*precision, *scale) - .map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - format!( - "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}" - ), - ) - })?, - ) - } - (DataType::Struct(fields), None) => { - // Create a StructArray filled with nulls - let null_arrays: Vec = fields - .iter() - .map(|field| create_primitive_array_repeated(field.data_type(), &None, num_rows)) - .collect::>>()?; - - Arc::new(StructArray::new( - fields.clone(), - null_arrays, - Some(NullBuffer::new_null(num_rows)), - )) - } (DataType::Null, _) => Arc::new(arrow_array::NullArray::new(num_rows)), (dt, _) => { return Err(Error::new(