Skip to content

Commit d261a82

Browse files
authored
apacheGH-42146: [MATLAB] Add IPC RecordBatchFileReader and RecordBatchFileWriter MATLAB classes (apache#42201)
### Rationale for this change To enable initial IPC I/O support in the MATLAB interface, we should add a `RecordBatchFileReader` class and a `RecordBatchFileWriter` class. ### What changes are included in this PR? 1. Added a new `arrow.io.ipc.RecordBatchFileWriter` class. 2. Added a new `arrow.io.ipc.RecordBatchFileReader` class. **Example** ```matlab >> city = ["Boston" "Seattle" "Denver" "Juno" "Anchorage" "Chicago"]'; >> daylength = duration(["15:17:01" "15:59:16" "14:59:14" "19:21:23" "14:18:24" "15:13:39"])'; >> matlabTable = table(city, daylength, VariableNames=["City", "DayLength"]); >> recordBatch1 = arrow.recordBatch(matlabTable(1:4, :)) >> recordBatch2 = arrow.recordBatch(matlabTable(5:end, :)); >> writer = arrow.io.ipc.RecordBatchFileWriter("daylight.arrow", recordBatch1.Schema); >> writer.writeRecordBatch(recordBatch1); >> writer.writeRecordBatch(recordBatch2); >> writer.close(); >> reader = arrow.io.ipc.RecordBatchFileReader("daylight.arrow"); reader = RecordBatchFileReader with properties: NumRecordBatches: 2 Schema: [1×1 arrow.tabular.Schema] >> reader.Schema ans = Arrow Schema with 2 fields: City: String | DayLength: Time64 >> rb1 = reader.read(1); >> isequal(rb1, recordBatch1) ans = logical 1 >> rb2 = reader.read(2); >> isequal(rb2, recordBatch2) ans = logical 1 ``` ### Are these changes tested? Yes. Added two new test files: 1. `arrow/matlab/test/io/ipc/tRecordBatchFileWriter.m` 2. `arrow/matlab/test/io/ipc/tRecordBatchFileReader.m` ### Are there any user-facing changes? Yes. Users can now serialize `RecordBatch`es and `Table`s to files using the Arrow IPC data format as well as read in `RecordBatch`es from Arrow IPC data files. ### Future Directions 1. Add `RecordBatchStreamWriter` and `RecordBatchStreamReader` 2. Expose options for [controlling](https://github.com/apache/arrow/blob/main/cpp/src/arrow/ipc/options.h) IPC reading and writing in MATLAB. 3. 
Add more methods to `RecordBatchFileReader` to read in multiple record batches at once as well as importing the data as an Arrow `Table`. * GitHub Issue: apache#42146 Authored-by: Sarah Gilmore <[email protected]> Signed-off-by: Sarah Gilmore <[email protected]>
1 parent 89d6354 commit d261a82

11 files changed

+871
-1
lines changed

matlab/src/cpp/arrow/matlab/error/error.h

+7
Original file line numberDiff line numberDiff line change
@@ -242,5 +242,12 @@ static const char* ARRAY_SLICE_FAILED_TO_CREATE_ARRAY_PROXY =
242242
"arrow:array:slice:FailedToCreateArrayProxy";
243243
static const char* C_EXPORT_FAILED = "arrow:c:export:ExportFailed";
244244
static const char* C_IMPORT_FAILED = "arrow:c:import:ImportFailed";
245+
static const char* IPC_RECORD_BATCH_WRITE_FAILED =
246+
"arrow:io:ipc:FailedToWriteRecordBatch";
247+
static const char* IPC_RECORD_BATCH_WRITE_CLOSE_FAILED = "arrow:io:ipc:CloseFailed";
248+
static const char* IPC_RECORD_BATCH_READER_OPEN_FAILED =
249+
"arrow:io:ipc:FailedToOpenRecordBatchReader";
250+
static const char* IPC_RECORD_BATCH_READ_INVALID_INDEX = "arrow:io:ipc:InvalidIndex";
251+
static const char* IPC_RECORD_BATCH_READ_FAILED = "arrow:io:ipc:ReadFailed";
245252

246253
} // namespace arrow::matlab::error
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/matlab/io/ipc/proxy/record_batch_file_reader.h"
19+
#include "arrow/io/file.h"
20+
#include "arrow/matlab/error/error.h"
21+
#include "arrow/matlab/tabular/proxy/record_batch.h"
22+
#include "arrow/matlab/tabular/proxy/schema.h"
23+
#include "arrow/util/utf8.h"
24+
25+
#include "libmexclass/proxy/ProxyManager.h"
26+
27+
namespace arrow::matlab::io::ipc::proxy {
28+
29+
namespace {
30+
// Builds the error reported when a requested record batch index is rejected.
//
// matlab_index: the index value that was supplied by the caller.
// num_batches:  the number of record batches, used as the upper bound that is
//               quoted in the message.
//
// Returns a libmexclass error whose ID is
// error::IPC_RECORD_BATCH_READ_INVALID_INDEX and whose message names both the
// offending index and the valid range (1 .. num_batches).
//
// NOTE(review): std::stringstream is used here, but <sstream> does not appear
// in this file's include list; it is presumably pulled in transitively --
// confirm, or include it directly.
libmexclass::error::Error makeInvalidNumericIndexError(const int32_t matlab_index,
                                                       const int32_t num_batches) {
  std::stringstream message;
  message << "Invalid record batch index: " << matlab_index
          << ". Record batch index must be between 1 and the number of record batches ("
          << num_batches << ").";
  return libmexclass::error::Error{error::IPC_RECORD_BATCH_READ_INVALID_INDEX,
                                   message.str()};
}
42+
} // namespace
43+
44+
// Stores the given Arrow IPC file reader and registers this proxy's three
// methods (getNumRecordBatches, getSchema, readRecordBatchAtIndex) through
// REGISTER_METHOD.
//
// The parameter is deliberately a non-const by-value shared_ptr: with a const
// parameter, std::move yields a const rvalue and the member initializer falls
// back to copying the shared_ptr instead of moving it. Top-level const on a
// by-value parameter is not part of the function signature, so this
// definition still matches the declaration in the header.
RecordBatchFileReader::RecordBatchFileReader(
    std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader)
    : reader{std::move(reader)} {
  REGISTER_METHOD(RecordBatchFileReader, getNumRecordBatches);
  REGISTER_METHOD(RecordBatchFileReader, getSchema);
  REGISTER_METHOD(RecordBatchFileReader, readRecordBatchAtIndex);
}
51+
52+
// Factory for RecordBatchFileReader proxies.
//
// constructor_arguments[0] is read as a struct array whose first element has
// a "Filename" string field. The filename is converted from UTF-16 to UTF-8,
// the file is opened with arrow::io::ReadableFile::Open, and an Arrow IPC
// RecordBatchFileReader is opened on the resulting stream.
//
// Returns a shared_ptr to the new proxy. Each fallible step is wrapped in
// MATLAB_ASSIGN_OR_ERROR with one of these error IDs:
//   - error::UNICODE_CONVERSION_ERROR_ID          filename conversion
//   - error::FAILED_TO_OPEN_FILE_FOR_READ         opening the file
//   - error::IPC_RECORD_BATCH_READER_OPEN_FAILED  opening the IPC reader
libmexclass::proxy::MakeResult RecordBatchFileReader::make(
    const libmexclass::proxy::FunctionArguments& constructor_arguments) {
  namespace mda = ::matlab::data;
  using RecordBatchFileReaderProxy = arrow::matlab::io::ipc::proxy::RecordBatchFileReader;

  const mda::StructArray opts = constructor_arguments[0];

  const mda::StringArray filename_mda = opts[0]["Filename"];
  const auto filename_utf16 = std::u16string(filename_mda[0]);
  MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8,
                         arrow::util::UTF16StringToUTF8(filename_utf16),
                         error::UNICODE_CONVERSION_ERROR_ID);

