Skip to content

Commit e4393e5

Browse files
committed
fix pipeline
1 parent b468b7f commit e4393e5

12 files changed

Lines changed: 249 additions & 198 deletions

File tree

be/src/exec/scan/file_scanner.cpp

Lines changed: 16 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
#include "format/table/paimon_jni_reader.h"
7676
#include "format/table/paimon_predicate_converter.h"
7777
#include "format/table/paimon_reader.h"
78+
#include "format/table/partition_column_filler.h"
7879
#include "format/table/remote_doris_reader.h"
7980
#include "format/table/transactional_hive_reader.h"
8081
#include "format/table/trino_connector_jni_reader.h"
@@ -272,33 +273,12 @@ Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_al
272273
for (auto const& partition_col_desc : _partition_col_descs) {
273274
const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
274275
auto data_type = partition_slot_desc->get_data_type_ptr();
275-
auto test_serde = data_type->get_serde();
276276
auto partition_value_column = data_type->create_column();
277-
auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
278-
Slice slice(partition_value.data(), partition_value.size());
279-
uint64_t num_deserialized = 0;
280-
DataTypeSerDe::FormatOptions options {};
281-
if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
282-
// for iceberg/paimon table
283-
// NOTICE: column is always be nullable for iceberg/paimon table now
284-
DCHECK(data_type->is_nullable());
285-
test_serde = test_serde->get_nested_serdes()[0];
286-
auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
287-
if (_partition_value_is_null[partition_slot_desc->col_name()]) {
288-
null_column->insert_many_defaults(partition_value_column_size);
289-
} else {
290-
// If the partition value is not null, we set null map to 0 and deserialize it normally.
291-
null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
292-
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
293-
null_column->get_nested_column(), slice, partition_value_column_size,
294-
&num_deserialized, options));
295-
}
296-
} else {
297-
// for hive/hudi table, the null value is set as "\\N"
298-
// TODO: this will be unified as iceberg/paimon table in the future
299-
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
300-
*col_ptr, slice, partition_value_column_size, &num_deserialized, options));
301-
}
277+
auto null_it = _partition_value_is_null.find(partition_slot_desc->col_name());
278+
DORIS_CHECK(null_it != _partition_value_is_null.end());
279+
RETURN_IF_ERROR(fill_partition_column_from_path_value(
280+
*partition_value_column, *partition_slot_desc, partition_value,
281+
partition_value_column_size, null_it->second));
302282

303283
partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
304284
}
@@ -1547,6 +1527,10 @@ Status FileScanner::_generate_partition_columns() {
15471527
if (!range.__isset.columns_from_path_keys) {
15481528
return Status::OK();
15491529
}
1530+
DORIS_CHECK(range.__isset.columns_from_path);
1531+
DORIS_CHECK(range.__isset.columns_from_path_is_null);
1532+
DORIS_CHECK(range.columns_from_path.size() == range.columns_from_path_keys.size());
1533+
DORIS_CHECK(range.columns_from_path_is_null.size() == range.columns_from_path_keys.size());
15501534

15511535
std::unordered_map<std::string, int> partition_name_to_key_index;
15521536
int index = 0;
@@ -1561,16 +1545,12 @@ Status FileScanner::_generate_partition_columns() {
15611545
}
15621546
auto pit = partition_name_to_key_index.find(col_desc.name);
15631547
if (pit != partition_name_to_key_index.end()) {
1564-
int values_index = pit->second;
1565-
if (range.__isset.columns_from_path && values_index < range.columns_from_path.size()) {
1566-
_partition_col_descs.emplace(
1567-
col_desc.name,
1568-
std::make_tuple(range.columns_from_path[values_index], col_desc.slot_desc));
1569-
if (range.__isset.columns_from_path_is_null) {
1570-
_partition_value_is_null.emplace(col_desc.name,
1571-
range.columns_from_path_is_null[values_index]);
1572-
}
1573-
}
1548+
auto values_index = static_cast<size_t>(pit->second);
1549+
_partition_col_descs.emplace(
1550+
col_desc.name,
1551+
std::make_tuple(range.columns_from_path[values_index], col_desc.slot_desc));
1552+
_partition_value_is_null.emplace(col_desc.name,
1553+
range.columns_from_path_is_null[values_index]);
15741554
}
15751555
}
15761556
return Status::OK();

be/src/format/jni/jni_reader.cpp

