Skip to content

feat: Add optional prefetch hint for parsing Puffin Footer #1207

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
144 changes: 130 additions & 14 deletions crates/iceberg/src/puffin/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ impl Flag {
}

/// Metadata about a puffin file.
///
/// For more information, see: <https://iceberg.apache.org/puffin-spec/#filemetadata>
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub struct FileMetadata {
Expand All @@ -144,18 +145,28 @@ impl FileMetadata {
pub(crate) const MAGIC_LENGTH: u8 = 4;
pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31];

// We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below.
//
// Footer
// |
// -------------------------------------------------
// | |
// Magic FooterPayload FooterPayloadLength Flags Magic
// | |
// -----------------------------
// |
// FOOTER_STRUCT

/// We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer.
/// The structure of the Footer specification is illustrated below:
///
/// ```text
/// Footer
/// ┌────────────────────┐
/// │ Magic (4 bytes) │
/// │ │
/// ├────────────────────┤
/// │ FooterPayload │
/// │ (PAYLOAD_LENGTH) │
/// ├────────────────────┤ ◀─┐
/// │ FooterPayloadSize │ │
/// │ (4 bytes) │ │
/// ├────────────────────┤
/// │ Flags (4 bytes) │ FOOTER_STRUCT
/// │ │
/// ├────────────────────┤ │
/// │ Magic (4 bytes) │ │
/// │ │ │
/// └────────────────────┘ ◀─┘
/// ```
const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0;
const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4;
const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET
Expand All @@ -166,6 +177,11 @@ impl FileMetadata {
pub(crate) const FOOTER_STRUCT_LENGTH: u8 =
FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH;

/// Constructs a new puffin `FileMetadata` from its two components.
///
/// `blobs` is the list of blob metadata entries and `properties` holds the
/// string key/value properties. Both are stored exactly as given; no
/// validation is performed here.
pub fn new(blobs: Vec<BlobMetadata>, properties: HashMap<String, String>) -> Self {
Self { blobs, properties }
}

fn check_magic(bytes: &[u8]) -> Result<()> {
if bytes == FileMetadata::MAGIC {
Ok(())
Expand Down Expand Up @@ -285,9 +301,73 @@ impl FileMetadata {

let footer_payload_str =
FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?;

FileMetadata::from_json_str(&footer_payload_str)
}

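/// Reads the `FileMetadata` of a puffin file, using a prefetch hint.
///
/// `prefetch_hint` is the number of bytes read from the end of the file in an
/// attempt to fetch the entire footer in a single read. If the hint is 16 or less,
/// is larger than the file, or turns out to be smaller than the actual footer,
/// the function falls back to the regular `FileMetadata::read` path.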
#[allow(dead_code)]
pub(crate) async fn read_with_prefetch(
input_file: &InputFile,
prefetch_hint: u8,
) -> Result<FileMetadata> {
if prefetch_hint > 16 {
let input_file_length = input_file.metadata().await?.size;
let file_read = input_file.reader().await?;

// Hint cannot be larger than input file
if prefetch_hint as u64 > input_file_length {
return FileMetadata::read(input_file).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to add some warning log here.

}

// Read footer based on prefetch hint
let start = input_file_length - prefetch_hint as u64;
let end = input_file_length;
let footer_bytes = file_read.read(start..end).await?;

let payload_length_start =
footer_bytes.len() - (FileMetadata::FOOTER_STRUCT_LENGTH as usize);
let payload_length_end =
payload_length_start + (FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as usize);
let payload_length_bytes = &footer_bytes[payload_length_start..payload_length_end];

let mut buf = [0; 4];
buf.copy_from_slice(payload_length_bytes);
let footer_payload_length = u32::from_le_bytes(buf);

// If the full footer (MAGIC_LENGTH + footer payload length + FOOTER_STRUCT_LENGTH)
// is larger than the prefetched range, the prefetch did not capture all of it,
// so fall back to the regular read path without a prefetch.
let footer_length = (footer_payload_length as usize)
+ FileMetadata::FOOTER_STRUCT_LENGTH as usize
+ FileMetadata::MAGIC_LENGTH as usize;
if footer_length > prefetch_hint as usize {
return FileMetadata::read(input_file).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

}

// Read footer bytes
let footer_start = footer_bytes.len() - footer_length;
let footer_end = footer_bytes.len();
let footer_bytes = &footer_bytes[footer_start..footer_end];

let magic_length = FileMetadata::MAGIC_LENGTH as usize;
// check first four bytes of footer
FileMetadata::check_magic(&footer_bytes[..magic_length])?;
// check last four bytes of footer
FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;

let footer_payload_str =
FileMetadata::extract_footer_payload_as_str(footer_bytes, footer_payload_length)?;
return FileMetadata::from_json_str(&footer_payload_str);
}

FileMetadata::read(input_file).await
}

#[inline]
/// Metadata about blobs in file
pub fn blobs(&self) -> &[BlobMetadata] {
Expand Down Expand Up @@ -787,7 +867,7 @@ mod tests {
.await;

assert_eq!(
FileMetadata::read(&input_file).await.unwrap_err().to_string(),
FileMetadata::read(&input_file, ).await.unwrap_err().to_string(),
format!(
"DataInvalid => Given string is not valid JSON, source: invalid value: integer `{}`, expected i32 at line 5 column 51",
out_of_i32_range_number
Expand All @@ -800,6 +880,7 @@ mod tests {
let temp_dir = TempDir::new().unwrap();

let input_file = input_file_with_payload(&temp_dir, r#""blobs" = []"#).await;

assert_eq!(
FileMetadata::read(&input_file).await.unwrap_err().to_string(),
"DataInvalid => Given string is not valid JSON, source: invalid type: string \"blobs\", expected struct FileMetadata at line 1 column 7",
Expand All @@ -809,21 +890,56 @@ mod tests {
/// Reading the file returned by `java_empty_uncompressed_input_file()` through
/// the plain `read` path must produce `empty_footer_payload()`.
#[tokio::test]
async fn test_read_file_metadata_of_uncompressed_empty_file() {
    let puffin_file = java_empty_uncompressed_input_file();
    let read_result = FileMetadata::read(&puffin_file).await;

    assert_eq!(read_result.unwrap(), empty_footer_payload());
}

/// Reading the file returned by `java_uncompressed_metric_input_file()` through
/// the plain `read` path must produce `uncompressed_metric_file_metadata()`.
#[tokio::test]
async fn test_read_file_metadata_of_uncompressed_metric_data() {
    let puffin_file = java_uncompressed_metric_input_file();
    let read_result = FileMetadata::read(&puffin_file).await;

    assert_eq!(read_result.unwrap(), uncompressed_metric_file_metadata());
}

/// Parses the file returned by `java_zstd_compressed_metric_input_file()` and
/// compares the result against `zstd_compressed_metric_file_metadata()`.
#[tokio::test]
async fn test_read_file_metadata_of_zstd_compressed_metric_data() {
let input_file = java_zstd_compressed_metric_input_file();
// Parse once through the regular `read` path; this only checks that it succeeds.
// NOTE(review): this binding is shadowed below and never asserted on — confirm
// whether the plain-read result should also be compared.
let file_metadata = FileMetadata::read(&input_file).await.unwrap();

// Parse again with a 64-byte prefetch hint; this is the value that is asserted.
let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
.await
.unwrap();
assert_eq!(file_metadata, zstd_compressed_metric_file_metadata())
}

/// With a 64-byte prefetch hint, reading the file returned by
/// `java_empty_uncompressed_input_file()` must produce `empty_footer_payload()`.
#[tokio::test]
async fn test_read_file_metadata_of_empty_file_with_prefetching() {
    let puffin_file = java_empty_uncompressed_input_file();
    let read_result = FileMetadata::read_with_prefetch(&puffin_file, 64).await;

    assert_eq!(read_result.unwrap(), empty_footer_payload());
}

/// With a 64-byte prefetch hint, reading the file returned by
/// `java_uncompressed_metric_input_file()` must produce
/// `uncompressed_metric_file_metadata()`.
#[tokio::test]
async fn test_read_file_metadata_of_uncompressed_metric_data_with_prefetching() {
    let puffin_file = java_uncompressed_metric_input_file();
    let read_result = FileMetadata::read_with_prefetch(&puffin_file, 64).await;

    assert_eq!(read_result.unwrap(), uncompressed_metric_file_metadata());
}

/// With a 64-byte prefetch hint, reading the file returned by
/// `java_zstd_compressed_metric_input_file()` must produce
/// `zstd_compressed_metric_file_metadata()`.
#[tokio::test]
async fn test_read_file_metadata_of_zstd_compressed_metric_data_with_prefetching() {
    let puffin_file = java_zstd_compressed_metric_input_file();
    let read_result = FileMetadata::read_with_prefetch(&puffin_file, 64).await;

    assert_eq!(read_result.unwrap(), zstd_compressed_metric_file_metadata());
}
}
Loading