  // The file is opened for reading here, so a failure must be reported with
  // the read-side error ID rather than FAILED_TO_OPEN_FILE_FOR_WRITE.
  // NOTE(review): FAILED_TO_OPEN_FILE_FOR_READ is assumed to be declared in
  // arrow/matlab/error/error.h next to the WRITE variant -- confirm.
  MATLAB_ASSIGN_OR_ERROR(auto input_stream, arrow::io::ReadableFile::Open(filename_utf8),
                         error::FAILED_TO_OPEN_FILE_FOR_READ);

  MATLAB_ASSIGN_OR_ERROR(auto reader,
                         arrow::ipc::RecordBatchFileReader::Open(input_stream),
                         error::IPC_RECORD_BATCH_READER_OPEN_FAILED);

  return std::make_shared<RecordBatchFileReaderProxy>(std::move(reader));
}
74+
75+
// Queries the wrapped reader for its record batch count and writes that count
// to context.outputs[0] as a scalar created by the MATLAB array factory.
void RecordBatchFileReader::getNumRecordBatches(
    libmexclass::proxy::method::Context& context) {
  namespace mda = ::matlab::data;

  const auto batch_count = reader->num_record_batches();

  mda::ArrayFactory array_factory;
  context.outputs[0] = array_factory.createScalar(batch_count);
}
83+
84+
// Wraps the reader's schema in a Schema proxy, hands that proxy to the
// ProxyManager, and writes the ID returned by manageProxy to
// context.outputs[0] as a scalar.
void RecordBatchFileReader::getSchema(libmexclass::proxy::method::Context& context) {
  namespace mda = ::matlab::data;
  using SchemaProxy = arrow::matlab::tabular::proxy::Schema;

  // The schema returned by the reader is passed straight into the new proxy.
  auto proxy = std::make_shared<SchemaProxy>(reader->schema());
  const auto proxy_id = libmexclass::proxy::ProxyManager::manageProxy(proxy);

  mda::ArrayFactory array_factory;
  context.outputs[0] = array_factory.createScalar(proxy_id);
}
98+
99+
// Reads a single record batch from the file and returns it as a proxy ID.
//
// context.inputs[0] is read as a struct array whose first element has an
// int32 "Index" field holding a 1-based record batch index.
//
// On success, context.outputs[0] receives the ID under which the new
// RecordBatch proxy was registered with the ProxyManager.
//
// Errors:
//   - If the index is below 1 or above the number of record batches,
//     context.error is set via makeInvalidNumericIndexError and the method
//     returns without reading.
//   - The read itself is wrapped in MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT with
//     error::IPC_RECORD_BATCH_READ_FAILED.
void RecordBatchFileReader::readRecordBatchAtIndex(
    libmexclass::proxy::method::Context& context) {
  namespace mda = ::matlab::data;
  using RecordBatchProxy = arrow::matlab::tabular::proxy::RecordBatch;

  mda::StructArray opts = context.inputs[0];
  const mda::TypedArray<int32_t> matlab_index_mda = opts[0]["Index"];

  // Materialize the element as a plain int32_t instead of holding on to the
  // array's element-reference type.
  const int32_t matlab_index = matlab_index_mda[0];
  const auto num_record_batches = reader->num_record_batches();
  if (matlab_index < 1 || matlab_index > num_record_batches) {
    context.error = makeInvalidNumericIndexError(matlab_index, num_record_batches);
    return;
  }
  // MATLAB indices start at 1; Arrow's ReadRecordBatch expects a 0-based index.
  const auto arrow_index = matlab_index - 1;

  // Not const: the batch is moved into the proxy below, and moving from a
  // const object would silently copy the shared_ptr instead.
  MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(auto record_batch,
                                      reader->ReadRecordBatch(arrow_index), context,
                                      error::IPC_RECORD_BATCH_READ_FAILED);

  auto record_batch_proxy = std::make_shared<RecordBatchProxy>(std::move(record_batch));
  const auto record_batch_proxy_id =
      libmexclass::proxy::ProxyManager::manageProxy(record_batch_proxy);

  mda::ArrayFactory factory;
  const auto record_batch_proxy_id_mda = factory.createScalar(record_batch_proxy_id);
  context.outputs[0] = record_batch_proxy_id_mda;
}
127+
128+
} // namespace arrow::matlab::io::ipc::proxy
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include "arrow/ipc/reader.h"
21+
#include "libmexclass/proxy/Proxy.h"
22+
23+
namespace arrow::matlab::io::ipc::proxy {
24+
25+
class RecordBatchFileReader : public libmexclass::proxy::Proxy {
26+
public:
27+
RecordBatchFileReader(std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader);
28+
29+
~RecordBatchFileReader() = default;
30+
31+
static libmexclass::proxy::MakeResult make(
32+
const libmexclass::proxy::FunctionArguments& constructor_arguments);
33+
34+
protected:
35+
std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader;
36+
37+
void getNumRecordBatches(libmexclass::proxy::method::Context& context);
38+
39+
void getSchema(libmexclass::proxy::method::Context& context);
40+
41+
void readRecordBatchAtIndex(libmexclass::proxy::method::Context& context);
42+
};
43+
44+
} // namespace arrow::matlab::io::ipc::proxy
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/matlab/io/ipc/proxy/record_batch_file_writer.h"
19+
#include "arrow/io/file.h"
20+
#include "arrow/matlab/error/error.h"
21+
#include "arrow/matlab/tabular/proxy/record_batch.h"
22+
#include "arrow/matlab/tabular/proxy/schema.h"
23+
#include "arrow/matlab/tabular/proxy/table.h"
24+
#include "arrow/util/utf8.h"
25+
26+
#include "libmexclass/proxy/ProxyManager.h"
27+
28+
namespace arrow::matlab::io::ipc::proxy {
29+
30+
// Stores the given Arrow IPC record batch writer and registers this proxy's
// three methods (close, writeRecordBatch, writeTable) through REGISTER_METHOD.
//
// The parameter is deliberately a non-const by-value shared_ptr: with a const
// parameter, std::move yields a const rvalue and the member initializer falls
// back to copying the shared_ptr instead of moving it. Top-level const on a
// by-value parameter is not part of the function signature, so this
// definition still matches the declaration in the header.
RecordBatchFileWriter::RecordBatchFileWriter(
    std::shared_ptr<arrow::ipc::RecordBatchWriter> writer)
    : writer{std::move(writer)} {
  REGISTER_METHOD(RecordBatchFileWriter, close);
  REGISTER_METHOD(RecordBatchFileWriter, writeRecordBatch);
  REGISTER_METHOD(RecordBatchFileWriter, writeTable);
}
37+
38+
// Factory for RecordBatchFileWriter proxies.
//
// constructor_arguments[0] is read as a struct array whose first element has
// a "Filename" string field and a uint64 "SchemaProxyID" field. The filename
// is converted from UTF-16 to UTF-8, the schema is obtained from the Schema
// proxy registered under that ID, a file output stream is opened, and an
// Arrow IPC file writer is created on top of it.
//
// Returns a shared_ptr to the new proxy. Each fallible step is wrapped in
// MATLAB_ASSIGN_OR_ERROR with one of these error IDs:
//   - error::UNICODE_CONVERSION_ERROR_ID     filename conversion
//   - error::FAILED_TO_OPEN_FILE_FOR_WRITE   opening the output file
//   - "arrow:matlab:MakeFailed"              creating the IPC file writer
libmexclass::proxy::MakeResult RecordBatchFileWriter::make(
    const libmexclass::proxy::FunctionArguments& constructor_arguments) {
  namespace mda = ::matlab::data;
  using RecordBatchFileWriterProxy = arrow::matlab::io::ipc::proxy::RecordBatchFileWriter;
  using SchemaProxy = arrow::matlab::tabular::proxy::Schema;

  const mda::StructArray args = constructor_arguments[0];

  // Destination path: MATLAB strings are UTF-16, Arrow expects UTF-8.
  const mda::StringArray path_mda = args[0]["Filename"];
  const auto path_utf16 = std::u16string(path_mda[0]);
  MATLAB_ASSIGN_OR_ERROR(const auto path_utf8,
                         arrow::util::UTF16StringToUTF8(path_utf16),
                         error::UNICODE_CONVERSION_ERROR_ID);

  // Look up the Schema proxy by ID and pull the Arrow schema out of it.
  const mda::TypedArray<uint64_t> schema_id_mda = args[0]["SchemaProxyID"];
  auto schema = std::static_pointer_cast<SchemaProxy>(
                    libmexclass::proxy::ProxyManager::getProxy(schema_id_mda[0]))
                    ->unwrap();

  MATLAB_ASSIGN_OR_ERROR(auto sink, arrow::io::FileOutputStream::Open(path_utf8),
                         error::FAILED_TO_OPEN_FILE_FOR_WRITE);

  // NOTE(review): this is the only error ID in the function spelled as a raw
  // string literal instead of an error:: constant -- consider adding one.
  MATLAB_ASSIGN_OR_ERROR(auto ipc_writer, arrow::ipc::MakeFileWriter(sink, schema),
                         "arrow:matlab:MakeFailed");

  return std::make_shared<RecordBatchFileWriterProxy>(std::move(ipc_writer));
}
67+
68+
// Writes one record batch to the file.
//
// context.inputs[0] is read as a struct array whose first element has a
// uint64 "RecordBatchProxyID" field. The proxy registered under that ID is
// treated as a RecordBatch proxy and its record batch is passed to the
// wrapped writer. The write is wrapped in MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT
// with error::IPC_RECORD_BATCH_WRITE_FAILED.
void RecordBatchFileWriter::writeRecordBatch(
    libmexclass::proxy::method::Context& context) {
  namespace mda = ::matlab::data;
  using RecordBatchProxy = ::arrow::matlab::tabular::proxy::RecordBatch;

  mda::StructArray args = context.inputs[0];
  const mda::TypedArray<uint64_t> id_mda = args[0]["RecordBatchProxyID"];
  const uint64_t batch_proxy_id = id_mda[0];

  // static_pointer_cast performs no runtime type check; the ID is trusted to
  // refer to a RecordBatch proxy.
  auto batch_proxy = std::static_pointer_cast<RecordBatchProxy>(
      libmexclass::proxy::ProxyManager::getProxy(batch_proxy_id));
  auto batch = batch_proxy->unwrap();

  MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(writer->WriteRecordBatch(*batch), context,
                                      error::IPC_RECORD_BATCH_WRITE_FAILED);
}
85+
86+
// Writes an Arrow table to the file.
//
// context.inputs[0] is read as a struct array whose first element has a
// uint64 "TableProxyID" field. The proxy registered under that ID is treated
// as a Table proxy and its table is passed to the wrapped writer. The write
// is wrapped in MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT with
// error::IPC_RECORD_BATCH_WRITE_FAILED (the same ID writeRecordBatch uses).
void RecordBatchFileWriter::writeTable(libmexclass::proxy::method::Context& context) {
  namespace mda = ::matlab::data;
  using TableProxy = ::arrow::matlab::tabular::proxy::Table;

  mda::StructArray args = context.inputs[0];
  const mda::TypedArray<uint64_t> id_mda = args[0]["TableProxyID"];
  const uint64_t proxy_id = id_mda[0];

  // static_pointer_cast performs no runtime type check; the ID is trusted to
  // refer to a Table proxy.
  auto tbl_proxy = std::static_pointer_cast<TableProxy>(
      libmexclass::proxy::ProxyManager::getProxy(proxy_id));
  auto tbl = tbl_proxy->unwrap();

  MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(writer->WriteTable(*tbl), context,
                                      error::IPC_RECORD_BATCH_WRITE_FAILED);
}
101+
102+
// Closes the wrapped writer. The resulting status is checked by
// MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT using
// error::IPC_RECORD_BATCH_WRITE_CLOSE_FAILED.
void RecordBatchFileWriter::close(libmexclass::proxy::method::Context& context) {
  const auto close_status = writer->Close();
  MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(close_status, context,
                                      error::IPC_RECORD_BATCH_WRITE_CLOSE_FAILED);
}
106+
107+
} // namespace arrow::matlab::io::ipc::proxy
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// Include guard: without it, a translation unit that reaches this header
// more than once would see RecordBatchFileWriter defined twice.
#pragma once

