Skip to content

Commit b878de6

Browse files
authored
feat(core): pk table scan support data manifest value_stats_cols filter (#157)
1 parent 11ec51d commit b878de6

File tree

7 files changed

+166
-41
lines changed

7 files changed

+166
-41
lines changed

src/paimon/core/operation/append_only_file_store_scan.cpp

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -157,30 +157,4 @@ Result<bool> AppendOnlyFileStoreScan::TestFileIndex(
157157
return index_result->IsRemain();
158158
}
159159

160-
Result<std::shared_ptr<Predicate>> AppendOnlyFileStoreScan::ReconstructPredicateWithNonCastedFields(
161-
const std::shared_ptr<Predicate>& predicate,
162-
const std::shared_ptr<SimpleStatsEvolution>& evolution) {
163-
const auto& id_to_data_fields = evolution->GetFieldIdToDataField();
164-
const auto& name_to_table_fields = evolution->GetFieldNameToTableField();
165-
166-
std::set<std::string> field_names_in_predicate;
167-
PAIMON_RETURN_NOT_OK(PredicateUtils::GetAllNames(predicate, &field_names_in_predicate));
168-
std::set<std::string> excluded_field_names;
169-
for (const auto& field_name : field_names_in_predicate) {
170-
auto table_iter = name_to_table_fields.find(field_name);
171-
if (table_iter == name_to_table_fields.end()) {
172-
return Status::Invalid(
173-
fmt::format("field {} in predicate is not included in table schema", field_name));
174-
}
175-
auto data_iter = id_to_data_fields.find(table_iter->second.Id());
176-
if (data_iter != id_to_data_fields.end()) {
177-
// TODO(liancheng.lsz): isnull/notnull predicates trimming might not be required
178-
if (!data_iter->second.second.Type()->Equals(table_iter->second.Type())) {
179-
excluded_field_names.insert(field_name);
180-
}
181-
}
182-
}
183-
return PredicateUtils::ExcludePredicateWithFields(predicate, excluded_field_names);
184-
}
185-
186160
} // namespace paimon