Lines changed: 14 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "core/block/block.h"
2929
#include "core/types.h"
3030
#include "format/jni/jni_data_bridge.h"
31+
#include "format/table/partition_column_filler.h"
3132
#include "runtime/descriptors.h"
3233
#include "runtime/runtime_state.h"
3334
#include "util/jni-util.h"
@@ -81,6 +82,12 @@ Status JniReader::on_before_init_reader(ReaderInitContext* ctx) {
8182
return Status::OK();
8283
}
8384

85+
DORIS_CHECK(ctx->range->__isset.columns_from_path);
86+
DORIS_CHECK(ctx->range->__isset.columns_from_path_is_null);
87+
DORIS_CHECK(ctx->range->columns_from_path.size() == ctx->range->columns_from_path_keys.size());
88+
DORIS_CHECK(ctx->range->columns_from_path_is_null.size() ==
89+
ctx->range->columns_from_path_keys.size());
90+
8491
std::unordered_map<std::string, const SlotDescriptor*> name_to_slot;
8592
for (auto* slot : ctx->tuple_descriptor->slots()) {
8693
name_to_slot.emplace(slot->col_name(), slot);
@@ -91,15 +98,9 @@ Status JniReader::on_before_init_reader(ReaderInitContext* ctx) {
9198
if (slot_it == name_to_slot.end()) {
9299
continue;
93100
}
94-
std::string value;
95-
if (ctx->range->__isset.columns_from_path && i < ctx->range->columns_from_path.size()) {
96-
value = ctx->range->columns_from_path[i];
97-
}
98-
_partition_values.emplace(key, std::make_tuple(std::move(value), slot_it->second));
99-
if (ctx->range->__isset.columns_from_path_is_null &&
100-
i < ctx->range->columns_from_path_is_null.size()) {
101-
_partition_value_is_null.emplace(key, ctx->range->columns_from_path_is_null[i]);
102-
}
101+
_partition_values.emplace(
102+
key, std::make_tuple(ctx->range->columns_from_path[i], slot_it->second));
103+
_partition_value_is_null.emplace(key, ctx->range->columns_from_path_is_null[i]);
103104
}
104105
return Status::OK();
105106
}
@@ -329,7 +330,6 @@ Status JniReader::_fill_partition_columns(Block* block, size_t num_rows) {
329330
col_map = &local_name_to_idx;
330331
}
331332