#include "arrow/ipc/writer.h"
#include "libmexclass/proxy/Proxy.h"

namespace arrow::matlab::io::ipc::proxy {

// Proxy class that wraps an arrow::ipc::RecordBatchWriter.
//
// Instances are created through make(); the protected methods below are the
// ones the constructor registers with REGISTER_METHOD.
class RecordBatchFileWriter : public libmexclass::proxy::Proxy {
 public:
  // Takes shared ownership of `writer`. Marked explicit so that a shared_ptr
  // is never implicitly converted into a proxy object.
  explicit RecordBatchFileWriter(std::shared_ptr<arrow::ipc::RecordBatchWriter> writer);

  ~RecordBatchFileWriter() = default;

  // Factory that builds a proxy from the constructor arguments supplied by
  // the caller (a struct carrying "Filename" and "SchemaProxyID").
  static libmexclass::proxy::MakeResult make(
      const libmexclass::proxy::FunctionArguments& constructor_arguments);

 protected:
  // The wrapped Arrow IPC writer.
  std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;

  // Writes the record batch identified by "RecordBatchProxyID" in
  // context.inputs[0].
  void writeRecordBatch(libmexclass::proxy::method::Context& context);

  // Writes the table identified by "TableProxyID" in context.inputs[0].
  void writeTable(libmexclass::proxy::method::Context& context);