src/paimon/core/operation/append_only_file_store_scan.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,6 @@ class AppendOnlyFileStoreScan : public FileStoreScan {
6161
Result<bool> FilterByStats(const ManifestEntry& entry) const override;
6262

6363
private:
64-
// TODO(liancheng.lsz): to be moved in class FileStoreScan
65-
static Result<std::shared_ptr<Predicate>> ReconstructPredicateWithNonCastedFields(
66-
const std::shared_ptr<Predicate>& predicate,
67-
const std::shared_ptr<SimpleStatsEvolution>& evolution);
68-
6964
Result<bool> TestFileIndex(const std::shared_ptr<DataFileMeta>& meta,
7065
const std::shared_ptr<SimpleStatsEvolution>& evolution,
7166
const std::shared_ptr<TableSchema>& data_schema) const;

src/paimon/core/operation/file_store_scan.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <future>
2121
#include <list>
2222
#include <numeric>
23+
#include <set>
2324
#include <unordered_map>
2425
#include <unordered_set>
2526

@@ -28,6 +29,7 @@
2829
#include "paimon/common/data/binary_array.h"
2930
#include "paimon/common/executor/future.h"
3031
#include "paimon/common/predicate/literal_converter.h"
32+
#include "paimon/common/predicate/predicate_utils.h"
3133
#include "paimon/common/types/data_field.h"
3234
#include "paimon/common/utils/field_type_utils.h"
3335
#include "paimon/core/io/data_file_meta.h"
@@ -39,6 +41,7 @@
3941
#include "paimon/core/operation/metrics/scan_metrics.h"
4042
#include "paimon/core/partition/partition_info.h"
4143
#include "paimon/core/stats/simple_stats.h"
44+
#include "paimon/core/stats/simple_stats_evolution.h"
4245
#include "paimon/core/utils/duration.h"
4346
#include "paimon/core/utils/field_mapping.h"
4447
#include "paimon/core/utils/snapshot_manager.h"
@@ -50,6 +53,32 @@
5053
namespace paimon {
5154
enum class FieldType;
5255

56+
Result<std::shared_ptr<Predicate>> FileStoreScan::ReconstructPredicateWithNonCastedFields(
57+
const std::shared_ptr<Predicate>& predicate,
58+
const std::shared_ptr<SimpleStatsEvolution>& evolution) {
59+
const auto& id_to_data_fields = evolution->GetFieldIdToDataField();
60+
const auto& name_to_table_fields = evolution->GetFieldNameToTableField();
61+
62+
std::set<std::string> field_names_in_predicate;
63+
PAIMON_RETURN_NOT_OK(PredicateUtils::GetAllNames(predicate, &field_names_in_predicate));
64+
std::set<std::string> excluded_field_names;
65+
for (const auto& field_name : field_names_in_predicate) {
66+
auto table_iter = name_to_table_fields.find(field_name);
67+
if (table_iter == name_to_table_fields.end()) {
68+
return Status::Invalid(
69+
fmt::format("field {} in predicate is not included in table schema", field_name));
70+
}
71+
auto data_iter = id_to_data_fields.find(table_iter->second.Id());
72+
if (data_iter != id_to_data_fields.end()) {
73+
// Exclude fields requiring casting to avoid false negatives in stats filtering.
74+
if (!data_iter->second.second.Type()->Equals(table_iter->second.Type())) {
75+
excluded_field_names.insert(field_name);
76+
}
77+
}
78+
}
79+
return PredicateUtils::ExcludePredicateWithFields(predicate, excluded_field_names);
80+
}
81+
5382
std::vector<ManifestEntry> FileStoreScan::RawPlan::Files(const FileKind& kind) {
5483
std::vector<ManifestEntry> entries = Files();
5584
std::vector<ManifestEntry> filtered_entries;

src/paimon/core/operation/file_store_scan.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,10 @@ class ManifestFile;
6262
class ManifestFileMeta;
6363
class ManifestList;
6464
class MemoryPool;
65+
class Predicate;
6566
class ScanFilter;
6667
class SchemaManager;
68+
class SimpleStatsEvolution;
6769
class SnapshotManager;
6870
class TableSchema;
6971

@@ -214,6 +216,12 @@ class FileStoreScan {
214216
const std::shared_ptr<arrow::Schema>& arrow_schema,
215217
const std::shared_ptr<ScanFilter>& scan_filters);
216218

219+
// When schema evolves, predicates might contain fields requiring casting. To avoid false
220+
// negatives when filtering by stats, we exclude those fields from predicate.
221+
static Result<std::shared_ptr<Predicate>> ReconstructPredicateWithNonCastedFields(
222+
const std::shared_ptr<Predicate>& predicate,
223+
const std::shared_ptr<SimpleStatsEvolution>& evolution);
224+
217225
private:
218226
Status ReadManifests(std::optional<Snapshot>* snapshot_ptr,
219227
std::vector<ManifestFileMeta>* all_manifests_ptr,

src/paimon/core/operation/key_value_file_store_scan.cpp

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
#include "paimon/core/operation/key_value_file_store_scan.h"
1818

1919
#include <cstdint>
20+
#include <exception>
2021
#include <map>
2122
#include <optional>
2223
#include <set>
2324
#include <utility>
2425

26+
#include "fmt/format.h"
2527
#include "paimon/common/data/binary_array.h"
2628
#include "paimon/common/data/binary_row.h"
2729
#include "paimon/common/predicate/predicate_filter.h"
@@ -32,6 +34,8 @@
3234
#include "paimon/core/options/merge_engine.h"
3335
#include "paimon/core/schema/table_schema.h"
3436
#include "paimon/core/stats/simple_stats.h"
37+
#include "paimon/core/stats/simple_stats_evolution.h"
38+
#include "paimon/core/stats/simple_stats_evolutions.h"
3539
#include "paimon/predicate/predicate.h"
3640

3741
namespace arrow {
@@ -44,7 +48,6 @@ class ManifestFile;
4448
class ManifestList;
4549
class MemoryPool;
4650
class ScanFilter;
47-
class SchemaManager;
4851
class SnapshotManager;
4952

5053
Result<std::unique_ptr<KeyValueFileStoreScan>> KeyValueFileStoreScan::Create(
@@ -154,15 +157,41 @@ Result<bool> KeyValueFileStoreScan::IsValueFilterEnabled() const {
154157
}
155158

156159
Result<bool> KeyValueFileStoreScan::FilterByValueFilter(const ManifestEntry& entry) const {
157-
if (entry.File()->value_stats_cols != std::nullopt) {
158-
return Status::NotImplemented("do not support value stats cols in DataFileMeta");
160+
if (!value_filter_) {
161+
return true;
159162
}
160163
if (entry.File()->embedded_index != nullptr) {
161164
return Status::NotImplemented("do not support embedded index in DataFileMeta");
162165
}
163-
const auto& stats = entry.File()->value_stats;
164-
return value_filter_->Test(schema_, entry.File()->row_count, stats.MinValues(),
165-
stats.MaxValues(), stats.NullCounts());
166+
167+
const auto& meta = entry.File();
168+
169+
// Primary key table currently does not support schema evolution for value filtering.
170+
// Here we only handle `value_stats_cols` (dense stats) projection.
171+
if (meta->schema_id != table_schema_->Id()) {
172+
return Status::NotImplemented(
173+
"Primary key table does not support schema evolution in FilterByValueFilter");
174+
}
175+
176+
auto evolution = evolutions_->GetOrCreate(table_schema_);
177+
178+
PAIMON_ASSIGN_OR_RAISE(
179+
SimpleStatsEvolution::EvolutionStats new_stats,
180+
evolution->Evolution(meta->value_stats, meta->row_count, meta->value_stats_cols));
181+
182+
try {
183+
PAIMON_ASSIGN_OR_RAISE(
184+
bool predicate_result,
185+
value_filter_->Test(schema_, meta->row_count, *(new_stats.min_values),
186+
*(new_stats.max_values), *(new_stats.null_counts)));
187+
return predicate_result;
188+
} catch (const std::exception& e) {
189+
return Status::Invalid(fmt::format("FilterByValueFilter failed for file {}, with {} error",
190+
meta->file_name, e.what()));
191+
} catch (...) {
192+
return Status::Invalid(fmt::format(
193+
"FilterByValueFilter failed for file {}, with unknown error", meta->file_name));
194+
}
166195
}
167196

168197
bool KeyValueFileStoreScan::NoOverlapping(const std::vector<ManifestEntry>& entries) {
@@ -212,4 +241,17 @@ Result<std::vector<ManifestEntry>> KeyValueFileStoreScan::FilterWholeBucketAllFi
212241
return std::vector<ManifestEntry>();
213242
}
214243

244+
KeyValueFileStoreScan::KeyValueFileStoreScan(
245+
const std::shared_ptr<SnapshotManager>& snapshot_manager,
246+
const std::shared_ptr<SchemaManager>& schema_manager,
247+
const std::shared_ptr<ManifestList>& manifest_list,
248+
const std::shared_ptr<ManifestFile>& manifest_file,
249+
const std::shared_ptr<TableSchema>& table_schema, const std::shared_ptr<arrow::Schema>& schema,
250+
const CoreOptions& core_options, const std::shared_ptr<Executor>& executor,
251+
const std::shared_ptr<MemoryPool>& pool)
252+
: FileStoreScan(snapshot_manager, schema_manager, manifest_list, manifest_file, table_schema,
253+
schema, core_options, executor, pool) {
254+
evolutions_ = std::make_shared<SimpleStatsEvolutions>(table_schema, pool);
255+
}
256+
215257
} // namespace paimon

src/paimon/core/operation/key_value_file_store_scan.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include <string>
2121
#include <vector>
2222

23-
#include "paimon/common/predicate/predicate_utils.h"
2423
#include "paimon/core/manifest/manifest_entry.h"
2524
#include "paimon/core/operation/file_store_scan.h"
2625
#include "paimon/core/table/source/scan_mode.h"
@@ -40,6 +39,7 @@ class MemoryPool;
4039
class PredicateFilter;
4140
class ScanFilter;
4241
class SchemaManager;
42+
class SimpleStatsEvolutions;
4343
class SnapshotManager;
4444
class TableSchema;
4545

@@ -105,13 +105,12 @@ class KeyValueFileStoreScan : public FileStoreScan {
105105
const std::shared_ptr<arrow::Schema>& schema,
106106
const CoreOptions& core_options,
107107
const std::shared_ptr<Executor>& executor,
108-
const std::shared_ptr<MemoryPool>& pool)
109-
: FileStoreScan(snapshot_manager, schema_manager, manifest_list, manifest_file,
110-
table_schema, schema, core_options, executor, pool) {}
108+
const std::shared_ptr<MemoryPool>& pool);
111109

112110
private:
113111
bool value_filter_force_enabled_ = false;
114112
std::shared_ptr<PredicateFilter> key_filter_;
115113
std::shared_ptr<PredicateFilter> value_filter_;
114+
std::shared_ptr<SimpleStatsEvolutions> evolutions_;
116115
};
117116
} // namespace paimon

src/paimon/core/operation/key_value_file_store_scan_test.cpp

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include "paimon/predicate/literal.h"
4848
#include "paimon/predicate/predicate_builder.h"
4949
#include "paimon/scan_context.h"
50+
#include "paimon/testing/utils/binary_row_generator.h"
5051
#include "paimon/testing/utils/testharness.h"
5152

5253
namespace arrow {
@@ -323,4 +324,81 @@ TEST_F(KeyValueFileStoreScanTest, TestNoOverlapping) {
323324
ASSERT_FALSE(KeyValueFileStoreScan::NoOverlapping(generate_manifest_entries({0, 1, 1})));
324325
ASSERT_FALSE(KeyValueFileStoreScan::NoOverlapping(generate_manifest_entries({2, 1, 1})));
325326
}
327+
328+
TEST_F(KeyValueFileStoreScanTest, TestFilterByValueFilterWithValueStatsCols) {
329+
std::string table_path =
330+
paimon::test::GetDataDir() + "orc/pk_table_with_mor.db/pk_table_with_mor";
331+
std::vector<std::map<std::string, std::string>> partition_filters = {};
332+
333+
// `v0` is at index 6 in schema-0 of pk_table_with_mor.
334+
auto greater_than = PredicateBuilder::GreaterThan(/*field_index=*/6, /*field_name=*/"v0",
335+
FieldType::DOUBLE, Literal(30.1));
336+
auto scan_filter = std::make_shared<ScanFilter>(/*predicate=*/greater_than,
337+
/*partition_filters=*/partition_filters,
338+
/*bucket_filter=*/0,
339+
/*vector_search=*/nullptr);
340+
ASSERT_OK_AND_ASSIGN(std::unique_ptr<KeyValueFileStoreScan> scan,
341+
CreateFileStoreScan(table_path, scan_filter,
342+
/*table_schema_id=*/0, /*snapshot_id=*/1));
343+
scan->EnableValueFilter();
344+
345+
// Build dense stats for only one column `v0`.
346+
auto pool = GetDefaultPool();
347+
SimpleStats value_stats = BinaryRowGenerator::GenerateStats(
348+
/*min=*/{10.0}, /*max=*/{20.0}, /*null=*/{0}, pool.get());
349+
std::vector<std::string> value_stats_cols = {"v0"};
350+
ManifestEntry entry(
351+
/*kind=*/FileKind::Add(), /*partition=*/BinaryRow::EmptyRow(), /*bucket=*/0,
352+
/*total_buckets=*/1,
353+
std::make_shared<DataFileMeta>(
354+
/*file_name=*/"name", /*file_size=*/1024, /*row_count=*/10,
355+
/*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(),
356+
/*key_stats=*/SimpleStats::EmptyStats(),
357+
/*value_stats=*/value_stats,
358+
/*min_sequence_number=*/0,
359+
/*max_sequence_number=*/10,
360+
/*schema_id=*/0,
361+
/*level=*/1,
362+
/*extra_files=*/std::vector<std::optional<std::string>>(),
363+
/*creation_time=*/Timestamp(0, 0),
364+
/*delete_row_count=*/std::nullopt,
365+
/*embedded_index=*/nullptr,
366+
/*file_source=*/FileSource::Append(),
367+
/*value_stats_cols=*/value_stats_cols,
368+
/*external_path=*/std::nullopt,
369+
/*first_row_id=*/std::nullopt,
370+
/*write_cols=*/std::nullopt));
371+
372+
// max(v0)=50 > 30.1, should be kept.
373+
SimpleStats value_stats_keep = BinaryRowGenerator::GenerateStats(
374+
/*min=*/{40.0}, /*max=*/{50.0}, /*null=*/{0}, pool.get());
375+
ManifestEntry entry_keep(
376+
/*kind=*/FileKind::Add(), /*partition=*/BinaryRow::EmptyRow(), /*bucket=*/0,
377+
/*total_buckets=*/1,
378+
std::make_shared<DataFileMeta>(
379+
/*file_name=*/"name_keep", /*file_size=*/1024, /*row_count=*/10,
380+
/*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(),
381+
/*key_stats=*/SimpleStats::EmptyStats(),
382+
/*value_stats=*/value_stats_keep,
383+
/*min_sequence_number=*/0,
384+
/*max_sequence_number=*/10,
385+
/*schema_id=*/0,
386+
/*level=*/1,
387+
/*extra_files=*/std::vector<std::optional<std::string>>(),
388+
/*creation_time=*/Timestamp(0, 0),
389+
/*delete_row_count=*/std::nullopt,
390+
/*embedded_index=*/nullptr,
391+
/*file_source=*/FileSource::Append(),
392+
/*value_stats_cols=*/value_stats_cols,
393+
/*external_path=*/std::nullopt,
394+
/*first_row_id=*/std::nullopt,
395+
/*write_cols=*/std::nullopt));
396+
397+
// max(v0)=20 <= 30.1, should be filtered out.
398+
ASSERT_OK_AND_ASSIGN(bool keep, scan->FilterByStats(entry));
399+
ASSERT_FALSE(keep);
400+
401+
ASSERT_OK_AND_ASSIGN(keep, scan->FilterByStats(entry_keep));
402+
ASSERT_TRUE(keep);
403+
}
326404
} // namespace paimon::test

0 commit comments

Comments
 (0)