Skip to content

Commit 92cfd99

Browse files
authored
Refactor arrow-ipc: Rename ArrayReader to RecordBatchDecoder (apache#7028)
* Rename `ArrayReader` to `RecordBatchDecoder` * Remove alias for `self`
1 parent 7302888 commit 92cfd99

File tree

2 files changed

+58
-56
lines changed

2 files changed

+58
-56
lines changed

arrow-ipc/src/reader.rs

+56-54
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ fn read_buffer(
6565
(false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
6666
}
6767
}
68-
impl ArrayReader<'_> {
68+
impl RecordBatchDecoder<'_> {
6969
/// Coordinates reading arrays based on data types.
7070
///
7171
/// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView)
@@ -83,18 +83,17 @@ impl ArrayReader<'_> {
8383
field: &Field,
8484
variadic_counts: &mut VecDeque<i64>,
8585
) -> Result<ArrayRef, ArrowError> {
86-
let reader = self;
8786
let data_type = field.data_type();
8887
match data_type {
8988
Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
90-
reader.next_node(field)?,
89+
self.next_node(field)?,
9190
data_type,
9291
&[
93-
reader.next_buffer()?,
94-
reader.next_buffer()?,
95-
reader.next_buffer()?,
92+
self.next_buffer()?,
93+
self.next_buffer()?,
94+
self.next_buffer()?,
9695
],
97-
reader.require_alignment,
96+
self.require_alignment,
9897
),
9998
BinaryView | Utf8View => {
10099
let count = variadic_counts
@@ -104,55 +103,55 @@ impl ArrayReader<'_> {
104103
)))?;
105104
let count = count + 2; // view and null buffer.
106105
let buffers = (0..count)
107-
.map(|_| reader.next_buffer())
106+
.map(|_| self.next_buffer())
108107
.collect::<Result<Vec<_>, _>>()?;
109108
create_primitive_array(
110-
reader.next_node(field)?,
109+
self.next_node(field)?,
111110
data_type,
112111
&buffers,
113-
reader.require_alignment,
112+
self.require_alignment,
114113
)
115114
}
116115
FixedSizeBinary(_) => create_primitive_array(
117-
reader.next_node(field)?,
116+
self.next_node(field)?,
118117
data_type,
119-
&[reader.next_buffer()?, reader.next_buffer()?],
120-
reader.require_alignment,
118+
&[self.next_buffer()?, self.next_buffer()?],
119+
self.require_alignment,
121120
),
122121
List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
123-
let list_node = reader.next_node(field)?;
124-
let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
125-
let values = reader.create_array(list_field, variadic_counts)?;
122+
let list_node = self.next_node(field)?;
123+
let list_buffers = [self.next_buffer()?, self.next_buffer()?];
124+
let values = self.create_array(list_field, variadic_counts)?;
126125
create_list_array(
127126
list_node,
128127
data_type,
129128
&list_buffers,
130129
values,
131-
reader.require_alignment,
130+
self.require_alignment,
132131
)
133132
}
134133
FixedSizeList(ref list_field, _) => {
135-
let list_node = reader.next_node(field)?;
136-
let list_buffers = [reader.next_buffer()?];
137-
let values = reader.create_array(list_field, variadic_counts)?;
134+
let list_node = self.next_node(field)?;
135+
let list_buffers = [self.next_buffer()?];
136+
let values = self.create_array(list_field, variadic_counts)?;
138137
create_list_array(
139138
list_node,
140139
data_type,
141140
&list_buffers,
142141
values,
143-
reader.require_alignment,
142+
self.require_alignment,
144143
)
145144
}
146145
Struct(struct_fields) => {
147-
let struct_node = reader.next_node(field)?;
148-
let null_buffer = reader.next_buffer()?;
146+
let struct_node = self.next_node(field)?;
147+
let null_buffer = self.next_buffer()?;
149148

150149
// read the arrays for each field
151150
let mut struct_arrays = vec![];
152151
// TODO investigate whether just knowing the number of buffers could
153152
// still work
154153
for struct_field in struct_fields {
155-
let child = reader.create_array(struct_field, variadic_counts)?;
154+
let child = self.create_array(struct_field, variadic_counts)?;
156155
struct_arrays.push(child);
157156
}
158157
let null_count = struct_node.null_count() as usize;
@@ -175,32 +174,32 @@ impl ArrayReader<'_> {
175174
Ok(Arc::new(struct_array))
176175
}
177176
RunEndEncoded(run_ends_field, values_field) => {
178-
let run_node = reader.next_node(field)?;
179-
let run_ends = reader.create_array(run_ends_field, variadic_counts)?;
180-
let values = reader.create_array(values_field, variadic_counts)?;
177+
let run_node = self.next_node(field)?;
178+
let run_ends = self.create_array(run_ends_field, variadic_counts)?;
179+
let values = self.create_array(values_field, variadic_counts)?;
181180

182181
let run_array_length = run_node.length() as usize;
183182
let array_data = ArrayData::builder(data_type.clone())
184183
.len(run_array_length)
185184
.offset(0)
186185
.add_child_data(run_ends.into_data())
187186
.add_child_data(values.into_data())
188-
.align_buffers(!reader.require_alignment)
187+
.align_buffers(!self.require_alignment)
189188
.build()?;
190189

191190
Ok(make_array(array_data))
192191
}
193192
// Create dictionary array from RecordBatch
194193
Dictionary(_, _) => {
195-
let index_node = reader.next_node(field)?;
196-
let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];
194+
let index_node = self.next_node(field)?;
195+
let index_buffers = [self.next_buffer()?, self.next_buffer()?];
197196

