Skip to content

Commit a55b218

Browse files
authored
fix: lazy create merge function in merge file split read (#58)
1 parent 302fabd commit a55b218

File tree

3 files changed

+106
-24
lines changed

3 files changed

+106
-24
lines changed

src/paimon/core/operation/merge_file_split_read.cpp

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,6 @@ Result<std::unique_ptr<MergeFileSplitRead>> MergeFileSplitRead::Create(
9595
PAIMON_RETURN_NOT_OK(GenerateKeyValueReadSchema(
9696
*table_schema, core_options, context->GetReadSchema(), &value_schema, &read_schema,
9797
&key_comparator, &interval_partition_comparator, &user_defined_seq_comparator));
98-
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<MergeFunction> merge_function,
99-
PrimaryKeyTableUtils::CreateMergeFunction(
100-
value_schema, table_schema->PrimaryKeys(), core_options));
101-
if (core_options.NeedLookup() && core_options.GetMergeEngine() != MergeEngine::FIRST_ROW) {
102-
// don't wrap first row, it is already OK
103-
merge_function = std::make_unique<LookupMergeFunction>(std::move(merge_function));
104-
}
105-
auto merge_function_wrapper =
106-
std::make_shared<ReducerMergeFunctionWrapper>(std::move(merge_function));
10798

10899
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Predicate> predicate_for_keys,
109100
GenerateKeyPredicates(context->GetPredicate(), *table_schema));
@@ -120,7 +111,7 @@ Result<std::unique_ptr<MergeFileSplitRead>> MergeFileSplitRead::Create(
120111
path_factory, context,
121112
std::make_unique<SchemaManager>(core_options.GetFileSystem(), context->GetPath(),
122113
context->GetCoreOptions().GetBranch()),
123-
key_arity, value_schema, read_schema, projection, merge_function_wrapper, key_comparator,
114+
key_arity, value_schema, read_schema, projection, key_comparator,
124115
interval_partition_comparator, user_defined_seq_comparator, predicate_for_keys, memory_pool,
125116
executor));
126117
}
@@ -144,11 +135,34 @@ Result<std::unique_ptr<BatchReader>> MergeFileSplitRead::CreateReader(
144135
CreateNoMergeReader(data_split, /*only_filter_key=*/data_split->IsStreaming(),
145136
data_file_path_factory));
146137
} else {
138+
if (!merge_function_wrapper_) {
139+
// In deletion vector mode, streaming data split or postpone bucket mode, we don't need
140+
// to use merge function. Even if the merge function in CoreOptions is not supported, it
141+
// should not affect data reading. So we create merge_function_wrapper_ lazily, to avoid
142+
// raise errors when creating MergeFileSplitRead at the beginning.
143+
PAIMON_ASSIGN_OR_RAISE(
144+
merge_function_wrapper_,
145+
CreateMergeFunctionWrapper(options_, context_->GetTableSchema(), value_schema_));
146+
}
147147
PAIMON_ASSIGN_OR_RAISE(batch_reader, CreateMergeReader(data_split, data_file_path_factory));
148148
}
149149
return std::make_unique<CompleteRowKindBatchReader>(std::move(batch_reader), pool_);
150150
}
151151

