Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 158 additions & 2 deletions crates/iceberg/src/spec/manifest_list/manifest_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::str::FromStr;
use serde_derive::{Deserialize, Serialize};

use super::ByteBuf;
use crate::encryption::{EncryptedInputFile, StandardKeyMetadata};
use crate::error::Result;
use crate::io::FileIO;
use crate::spec::Manifest;
Expand Down Expand Up @@ -178,7 +179,14 @@ impl ManifestFile {
///
/// This method will also initialize inherited values of [`ManifestEntry`](crate::spec::ManifestEntry), such as `sequence_number`.
pub async fn load_manifest(&self, file_io: &FileIO) -> Result<Manifest> {
let avro = file_io.new_input(&self.manifest_path)?.read().await?;
let input = file_io.new_input(&self.manifest_path)?;
let avro = match &self.key_metadata {
Some(key_metadata_bytes) => {
let key_metadata = StandardKeyMetadata::decode(key_metadata_bytes)?;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same hardcoded-decode point as #2584 (so I won't re-litigate it here) - flagging only that if the FileKeyResolver lands, this path needs to be included, and there's a threading wrinkle: load_manifest(&self, file_io: &FileIO) has no parameter to receive a resolver today. Whoever lands the seam will need to thread it (or an EncryptionManager) into load_manifest.

EncryptedInputFile::new(input, key_metadata).read().await?
}
None => input.read().await?,
};

let (metadata, mut entries) = Manifest::try_from_avro_bytes(&avro)?;

Expand Down Expand Up @@ -217,7 +225,17 @@ pub struct FieldSummary {

#[cfg(test)]
mod test {
use super::ManifestContentType;
use std::collections::HashMap;
use std::sync::Arc;

use super::{ManifestContentType, ManifestFile};
use crate::ErrorKind;
use crate::encryption::{EncryptedOutputFile, StandardKeyMetadata};
use crate::io::FileIO;
use crate::spec::{
DataContentType, DataFile, DataFileFormat, ManifestEntry, ManifestStatus,
ManifestWriterBuilder, NestedField, PartitionSpec, PrimitiveType, Schema, Struct, Type,
};

#[test]
fn test_manifest_content_type_default() {
Expand All @@ -228,4 +246,142 @@ mod test {
fn test_manifest_content_type_default_value() {
assert_eq!(ManifestContentType::default() as i32, 0);
}

/// Writes a single-entry manifest, encrypting it with `key_metadata`, and
/// returns the resulting [`ManifestFile`]. The manifest is stored in `io`
/// at `path`.
async fn write_encrypted_manifest(
io: &FileIO,
path: &str,
key_metadata: StandardKeyMetadata,
) -> ManifestFile {
let schema = Arc::new(
Schema::builder()
.with_fields(vec![Arc::new(NestedField::optional(
1,
"id",
Type::Primitive(PrimitiveType::Long),
))])
.build()
.unwrap(),
);

let partition_spec = PartitionSpec::builder(schema.clone())
.with_spec_id(0)
.build()
.unwrap();

let output_file = io.new_output(path).unwrap();
let encrypted_output = EncryptedOutputFile::new(output_file, key_metadata);

let mut writer = ManifestWriterBuilder::new_from_encrypted(
encrypted_output,
Some(1),
schema.clone(),
partition_spec.clone(),
)
.expect("Expected a valid writer")
.build_v3_data();

writer
.add_entry(ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: None,
sequence_number: None,
file_sequence_number: None,
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3://bucket/table/data/00000.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::empty(),
record_count: 100,
file_size_in_bytes: 4096,
column_sizes: HashMap::new(),
value_counts: HashMap::new(),
null_value_counts: HashMap::new(),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: None,
split_offsets: None,
equality_ids: None,
sort_order_id: None,
partition_spec_id: 0,
first_row_id: None,
referenced_data_file: None,
content_offset: None,
content_size_in_bytes: None,
},
})
.unwrap();

writer.write_manifest_file().await.unwrap()
}

#[tokio::test]
async fn test_load_manifest_decrypts_when_key_metadata_present() {
let key_metadata =
StandardKeyMetadata::new(b"0123456789abcdef").with_aad_prefix(b"test-aad-prefix!");
let encoded_key_metadata = key_metadata.encode().unwrap().to_vec();

let io = FileIO::new_with_memory();
let path = "memory:///test/encrypted_manifest.avro";
let manifest_file = write_encrypted_manifest(&io, path, key_metadata).await;
assert_eq!(manifest_file.key_metadata, Some(encoded_key_metadata));

let manifest = manifest_file.load_manifest(&io).await.unwrap();
assert_eq!(manifest.entries().len(), 1);
assert_eq!(
manifest.entries()[0].file_path(),
"s3://bucket/table/data/00000.parquet"
);
assert_eq!(manifest.entries()[0].data_file.record_count, 100);
}

#[tokio::test]
async fn test_load_manifest_fails_with_wrong_key() {
let key_metadata =
StandardKeyMetadata::new(b"0123456789abcdef").with_aad_prefix(b"test-aad-prefix!");

let io = FileIO::new_with_memory();
let path = "memory:///test/wrong_key_manifest.avro";
let mut manifest_file = write_encrypted_manifest(&io, path, key_metadata).await;

// Point the manifest file at key metadata carrying a different DEK (but
// the same AAD prefix). The bytes on disk were encrypted with the
// original key, so GCM authentication must fail rather than silently
// returning garbage.
let wrong_key_metadata =
StandardKeyMetadata::new(b"fedcba9876543210").with_aad_prefix(b"test-aad-prefix!");
manifest_file.key_metadata = Some(wrong_key_metadata.encode().unwrap().to_vec());

let err = manifest_file
.load_manifest(&io)
.await
.expect_err("load_manifest must fail when decrypting with the wrong key");
assert_eq!(err.kind(), ErrorKind::Unexpected);
}

#[tokio::test]
async fn test_load_manifest_fails_with_wrong_aad() {
let key_metadata =
StandardKeyMetadata::new(b"0123456789abcdef").with_aad_prefix(b"test-aad-prefix!");

let io = FileIO::new_with_memory();
let path = "memory:///test/wrong_aad_manifest.avro";
let mut manifest_file = write_encrypted_manifest(&io, path, key_metadata).await;

// Point the manifest file at key metadata carrying the correct DEK but a
// different AAD prefix. The per-block AAD is `aad_prefix || block_index`,
// so GCM authentication must fail even though the key is right.
let wrong_aad_metadata =
StandardKeyMetadata::new(b"0123456789abcdef").with_aad_prefix(b"wrong-aad-prefix");
manifest_file.key_metadata = Some(wrong_aad_metadata.encode().unwrap().to_vec());

let err = manifest_file
.load_manifest(&io)
.await
.expect_err("load_manifest must fail when decrypting with the wrong AAD prefix");
assert_eq!(err.kind(), ErrorKind::Unexpected);
}
}
Loading