diff --git a/crates/iceberg/public-api.txt b/crates/iceberg/public-api.txt index 41d4d7691b..1c1ac2e7cb 100644 --- a/crates/iceberg/public-api.txt +++ b/crates/iceberg/public-api.txt @@ -2007,6 +2007,7 @@ pub fn iceberg::spec::ManifestFile::has_deleted_files(&self) -> bool pub fn iceberg::spec::ManifestFile::has_existing_files(&self) -> bool impl iceberg::spec::ManifestFile pub async fn iceberg::spec::ManifestFile::load_manifest(&self, file_io: &iceberg::io::FileIO) -> iceberg::Result +pub async fn iceberg::spec::ManifestFile::load_manifest_with(&self, file_io: &iceberg::io::FileIO, table_metadata: core::option::Option<&iceberg::spec::TableMetadataRef>) -> iceberg::Result impl core::clone::Clone for iceberg::spec::ManifestFile pub fn iceberg::spec::ManifestFile::clone(&self) -> iceberg::spec::ManifestFile impl core::cmp::Eq for iceberg::spec::ManifestFile @@ -2052,6 +2053,7 @@ impl iceberg::spec::ManifestMetadata pub fn iceberg::spec::ManifestMetadata::content(&self) -> &iceberg::spec::ManifestContentType pub fn iceberg::spec::ManifestMetadata::format_version(&self) -> &iceberg::spec::FormatVersion pub fn iceberg::spec::ManifestMetadata::parse(meta: &std::collections::hash::map::HashMap>) -> iceberg::Result +pub fn iceberg::spec::ManifestMetadata::parse_with(meta: &std::collections::hash::map::HashMap>, table_metadata: core::option::Option<&iceberg::spec::TableMetadataRef>) -> iceberg::Result pub fn iceberg::spec::ManifestMetadata::partition_spec(&self) -> &iceberg::spec::PartitionSpec pub fn iceberg::spec::ManifestMetadata::schema(&self) -> &iceberg::spec::SchemaRef pub fn iceberg::spec::ManifestMetadata::schema_id(&self) -> iceberg::spec::SchemaId diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 9d7815569b..523790d835 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -97,10 +97,14 @@ impl ObjectCache { /// Retrieves an Arc [`Manifest`] from the cache /// or retrieves one from FileIO and parses it if not present - pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result> { + pub(crate) async fn get_manifest( + &self, + manifest_file: &ManifestFile, + table_metadata: &TableMetadataRef, + ) -> Result> { if self.cache_disabled { return manifest_file - .load_manifest(&self.file_io) + .load_manifest_with(&self.file_io, Some(table_metadata)) .await .map(Arc::new); } @@ -110,7 +114,7 @@ impl ObjectCache { let cache_entry = self .cache .entry_by_ref(&key) - .or_try_insert_with(self.fetch_and_parse_manifest(manifest_file)) + .or_try_insert_with(self.fetch_and_parse_manifest(manifest_file, table_metadata)) .await .map_err(|err| { Error::new( @@ -179,8 +183,14 @@ impl ObjectCache { } } - async fn fetch_and_parse_manifest(&self, manifest_file: &ManifestFile) -> Result { - let manifest = manifest_file.load_manifest(&self.file_io).await?; + async fn fetch_and_parse_manifest( + &self, + manifest_file: &ManifestFile, + table_metadata: &TableMetadataRef, + ) -> Result { + let manifest = manifest_file + .load_manifest_with(&self.file_io, Some(table_metadata)) + .await?; Ok(CachedItem::Manifest(Arc::new(manifest))) } @@ -358,7 +368,10 @@ mod tests { assert_eq!(result_manifest_list.entries().len(), 1); let manifest_file = result_manifest_list.entries().first().unwrap(); - let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap(); + let result_manifest = object_cache + .get_manifest(manifest_file, &fixture.table.metadata_ref()) + .await + .unwrap(); assert_eq!( result_manifest @@ -405,7 +418,10 @@ mod tests { let manifest_file = result_manifest_list.entries().first().unwrap(); // not in cache - let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap(); + let result_manifest = object_cache + .get_manifest(manifest_file, &fixture.table.metadata_ref()) + .await + .unwrap(); assert_eq!( result_manifest @@ -420,7 +436,10 @@ mod tests { ); // retrieve cached version - let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap(); + let result_manifest = object_cache + .get_manifest(manifest_file, &fixture.table.metadata_ref()) + .await + .unwrap(); assert_eq!( result_manifest @@ -434,4 +453,81 @@ mod tests { "1.parquet" ); } + + #[test] + fn test_manifest_metadata_parse_prefers_table_metadata_over_bad_schema() { + use std::collections::HashMap; + + use crate::spec::ManifestMetadata; + + let fixture = TableTestFixture::new(); + let table_metadata = fixture.table.metadata_ref(); + let schema_id = table_metadata.current_schema().schema_id(); + let spec_id = table_metadata.default_partition_spec().spec_id(); + + // Manifest key-value metadata whose `schema` value is non-conformant + // (as written by some engines, e.g. duckdb-iceberg, which serialize the + // manifest_entry Avro schema there using Avro type names like `array`), + // but whose `schema-id` / `partition-spec-id` are valid. + let mut meta: HashMap> = HashMap::new(); + meta.insert("schema-id".to_string(), schema_id.to_string().into_bytes()); + meta.insert( + "partition-spec-id".to_string(), + spec_id.to_string().into_bytes(), + ); + meta.insert("format-version".to_string(), b"2".to_vec()); + meta.insert("content".to_string(), b"data".to_vec()); + meta.insert( + "schema".to_string(), + br#"{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"x","required":true,"type":{"type":"array","items":"int"}}]}"# + .to_vec(), + ); + + // Parsing from the manifest's own metadata rejects the non-conformant schema. + assert!(ManifestMetadata::parse(&meta).is_err()); + + // With table metadata available, the authoritative schema/spec are used + // (looked up by id) and the manifest's `schema` key is not parsed. + let parsed = ManifestMetadata::parse_with(&meta, Some(&table_metadata)).unwrap(); + assert_eq!(parsed.schema.schema_id(), schema_id); + assert_eq!(parsed.partition_spec.spec_id(), spec_id); + } + + #[test] + fn test_manifest_metadata_parse_self_describes_when_ids_not_recorded() { + use std::collections::HashMap; + + use crate::spec::{ManifestMetadata, Type}; + + let fixture = TableTestFixture::new(); + let table_metadata = fixture.table.metadata_ref(); + + // A manifest written WITHOUT `schema-id` / `partition-spec-id` keys + // (some writers omit them). Its self-described schema — a single long + // column that does NOT match the table's schemas — must win: assuming + // the default id 0 and looking that up in the table metadata would + // mistype this manifest's column bounds. + let mut meta: HashMap> = HashMap::new(); + meta.insert("format-version".to_string(), b"2".to_vec()); + meta.insert("content".to_string(), b"data".to_vec()); + meta.insert( + "schema".to_string(), + br#"{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"foo","required":false,"type":"long"}]}"# + .to_vec(), + ); + meta.insert("partition-spec".to_string(), b"[]".to_vec()); + + let parsed = ManifestMetadata::parse_with(&meta, Some(&table_metadata)).unwrap(); + let field = parsed.schema.field_by_id(1).unwrap(); + assert_eq!(field.name, "foo"); + assert_eq!( + *field.field_type, + Type::Primitive(crate::spec::PrimitiveType::Long) + ); + assert_ne!( + parsed.schema.as_ref().as_struct(), + table_metadata.current_schema().as_struct(), + "must not silently adopt a table schema the manifest never referenced" + ); + } } diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 75672d9cbb..c5c58581df 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -44,6 +44,7 @@ pub(crate) struct ManifestFileContext { bound_predicates: Option>, object_cache: Arc, snapshot_schema: SchemaRef, + table_metadata: TableMetadataRef, expression_evaluator_cache: Arc, delete_file_index: DeleteFileIndex, name_mapping: Option>, @@ -74,6 +75,7 @@ impl ManifestFileContext { manifest_file, bound_predicates, snapshot_schema, + table_metadata, field_ids, mut sender, expression_evaluator_cache, @@ -82,7 +84,9 @@ impl ManifestFileContext { case_sensitive, } = self; - let manifest = object_cache.get_manifest(&manifest_file).await?; + let manifest = object_cache + .get_manifest(&manifest_file, &table_metadata) + .await?; for manifest_entry in manifest.entries() { let manifest_entry_context = ManifestEntryContext { @@ -279,6 +283,7 @@ impl PlanContext { sender, object_cache: self.object_cache.clone(), snapshot_schema: self.snapshot_schema.clone(), + table_metadata: self.table_metadata.clone(), field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), delete_file_index, diff --git a/crates/iceberg/src/spec/manifest/metadata.rs b/crates/iceberg/src/spec/manifest/metadata.rs index 25e4ae7e06..c7a79afe97 100644 --- a/crates/iceberg/src/spec/manifest/metadata.rs +++ b/crates/iceberg/src/spec/manifest/metadata.rs @@ -22,7 +22,7 @@ use typed_builder::TypedBuilder; use super::{FormatVersion, ManifestContentType, PartitionSpec, Schema}; use crate::error::Result; -use crate::spec::{PartitionField, SchemaId, SchemaRef}; +use crate::spec::{PartitionField, SchemaId, SchemaRef, TableMetadataRef}; use crate::{Error, ErrorKind}; /// Meta data of a manifest that is stored in the key-value metadata of the Avro file @@ -44,22 +44,40 @@ pub struct ManifestMetadata { impl ManifestMetadata { /// Parse from metadata in avro file. pub fn parse(meta: &HashMap>) -> Result { - let schema = Arc::new({ - let bs = meta.get("schema").ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - "schema is required in manifest metadata but not found", - ) - })?; - serde_json::from_slice::(bs).map_err(|err| { - Error::new( - ErrorKind::DataInvalid, - "Fail to parse schema in manifest metadata", - ) - .with_source(err) - })? - }); - let schema_id: i32 = meta + Self::parse_with(meta, None) + } + + /// Parse from the avro file's key-value metadata, preferring the table + /// metadata's schema and partition spec (looked up by the manifest's + /// **recorded** `schema-id` / `partition-spec-id`) over the manifest's + /// self-described `schema` / `partition-spec` keys. + /// + /// A manifest's embedded `schema` key is redundant with the authoritative + /// table metadata, and some writers (e.g. duckdb-iceberg) store a + /// non-conformant value there (the manifest_entry Avro record schema rather + /// than the Iceberg table schema). When the manifest records a `schema-id` + /// and `table_metadata` contains that schema, the table's schema is used + /// and the manifest's own `schema` key is not parsed — mirroring + /// iceberg-java's `ManifestReader(specsById)`, whose reading of the schema + /// from manifest file metadata is deprecated. The same applies to + /// `partition-spec-id` and the partition spec. + /// + /// The lookup happens ONLY for ids the manifest actually records. A writer + /// that omits the `schema-id` key (some engines do) may have written the + /// manifest under any historical schema — assuming the default id 0 would + /// mistype column bounds after a type promotion (e.g. 8-byte long bounds + /// decoded as int), so the manifest's self-described schema is the only + /// reliable description of its bytes and is parsed instead. When + /// `table_metadata` is `None` (or does not contain a recorded id) the + /// manifest's own metadata is likewise parsed, preserving the previous + /// self-describing behaviour. + pub fn parse_with( + meta: &HashMap>, + table_metadata: Option<&TableMetadataRef>, + ) -> Result { + // `None` when the writer omitted the key — deliberately NOT defaulted + // before the table-metadata lookup below. + let recorded_schema_id: Option = meta .get("schema-id") .map(|bs| { String::from_utf8_lossy(bs).parse().map_err(|err| { @@ -70,42 +88,21 @@ impl ManifestMetadata { .with_source(err) }) }) - .transpose()? - .unwrap_or(0); - let partition_spec = { - let fields = { - let bs = meta.get("partition-spec").ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - "partition-spec is required in manifest metadata but not found", - ) - })?; - serde_json::from_slice::>(bs).map_err(|err| { + .transpose()?; + let recorded_spec_id: Option = meta + .get("partition-spec-id") + .map(|bs| { + String::from_utf8_lossy(bs).parse().map_err(|err| { Error::new( ErrorKind::DataInvalid, - "Fail to parse partition spec in manifest metadata", + "Fail to parse partition spec id in manifest metadata", ) .with_source(err) - })? - }; - let spec_id = meta - .get("partition-spec-id") - .map(|bs| { - String::from_utf8_lossy(bs).parse().map_err(|err| { - Error::new( - ErrorKind::DataInvalid, - "Fail to parse partition spec id in manifest metadata", - ) - .with_source(err) - }) }) - .transpose()? - .unwrap_or(0); - PartitionSpec::builder(schema.clone()) - .with_spec_id(spec_id) - .add_unbound_fields(fields.into_iter().map(|f| f.into_unbound()))? - .build()? - }; + }) + .transpose()?; + let schema_id = recorded_schema_id.unwrap_or(0); + let spec_id = recorded_spec_id.unwrap_or(0); let format_version = if let Some(bs) = meta.get("format-version") { serde_json::from_slice::(bs).map_err(|err| { Error::new( @@ -123,6 +120,64 @@ impl ManifestMetadata { } else { ManifestContentType::Data }; + + // Prefer the authoritative table schema + partition spec when the + // manifest RECORDS the ids and the table metadata contains them, + // bypassing the manifest's redundant (and sometimes non-conformant) + // `schema` / `partition-spec` metadata keys. Manifests that omit the + // ids stay on the self-describing path below. + if let Some(table_metadata) = table_metadata + && let (Some(schema), Some(partition_spec)) = ( + recorded_schema_id.and_then(|id| table_metadata.schema_by_id(id)), + recorded_spec_id.and_then(|id| table_metadata.partition_spec_by_id(id)), + ) + { + return Ok(ManifestMetadata { + schema: schema.clone(), + schema_id, + partition_spec: partition_spec.as_ref().clone(), + format_version, + content, + }); + } + + // Fallback: parse the schema + partition spec from the manifest's own + // key-value metadata (the manifest is self-describing). + let schema = Arc::new({ + let bs = meta.get("schema").ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "schema is required in manifest metadata but not found", + ) + })?; + serde_json::from_slice::(bs).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + "Fail to parse schema in manifest metadata", + ) + .with_source(err) + })? + }); + let fields = { + let bs = meta.get("partition-spec").ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "partition-spec is required in manifest metadata but not found", + ) + })?; + serde_json::from_slice::>(bs).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + "Fail to parse partition spec in manifest metadata", + ) + .with_source(err) + })? + }; + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(spec_id) + .add_unbound_fields(fields.into_iter().map(|f| f.into_unbound()))? + .build()?; + Ok(ManifestMetadata { schema, schema_id, diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs index 4ba2645f51..b4ccda72a3 100644 --- a/crates/iceberg/src/spec/manifest/mod.rs +++ b/crates/iceberg/src/spec/manifest/mod.rs @@ -31,7 +31,7 @@ pub use writer::*; use super::{ Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType, Schema, Struct, - UNASSIGNED_SEQUENCE_NUMBER, + TableMetadataRef, UNASSIGNED_SEQUENCE_NUMBER, }; use crate::error::Result; use crate::{Error, ErrorKind}; @@ -46,11 +46,22 @@ pub struct Manifest { impl Manifest { /// Parse manifest metadata and entries from bytes of avro file. pub(crate) fn try_from_avro_bytes(bs: &[u8]) -> Result<(ManifestMetadata, Vec)> { + Self::try_from_avro_bytes_with(bs, None) + } + + /// Like [`Self::try_from_avro_bytes`], but prefers the provided table + /// metadata's schema and partition spec over the manifest's own + /// self-described `schema` / `partition-spec` keys (see + /// [`ManifestMetadata::parse_with`]). + pub(crate) fn try_from_avro_bytes_with( + bs: &[u8], + table_metadata: Option<&TableMetadataRef>, + ) -> Result<(ManifestMetadata, Vec)> { let reader = AvroReader::new(bs)?; // Parse manifest metadata let meta = reader.user_metadata(); - let metadata = ManifestMetadata::parse(meta)?; + let metadata = ManifestMetadata::parse_with(meta, table_metadata)?; // Parse manifest entries let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?; diff --git a/crates/iceberg/src/spec/manifest_list/manifest_file.rs b/crates/iceberg/src/spec/manifest_list/manifest_file.rs index 2556305816..2bc4dcbfa7 100644 --- a/crates/iceberg/src/spec/manifest_list/manifest_file.rs +++ b/crates/iceberg/src/spec/manifest_list/manifest_file.rs @@ -22,7 +22,7 @@ use serde_derive::{Deserialize, Serialize}; use super::ByteBuf; use crate::error::Result; use crate::io::FileIO; -use crate::spec::Manifest; +use crate::spec::{Manifest, TableMetadataRef}; use crate::{Error, ErrorKind}; /// Entry in a manifest list. @@ -178,9 +178,21 @@ impl ManifestFile { /// /// This method will also initialize inherited values of [`ManifestEntry`](crate::spec::ManifestEntry), such as `sequence_number`. pub async fn load_manifest(&self, file_io: &FileIO) -> Result { + self.load_manifest_with(file_io, None).await + } + + /// Like [`Self::load_manifest`], but prefers the provided table metadata's + /// schema and partition spec over the manifest's own self-described + /// `schema` / `partition-spec` metadata (see + /// [`ManifestMetadata::parse_with`](crate::spec::ManifestMetadata::parse_with)). + pub async fn load_manifest_with( + &self, + file_io: &FileIO, + table_metadata: Option<&TableMetadataRef>, + ) -> Result { let avro = file_io.new_input(&self.manifest_path)?.read().await?; - let (metadata, mut entries) = Manifest::try_from_avro_bytes(&avro)?; + let (metadata, mut entries) = Manifest::try_from_avro_bytes_with(&avro, table_metadata)?; // Let entries inherit values from the manifest list entry. for entry in &mut entries {