152+
Result<std::shared_ptr<MergeFunctionWrapper<KeyValue>>>
153+
MergeFileSplitRead::CreateMergeFunctionWrapper(const CoreOptions& core_options,
154+
const std::shared_ptr<TableSchema>& table_schema,
155+
const std::shared_ptr<arrow::Schema>& value_schema) {
156+
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<MergeFunction> merge_function,
157+
PrimaryKeyTableUtils::CreateMergeFunction(
158+
value_schema, table_schema->PrimaryKeys(), core_options));
159+
if (core_options.NeedLookup() && core_options.GetMergeEngine() != MergeEngine::FIRST_ROW) {
160+
// don't wrap first row, it is already OK
161+
merge_function = std::make_unique<LookupMergeFunction>(std::move(merge_function));
162+
}
163+
return std::make_shared<ReducerMergeFunctionWrapper>(std::move(merge_function));
164+
}
165+
152166
Result<std::unique_ptr<BatchReader>> MergeFileSplitRead::ApplyIndexAndDvReaderIfNeeded(
153167
std::unique_ptr<FileBatchReader>&& file_reader, const std::shared_ptr<DataFileMeta>& file,
154168
const std::shared_ptr<arrow::Schema>& data_schema,
@@ -223,7 +237,6 @@ MergeFileSplitRead::MergeFileSplitRead(
223237
std::unique_ptr<SchemaManager>&& schema_manager, int32_t key_arity,
224238
const std::shared_ptr<arrow::Schema>& value_schema,
225239
const std::shared_ptr<arrow::Schema>& read_schema, const std::vector<int32_t>& projection,
226-
const std::shared_ptr<MergeFunctionWrapper<KeyValue>>& merge_function_wrapper,
227240
const std::shared_ptr<FieldsComparator>& key_comparator,
228241
const std::shared_ptr<FieldsComparator>& interval_partition_comparator,
229242
const std::shared_ptr<FieldsComparator>& user_defined_seq_comparator,
@@ -234,7 +247,6 @@ MergeFileSplitRead::MergeFileSplitRead(
234247
value_schema_(value_schema),
235248
read_schema_(read_schema),
236249
projection_(projection),
237-
merge_function_wrapper_(merge_function_wrapper),
238250
key_comparator_(key_comparator),
239251
interval_partition_comparator_(interval_partition_comparator),
240252
user_defined_seq_comparator_(user_defined_seq_comparator),

src/paimon/core/operation/merge_file_split_read.h

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -117,20 +117,24 @@ class MergeFileSplitRead : public AbstractSplitRead {
117117
Result<std::unique_ptr<SortMergeReader>> CreateSortMergeReader(
118118
std::vector<std::unique_ptr<KeyValueRecordReader>>&& record_readers) const;
119119

120-
MergeFileSplitRead(
121-
const std::shared_ptr<FileStorePathFactory>& path_factory,
122-
const std::shared_ptr<InternalReadContext>& context,
123-
std::unique_ptr<SchemaManager>&& schema_manager, int32_t key_arity,
124-
const std::shared_ptr<arrow::Schema>& value_schema,
125-
const std::shared_ptr<arrow::Schema>& read_schema, const std::vector<int32_t>& projection,
126-
const std::shared_ptr<MergeFunctionWrapper<KeyValue>>& merge_function_wrapper,
127-
const std::shared_ptr<FieldsComparator>& key_comparator,
128-
const std::shared_ptr<FieldsComparator>& interval_partition_comparator,
129-
const std::shared_ptr<FieldsComparator>& user_defined_seq_comparator,
130-
const std::shared_ptr<Predicate>& predicate_for_keys,
131-
const std::shared_ptr<MemoryPool>& memory_pool, const std::shared_ptr<Executor>& executor);
120+
MergeFileSplitRead(const std::shared_ptr<FileStorePathFactory>& path_factory,
121+
const std::shared_ptr<InternalReadContext>& context,
122+
std::unique_ptr<SchemaManager>&& schema_manager, int32_t key_arity,
123+
const std::shared_ptr<arrow::Schema>& value_schema,
124+
const std::shared_ptr<arrow::Schema>& read_schema,
125+
const std::vector<int32_t>& projection,
126+
const std::shared_ptr<FieldsComparator>& key_comparator,
127+
const std::shared_ptr<FieldsComparator>& interval_partition_comparator,
128+
const std::shared_ptr<FieldsComparator>& user_defined_seq_comparator,
129+
const std::shared_ptr<Predicate>& predicate_for_keys,
130+
const std::shared_ptr<MemoryPool>& memory_pool,
131+
const std::shared_ptr<Executor>& executor);
132132

133133
private:
134+
static Result<std::shared_ptr<MergeFunctionWrapper<KeyValue>>> CreateMergeFunctionWrapper(
135+
const CoreOptions& core_options, const std::shared_ptr<TableSchema>& table_schema,
136+
const std::shared_ptr<arrow::Schema>& value_schema);
137+
134138
static Status GenerateKeyValueReadSchema(
135139
const TableSchema& table_schema, const CoreOptions& options,
136140
const std::shared_ptr<arrow::Schema>& raw_read_schema,

test/inte/scan_and_read_inte_test.cpp

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1228,6 +1228,72 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot3WithPredicate) {
12281228
ASSERT_TRUE(result_plan->Splits().empty());
12291229
}
12301230

1231+
TEST_P(ScanAndReadInteTest, TestWithPKWithDvWithInvalidAggregateBatchScanSnapshot3) {
1232+
auto [file_format, enable_prefetch] = GetParam();
1233+
std::string table_path = paimon::test::GetDataDir() + file_format +
1234+
"/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/";
1235+
1236+
ScanContextBuilder scan_context_builder(table_path);
1237+
scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "3")
1238+
.AddOption(Options::MERGE_ENGINE, "aggregation")
1239+
.AddOption("fields.f3.aggregate-function", "rbm32");
1240+
scan_context_builder.SetBucketFilter(1).SetPartitionFilter({{{"f1", "10"}}});
1241+
ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish());
1242+
ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context)));
1243+
1244+
ReadContextBuilder read_context_builder(table_path);
1245+
AddReadOptionsForPrefetch(&read_context_builder);
1246+
read_context_builder.AddOption(Options::MERGE_ENGINE, "aggregation")
1247+
.AddOption("fields.f3.aggregate-function", "rbm32");
1248+
ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish());
1249+
ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context)));
1250+
1251+
ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan());
1252+
ASSERT_EQ(result_plan->SnapshotId().value(), 3);
1253+
ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits()));
1254+
ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get()));
1255+
1256+
// check result
1257+
auto expected = std::make_shared<arrow::ChunkedArray>(
1258+
arrow::ipc::internal::json::ArrayFromJSON(arrow_data_type_, R"([
1259+
[0, "Alex", 10, 0, 16.1],
1260+
[0, "Bob", 10, 0, 12.1],
1261+
[0, "David", 10, 0, 17.1],
1262+
[0, "Emily", 10, 0, 13.1],
1263+
[0, "Tony", 10, 0, 14.1]
1264+
])")
1265+
.ValueOrDie());
1266+
ASSERT_TRUE(expected);
1267+
ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString();
1268+
}
1269+
1270+
TEST_P(ScanAndReadInteTest, TestWithPKWithMorWithInvalidAggregateBatchScanSnapshot3) {
1271+
auto [file_format, enable_prefetch] = GetParam();
1272+
std::string table_path = paimon::test::GetDataDir() + file_format +
1273+
"/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/";
1274+
1275+
ScanContextBuilder scan_context_builder(table_path);
1276+
scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "3")
1277+
.AddOption(Options::MERGE_ENGINE, "aggregation")
1278+
.AddOption("fields.f3.aggregate-function", "rbm32");
1279+
scan_context_builder.SetBucketFilter(1).SetPartitionFilter({{{"f1", "10"}}});
1280+
ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish());
1281+
ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context)));
1282+
1283+
ReadContextBuilder read_context_builder(table_path);
1284+
AddReadOptionsForPrefetch(&read_context_builder);
1285+
read_context_builder.AddOption(Options::MERGE_ENGINE, "aggregation")
1286+
.AddOption("fields.f3.aggregate-function", "rbm32");
1287+
ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish());
1288+
ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context)));
1289+
1290+
ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan());
1291+
ASSERT_EQ(result_plan->SnapshotId().value(), 3);
1292+
ASSERT_NOK_WITH_MSG(
1293+
table_read->CreateReader(result_plan->Splits()),
1294+
"Use unsupported aggregation rbm32 or spell aggregate function incorrectly");
1295+
}
1296+
12311297
TEST_P(ScanAndReadInteTest, TestWithPKWithAggregateBatchScanSnapshot3WithPredicate) {
12321298
auto [file_format, enable_prefetch] = GetParam();
12331299
std::string table_path = paimon::test::GetDataDir() + file_format +

0 commit comments

Comments
 (0)