332-
DataTypeSerDe::FormatOptions text_format_options;
333333
for (const auto& desc : *_column_descs) {
334334
if (desc.category != ColumnCategory::PARTITION_KEY) {
335335
continue;
@@ -346,34 +346,10 @@ Status JniReader::_fill_partition_columns(Block* block, size_t num_rows) {
346346

347347
auto mutable_column = block->get_by_position(col_it->second).column->assume_mutable();
348348
const auto& [value, slot_desc] = value_it->second;
349-
auto text_serde = slot_desc->get_data_type_ptr()->get_serde();
350-
uint64_t num_deserialized = 0;
351-
bool is_null = _partition_value_is_null.contains(desc.name) &&
352-
_partition_value_is_null.at(desc.name);
353-
if (is_null) {
354-
DCHECK(slot_desc->get_data_type_ptr()->is_nullable());
355-
auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
356-
nullable_column->insert_many_defaults(num_rows);
357-
continue;
358-
}
359-
360-
Slice slice(value.data(), value.size());
361-
if (slot_desc->get_data_type_ptr()->is_nullable()) {
362-
auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
363-
auto nested_serde = text_serde->get_nested_serdes()[0];
364-
nullable_column->get_null_map_column().insert_many_vals(0, num_rows);
365-
RETURN_IF_ERROR(nested_serde->deserialize_column_from_fixed_json(
366-
nullable_column->get_nested_column(), slice, num_rows, &num_deserialized,
367-
text_format_options));
368-
} else {
369-
RETURN_IF_ERROR(text_serde->deserialize_column_from_fixed_json(
370-
*mutable_column, slice, num_rows, &num_deserialized, text_format_options));
371-
}
372-
if (num_deserialized != num_rows) {
373-
return Status::InternalError(
374-
"Failed to fill partition column: {}={}. Expected rows: {}, actual: {}",
375-
slot_desc->col_name(), value, num_rows, num_deserialized);
376-
}
349+
auto null_it = _partition_value_is_null.find(desc.name);
350+
DORIS_CHECK(null_it != _partition_value_is_null.end());
351+
RETURN_IF_ERROR(fill_partition_column_from_path_value(*mutable_column, *slot_desc, value,
352+
num_rows, null_it->second));
377353
}
378354
return Status::OK();
379355
}

be/src/format/table/paimon_cpp_reader.cpp

Lines changed: 14 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "core/block/block.h"
3030
#include "core/block/column_with_type_and_name.h"
3131
#include "format/table/paimon_doris_file_system.h"
32+
#include "format/table/partition_column_filler.h"
3233
#include "paimon/defs.h"
3334
#include "paimon/memory/memory_pool.h"
3435
#include "paimon/read_context.h"
@@ -73,6 +74,12 @@ Status PaimonCppReader::on_before_init_reader(ReaderInitContext* ctx) {
7374
return Status::OK();
7475
}
7576

77+
DORIS_CHECK(ctx->range->__isset.columns_from_path);
78+
DORIS_CHECK(ctx->range->__isset.columns_from_path_is_null);
79+
DORIS_CHECK(ctx->range->columns_from_path.size() == ctx->range->columns_from_path_keys.size());
80+
DORIS_CHECK(ctx->range->columns_from_path_is_null.size() ==
81+
ctx->range->columns_from_path_keys.size());
82+
7683
std::unordered_map<std::string, const SlotDescriptor*> name_to_slot;
7784
for (auto* slot : ctx->tuple_descriptor->slots()) {
7885
name_to_slot.emplace(slot->col_name(), slot);
@@ -83,15 +90,9 @@ Status PaimonCppReader::on_before_init_reader(ReaderInitContext* ctx) {
8390
if (slot_it == name_to_slot.end()) {
8491
continue;
8592
}
86-
std::string value;
87-
if (ctx->range->__isset.columns_from_path && i < ctx->range->columns_from_path.size()) {
88-
value = ctx->range->columns_from_path[i];
89-
}
90-
_partition_values.emplace(key, std::make_tuple(std::move(value), slot_it->second));
91-
if (ctx->range->__isset.columns_from_path_is_null &&
92-
i < ctx->range->columns_from_path_is_null.size()) {
93-
_partition_value_is_null.emplace(key, ctx->range->columns_from_path_is_null[i]);
94-
}
93+
_partition_values.emplace(
94+
key, std::make_tuple(ctx->range->columns_from_path[i], slot_it->second));
95+
_partition_value_is_null.emplace(key, ctx->range->columns_from_path_is_null[i]);
9596
}
9697
return Status::OK();
9798
}
@@ -198,7 +199,6 @@ Status PaimonCppReader::_fill_partition_columns(Block* block, size_t num_rows) {
198199
_col_name_to_block_idx = block->get_name_to_pos_map();
199200
}
200201

201-
DataTypeSerDe::FormatOptions text_format_options;
202202
for (const auto& desc : *_column_descs) {
203203
if (desc.category != ColumnCategory::PARTITION_KEY) {
204204
continue;
@@ -215,34 +215,10 @@ Status PaimonCppReader::_fill_partition_columns(Block* block, size_t num_rows) {
215215

216216
auto mutable_column = block->get_by_position(col_it->second).column->assume_mutable();
217217
const auto& [value, slot_desc] = value_it->second;
218-
auto text_serde = slot_desc->get_data_type_ptr()->get_serde();
219-
uint64_t num_deserialized = 0;
220-
bool is_null = _partition_value_is_null.contains(desc.name) &&
221-
_partition_value_is_null.at(desc.name);
222-
if (is_null) {
223-
DCHECK(slot_desc->get_data_type_ptr()->is_nullable());
224-
auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
225-
nullable_column->insert_many_defaults(num_rows);
226-
continue;
227-
}
228-
229-
Slice slice(value.data(), value.size());
230-
if (slot_desc->get_data_type_ptr()->is_nullable()) {
231-
auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
232-
auto nested_serde = text_serde->get_nested_serdes()[0];
233-
nullable_column->get_null_map_column().insert_many_vals(0, num_rows);
234-
RETURN_IF_ERROR(nested_serde->deserialize_column_from_fixed_json(
235-
nullable_column->get_nested_column(), slice, num_rows, &num_deserialized,
236-
text_format_options));
237-
} else {
238-
RETURN_IF_ERROR(text_serde->deserialize_column_from_fixed_json(
239-
*mutable_column, slice, num_rows, &num_deserialized, text_format_options));
240-
}
241-
if (num_deserialized != num_rows) {
242-
return Status::InternalError(
243-
"Failed to fill partition column: {}={}. Expected rows: {}, actual: {}",
244-
slot_desc->col_name(), value, num_rows, num_deserialized);
245-
}
218+
auto null_it = _partition_value_is_null.find(desc.name);
219+
DORIS_CHECK(null_it != _partition_value_is_null.end());
220+
RETURN_IF_ERROR(fill_partition_column_from_path_value(*mutable_column, *slot_desc, value,
221+
num_rows, null_it->second));
246222
}
247223
return Status::OK();
248224
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <glog/logging.h>
21+
22+
#include <string>
23+
24+
#include "common/status.h"
25+
#include "core/assert_cast.h"
26+
#include "core/column/column_nullable.h"
27+
#include "core/data_type_serde/data_type_serde.h"
28+
#include "runtime/descriptors.h"
29+
#include "util/slice.h"
30+
31+
namespace doris {
32+
33+
inline Status fill_partition_column_from_path_value(
34+
IColumn& column, const SlotDescriptor& slot_desc, const std::string& value, size_t rows,
35+
bool explicit_null_marker, DataTypeSerDe::FormatOptions text_format_options = {}) {
36+
auto data_type = slot_desc.get_data_type_ptr();
37+
auto text_serde = data_type->get_serde();
38+
uint64_t num_deserialized = 0;
39+
40+
if (explicit_null_marker) {
41+
DCHECK(data_type->is_nullable());
42+
auto* nullable_column = assert_cast<ColumnNullable*>(&column);
43+
nullable_column->insert_many_defaults(rows);
44+
return Status::OK();
45+
}
46+
47+
Slice slice(value.data(), value.size());
48+
Status status = Status::OK();
49+
if (data_type->is_nullable()) {
50+
auto* nullable_column = assert_cast<ColumnNullable*>(&column);
51+
auto nested_serde = text_serde->get_nested_serdes()[0];
52+
nullable_column->get_null_map_column().insert_many_vals(0, rows);
53+
status = nested_serde->deserialize_column_from_fixed_json(
54+
nullable_column->get_nested_column(), slice, rows, &num_deserialized,
55+
text_format_options);
56+
} else {
57+
status = text_serde->deserialize_column_from_fixed_json(
58+
column, slice, rows, &num_deserialized, text_format_options);
59+
}
60+
if (!status.ok()) {
61+
return Status::InternalError("Failed to fill partition column: {}={}", slot_desc.col_name(),
62+
value);
63+
}
64+
if (num_deserialized != rows) {
65+
return Status::InternalError(
66+
"Failed to fill partition column: {}={}. Expected rows: {}, actual: {}",
67+
slot_desc.col_name(), value, rows, num_deserialized);
68+
}
69+
return Status::OK();
70+
}
71+
72+
} // namespace doris

be/src/format/table/table_format_reader.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ Status TableFormatReader::_extract_partition_values(
3636
partition_value_is_null->clear();
3737
}
3838
if (range.__isset.columns_from_path_keys && tuple_descriptor != nullptr) {
39+
DORIS_CHECK(range.__isset.columns_from_path);
40+
DORIS_CHECK(range.__isset.columns_from_path_is_null);
41+
DORIS_CHECK(range.columns_from_path.size() == range.columns_from_path_keys.size());
42+
DORIS_CHECK(range.columns_from_path_is_null.size() == range.columns_from_path_keys.size());
3943
std::unordered_map<std::string, const SlotDescriptor*> name_to_slot;
4044
for (auto* slot : tuple_descriptor->slots()) {
4145
name_to_slot[slot->col_name()] = slot;
@@ -46,8 +50,7 @@ Status TableFormatReader::_extract_partition_values(
4650
auto slot_it = name_to_slot.find(key);
4751
if (slot_it != name_to_slot.end()) {
4852
partition_values.emplace(key, std::make_tuple(value, slot_it->second));
49-
if (partition_value_is_null != nullptr && range.__isset.columns_from_path_is_null &&
50-
i < range.columns_from_path_is_null.size()) {
53+
if (partition_value_is_null != nullptr) {
5154
partition_value_is_null->emplace(key, range.columns_from_path_is_null[i]);
5255
}
5356
}

0 commit comments

Comments
 (0)