  // Closes the wrapped writer.
  void close(libmexclass::proxy::method::Context& context);
};

}  // namespace arrow::matlab::io::ipc::proxy

matlab/src/cpp/arrow/matlab/proxy/factory.cc

+4
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
#include "arrow/matlab/io/csv/proxy/table_writer.h"
3535
#include "arrow/matlab/io/feather/proxy/reader.h"
3636
#include "arrow/matlab/io/feather/proxy/writer.h"
37+
#include "arrow/matlab/io/ipc/proxy/record_batch_file_reader.h"
38+
#include "arrow/matlab/io/ipc/proxy/record_batch_file_writer.h"
3739
#include "arrow/matlab/tabular/proxy/record_batch.h"
3840
#include "arrow/matlab/tabular/proxy/schema.h"
3941
#include "arrow/matlab/tabular/proxy/table.h"
@@ -107,6 +109,8 @@ libmexclass::proxy::MakeResult Factory::make_proxy(
107109
REGISTER_PROXY(arrow.c.proxy.ArrayImporter , arrow::matlab::c::proxy::ArrayImporter);
108110
REGISTER_PROXY(arrow.c.proxy.Schema , arrow::matlab::c::proxy::Schema);
109111
REGISTER_PROXY(arrow.c.proxy.RecordBatchImporter , arrow::matlab::c::proxy::RecordBatchImporter);
112+
REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchFileReader , arrow::matlab::io::ipc::proxy::RecordBatchFileReader);
113+
REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchFileWriter , arrow::matlab::io::ipc::proxy::RecordBatchFileWriter);
110114
// clang-format on
111115

112116
return libmexclass::error::Error{error::UNKNOWN_PROXY_ERROR_ID,

0 commit comments

Comments
 (0)