Skip to content

Commit cf8748a

Browse files
committed
feat: minor cleanup: no need for offset index
1 parent c77f31f commit cf8748a

File tree

1 file changed

+37
-44
lines changed

1 file changed

+37
-44
lines changed

crates/iceberg/src/arrow/reader.rs

+37 -44
Original file line number | Diff line number | Diff line change
@@ -215,38 +215,41 @@ impl ArrowReader {
215215
file_type,
216216
} = entry;
217217

218-
let record_batch_stream = Self::create_parquet_record_batch_stream_builder(
219-
file_path,
220-
file_io.clone(),
221-
false,
222-
)
223-
.await?
224-
.build()?
225-
.map(|item| match item {
226-
Ok(val) => Ok(val),
227-
Err(err) => Err(Error::new(ErrorKind::DataInvalid, err.to_string())
228-
.with_source(err)),
229-
})
230-
.boxed();
231-
232-
let result = match file_type {
233-
DataContentType::PositionDeletes => {
234-
parse_positional_delete_file(record_batch_stream).await
235-
}
236-
DataContentType::EqualityDeletes => {
237-
parse_equality_delete_file(record_batch_stream).await
238-
}
239-
_ => Err(Error::new(
240-
ErrorKind::Unexpected,
241-
"Expected equality or positional delete",
242-
)),
243-
}?;
244-
245-
tx.send(Ok(result)).await?;
246-
Ok(())
247-
}
248-
})
249-
.await;
218+
let record_batch_stream =
219+
Self::create_parquet_record_batch_stream_builder(
220+
file_path,
221+
file_io.clone(),
222+
false,
223+
)
224+
.await?
225+
.build()?
226+
.map(|item| match item {
227+
Ok(val) => Ok(val),
228+
Err(err) => {
229+
Err(Error::new(ErrorKind::DataInvalid, err.to_string())
230+
.with_source(err))
231+
}
232+
})
233+
.boxed();
234+
235+
let result = match file_type {
236+
DataContentType::PositionDeletes => {
237+
parse_positional_delete_file(record_batch_stream).await
238+
}
239+
DataContentType::EqualityDeletes => {
240+
parse_equality_delete_file(record_batch_stream).await
241+
}
242+
_ => Err(Error::new(
243+
ErrorKind::Unexpected,
244+
"Expected equality or positional delete",
245+
)),
246+
}?;
247+
248+
tx.send(Ok(result)).await?;
249+
Ok(())
250+
}
251+
})
252+
.await;
250253

251254
if let Err(error) = result {
252255
let _ = channel_for_error.send(Err(error)).await;
@@ -919,25 +922,15 @@ impl ArrowReader {
919922
selected_row_groups: &Option<Vec<usize>>,
920923
positional_deletes: &[usize],
921924
) -> Result<RowSelection> {
922-
let Some(offset_index) = parquet_metadata.offset_index() else {
923-
return Err(Error::new(
924-
ErrorKind::Unexpected,
925-
"Parquet file metadata does not contain an offset index",
926-
));
927-
};
928-
929925
let mut selected_row_groups_idx = 0;
930926
let mut curr_pos_del_idx = 0;
931927
let pos_del_len = positional_deletes.len();
932928

933-
let page_index = offset_index
934-
.iter()
935-
.enumerate()
936-
.zip(parquet_metadata.row_groups());
929+
let page_index = parquet_metadata.row_groups().iter().enumerate();
937930

938931
let mut results: Vec<RowSelector> = Vec::new();
939932
let mut current_page_base_idx: usize = 0;
940-
for ((idx, _offset_index), row_group_metadata) in page_index {
933+
for (idx, row_group_metadata) in page_index {
941934
let page_num_rows = row_group_metadata.num_rows() as usize;
942935
let next_page_base_idx = current_page_base_idx + page_num_rows;
943936

0 commit comments

Comments
 (0)