198197
#[allow(deprecated)]
199198
let dict_id = field.dict_id().ok_or_else(|| {
200199
ArrowError::ParseError(format!("Field {field} does not have dict id"))
201200
})?;
202201

203-
let value_array = reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
202+
let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
204203
ArrowError::ParseError(format!(
205204
"Cannot find a dictionary batch with dict id: {dict_id}"
206205
))
@@ -211,26 +210,26 @@ impl ArrayReader<'_> {
211210
data_type,
212211
&index_buffers,
213212
value_array.clone(),
214-
reader.require_alignment,
213+
self.require_alignment,
215214
)
216215
}
217216
Union(fields, mode) => {
218-
let union_node = reader.next_node(field)?;
217+
let union_node = self.next_node(field)?;
219218
let len = union_node.length() as usize;
220219

221220
// In V4, union types have a validity bitmap
222221
// In V5 and later, union types have no validity bitmap
223-
if reader.version < MetadataVersion::V5 {
224-
reader.next_buffer()?;
222+
if self.version < MetadataVersion::V5 {
223+
self.next_buffer()?;
225224
}
226225

227226
let type_ids: ScalarBuffer<i8> =
228-
reader.next_buffer()?.slice_with_length(0, len).into();
227+
self.next_buffer()?.slice_with_length(0, len).into();
229228

230229
let value_offsets = match mode {
231230
UnionMode::Dense => {
232231
let offsets: ScalarBuffer<i32> =
233-
reader.next_buffer()?.slice_with_length(0, len * 4).into();
232+
self.next_buffer()?.slice_with_length(0, len * 4).into();
234233
Some(offsets)
235234
}
236235
UnionMode::Sparse => None,
@@ -239,15 +238,15 @@ impl ArrayReader<'_> {
239238
let mut children = Vec::with_capacity(fields.len());
240239

241240
for (_id, field) in fields.iter() {
242-
let child = reader.create_array(field, variadic_counts)?;
241+
let child = self.create_array(field, variadic_counts)?;
243242
children.push(child);
244243
}
245244

246245
let array = UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?;
247246
Ok(Arc::new(array))
248247
}
249248
Null => {
250-
let node = reader.next_node(field)?;
249+
let node = self.next_node(field)?;
251250
let length = node.length();
252251
let null_count = node.null_count();
253252

@@ -260,17 +259,17 @@ impl ArrayReader<'_> {
260259
let array_data = ArrayData::builder(data_type.clone())
261260
.len(length as usize)
262261
.offset(0)
263-
.align_buffers(!reader.require_alignment)
262+
.align_buffers(!self.require_alignment)
264263
.build()?;
265264

266265
// no buffer increases
267266
Ok(Arc::new(NullArray::from(array_data)))
268267
}
269268
_ => create_primitive_array(
270-
reader.next_node(field)?,
269+
self.next_node(field)?,
271270
data_type,
272-
&[reader.next_buffer()?, reader.next_buffer()?],
273-
reader.require_alignment,
271+
&[self.next_buffer()?, self.next_buffer()?],
272+
self.require_alignment,
274273
),
275274
}
276275
}
@@ -370,8 +369,11 @@ fn create_dictionary_array(
370369
}
371370
}
372371

373-
/// State for decoding arrays from an encoded [`RecordBatch`]
374-
struct ArrayReader<'a> {
372+
/// State for decoding Arrow arrays from an [IPC RecordBatch] structure to
373+
/// [`RecordBatch`]
374+
///
375+
/// [IPC RecordBatch]: crate::RecordBatch
376+
struct RecordBatchDecoder<'a> {
375377
/// The flatbuffers encoded record batch
376378
batch: crate::RecordBatch<'a>,
377379
/// The output schema
@@ -389,14 +391,14 @@ struct ArrayReader<'a> {
389391
/// The buffers comprising this array
390392
buffers: VectorIter<'a, crate::Buffer>,
391393
/// Projection (subset of columns) to read, if any
392-
/// See [`ArrayReader::with_projection`] for details
394+
/// See [`RecordBatchDecoder::with_projection`] for details
393395
projection: Option<&'a [usize]>,
394396
/// Are buffers required to already be aligned? See
395-
/// [`ArrayReader::with_require_alignment`] for details
397+
/// [`RecordBatchDecoder::with_require_alignment`] for details
396398
require_alignment: bool,
397399
}
398400

399-
impl<'a> ArrayReader<'a> {
401+
impl<'a> RecordBatchDecoder<'a> {
400402
/// Create a reader for decoding arrays from an encoded [`RecordBatch`]
401403
fn try_new(
402404
buf: &'a Buffer,
@@ -604,7 +606,7 @@ pub fn read_record_batch(
604606
projection: Option<&[usize]>,
605607
metadata: &MetadataVersion,
606608
) -> Result<RecordBatch, ArrowError> {
607-
ArrayReader::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
609+
RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
608610
.with_projection(projection)
609611
.with_require_alignment(false)
610612
.read_record_batch()
@@ -652,7 +654,7 @@ fn read_dictionary_impl(
652654
let value = value_type.as_ref().clone();
653655
let schema = Schema::new(vec![Field::new("", value, true)]);
654656
// Read a single column
655-
let record_batch = ArrayReader::try_new(
657+
let record_batch = RecordBatchDecoder::try_new(
656658
buf,
657659
batch.data().unwrap(),
658660
Arc::new(schema),
@@ -876,7 +878,7 @@ impl FileDecoder {
876878
ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
877879
})?;
878880
// read the block that makes up the record batch into a buffer
879-
ArrayReader::try_new(
881+
RecordBatchDecoder::try_new(
880882
&buf.slice(block.metaDataLength() as _),
881883
batch,
882884
self.schema.clone(),
@@ -1426,7 +1428,7 @@ impl<R: Read> StreamReader<R> {
14261428
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
14271429
self.reader.read_exact(&mut buf)?;
14281430

1429-
ArrayReader::try_new(
1431+
RecordBatchDecoder::try_new(
14301432
&buf.into(),
14311433
batch,
14321434
self.schema(),
@@ -2277,7 +2279,7 @@ mod tests {
22772279
assert_ne!(b.as_ptr().align_offset(8), 0);
22782280

22792281
let ipc_batch = message.header_as_record_batch().unwrap();
2280-
let roundtrip = ArrayReader::try_new(
2282+
let roundtrip = RecordBatchDecoder::try_new(
22812283
&b,
22822284
ipc_batch,
22832285
batch.schema(),
@@ -2316,7 +2318,7 @@ mod tests {
23162318
assert_ne!(b.as_ptr().align_offset(8), 0);
23172319

23182320
let ipc_batch = message.header_as_record_batch().unwrap();
2319-
let result = ArrayReader::try_new(
2321+
let result = RecordBatchDecoder::try_new(
23202322
&b,
23212323
ipc_batch,
23222324
batch.schema(),

arrow-ipc/src/reader/stream.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use arrow_buffer::{Buffer, MutableBuffer};
2424
use arrow_schema::{ArrowError, SchemaRef};
2525

2626
use crate::convert::MessageBuffer;
27-
use crate::reader::{read_dictionary_impl, ArrayReader};
27+
use crate::reader::{read_dictionary_impl, RecordBatchDecoder};
2828
use crate::{MessageHeader, CONTINUATION_MARKER};
2929

3030
/// A low-level interface for reading [`RecordBatch`] data from a stream of bytes
@@ -211,7 +211,7 @@ impl StreamDecoder {
211211
let schema = self.schema.clone().ok_or_else(|| {
212212
ArrowError::IpcError("Missing schema".to_string())
213213
})?;
214-
let batch = ArrayReader::try_new(
214+
let batch = RecordBatchDecoder::try_new(
215215
&body,
216216
batch,
217217
schema,

0 commit comments

Comments
 (0)