
Commit 9507d49

Update vendored DuckDB sources to d392e43

1 parent d392e43

12 files changed (+159 -68 lines)

src/duckdb/extension/parquet/include/parquet_statistics.hpp (+3)

@@ -36,6 +36,9 @@ struct ParquetStatisticsUtils {
 	static bool BloomFilterExcludes(const TableFilter &filter, const duckdb_parquet::ColumnMetaData &column_meta_data,
 	                                duckdb_apache::thrift::protocol::TProtocol &file_proto, Allocator &allocator);
 
+	static unique_ptr<BaseStatistics> CreateNumericStats(const LogicalType &type, const ParquetColumnSchema &schema_ele,
+	                                                     const duckdb_parquet::Statistics &parquet_stats);
+
 private:
 	static Value ConvertValueInternal(const LogicalType &type, const ParquetColumnSchema &schema_ele,
 	                                  const std::string &stats);
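Note: this declaration promotes CreateNumericStats from a file-local static in parquet_statistics.cpp to a public static member of ParquetStatisticsUtils, so that the new floating-point filter check in parquet_reader.cpp (next file) can reuse it.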

src/duckdb/extension/parquet/parquet_reader.cpp (+69 -30)

@@ -859,18 +859,62 @@ idx_t ParquetReader::GetGroupOffset(ParquetReaderScanState &state) {
 
 static FilterPropagateResult CheckParquetStringFilter(BaseStatistics &stats, const Statistics &pq_col_stats,
                                                       TableFilter &filter) {
-	if (filter.filter_type == TableFilterType::CONSTANT_COMPARISON) {
+	switch (filter.filter_type) {
+	case TableFilterType::CONJUNCTION_AND: {
+		auto &conjunction_filter = filter.Cast<ConjunctionAndFilter>();
+		auto and_result = FilterPropagateResult::FILTER_ALWAYS_TRUE;
+		for (auto &child_filter : conjunction_filter.child_filters) {
+			auto child_prune_result = CheckParquetStringFilter(stats, pq_col_stats, *child_filter);
+			if (child_prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) {
+				return FilterPropagateResult::FILTER_ALWAYS_FALSE;
+			}
+			if (child_prune_result != and_result) {
+				and_result = FilterPropagateResult::NO_PRUNING_POSSIBLE;
+			}
+		}
+		return and_result;
+	}
+	case TableFilterType::CONSTANT_COMPARISON: {
 		auto &constant_filter = filter.Cast<ConstantFilter>();
 		auto &min_value = pq_col_stats.min_value;
 		auto &max_value = pq_col_stats.max_value;
 		return StringStats::CheckZonemap(const_data_ptr_cast(min_value.c_str()), min_value.size(),
 		                                 const_data_ptr_cast(max_value.c_str()), max_value.size(),
 		                                 constant_filter.comparison_type, StringValue::Get(constant_filter.constant));
-	} else {
+	}
+	default:
 		return filter.CheckStatistics(stats);
 	}
 }
 
+static FilterPropagateResult CheckParquetFloatFilter(ColumnReader &reader, const Statistics &pq_col_stats,
+                                                     TableFilter &filter) {
+	// floating point values can have values in the [min, max] domain AND nan values
+	// check both stats against the filter
+	auto &type = reader.Type();
+	auto nan_stats = NumericStats::CreateUnknown(type);
+	auto nan_value = Value("nan").DefaultCastAs(type);
+	NumericStats::SetMin(nan_stats, nan_value);
+	NumericStats::SetMax(nan_stats, nan_value);
+	auto nan_prune = filter.CheckStatistics(nan_stats);
+
+	auto min_max_stats = ParquetStatisticsUtils::CreateNumericStats(reader.Type(), reader.Schema(), pq_col_stats);
+	auto prune = filter.CheckStatistics(*min_max_stats);
+
+	// if EITHER of them cannot be pruned - we cannot prune
+	if (prune == FilterPropagateResult::NO_PRUNING_POSSIBLE ||
+	    nan_prune == FilterPropagateResult::NO_PRUNING_POSSIBLE) {
+		return FilterPropagateResult::NO_PRUNING_POSSIBLE;
+	}
+	// if both are the same we can return that value
+	if (prune == nan_prune) {
+		return prune;
+	}
+	// if they are different we need to return that we cannot prune
+	// e.g. prune = always false, nan_prune = always true -> we don't know
+	return FilterPropagateResult::NO_PRUNING_POSSIBLE;
+}
+
 void ParquetReader::PrepareRowGroupBuffer(ParquetReaderScanState &state, idx_t i) {
 	auto &group = GetGroup(state);
 	auto col_idx = MultiFileLocalIndex(i);
@@ -889,43 +933,38 @@ void ParquetReader::PrepareRowGroupBuffer(ParquetReaderScanState &state, idx_t i
 	// check the bloom filter if present
 	bool is_generated_column = column_reader.ColumnIndex() >= group.columns.size();
 	bool is_expression = column_reader.Schema().schema_type == ::duckdb::ParquetColumnSchemaType::EXPRESSION;
+	bool has_min_max = false;
+	if (!is_generated_column) {
+		has_min_max = group.columns[column_reader.ColumnIndex()].meta_data.statistics.__isset.min_value &&
+		              group.columns[column_reader.ColumnIndex()].meta_data.statistics.__isset.max_value;
+	}
 	if (is_expression) {
 		// no pruning possible for expressions
 		prune_result = FilterPropagateResult::NO_PRUNING_POSSIBLE;
-	} else if (!column_reader.Type().IsNested() && !is_generated_column &&
-	           ParquetStatisticsUtils::BloomFilterSupported(column_reader.Type().id()) &&
-	           ParquetStatisticsUtils::BloomFilterExcludes(filter,
-	                                                       group.columns[column_reader.ColumnIndex()].meta_data,
-	                                                       *state.thrift_file_proto, allocator)) {
-		prune_result = FilterPropagateResult::FILTER_ALWAYS_FALSE;
-	} else if (column_reader.Type().id() == LogicalTypeId::VARCHAR && !is_generated_column &&
-	           group.columns[column_reader.ColumnIndex()].meta_data.statistics.__isset.min_value &&
-	           group.columns[column_reader.ColumnIndex()].meta_data.statistics.__isset.max_value) {
-
+	} else if (!is_generated_column && has_min_max && column_reader.Type().id() == LogicalTypeId::VARCHAR) {
 		// our StringStats only store the first 8 bytes of strings (even if Parquet has longer string stats)
 		// however, when reading remote Parquet files, skipping row groups is really important
 		// here, we implement a special case to check the full length for string filters
-		if (filter.filter_type == TableFilterType::CONJUNCTION_AND) {
-			const auto &and_filter = filter.Cast<ConjunctionAndFilter>();
-			auto and_result = FilterPropagateResult::FILTER_ALWAYS_TRUE;
-			for (auto &child_filter : and_filter.child_filters) {
-				auto child_prune_result = CheckParquetStringFilter(
-				    *stats, group.columns[column_reader.ColumnIndex()].meta_data.statistics, *child_filter);
-				if (child_prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) {
-					and_result = FilterPropagateResult::FILTER_ALWAYS_FALSE;
-					break;
-				} else if (child_prune_result != and_result) {
-					and_result = FilterPropagateResult::NO_PRUNING_POSSIBLE;
-				}
-			}
-			prune_result = and_result;
-		} else {
-			prune_result = CheckParquetStringFilter(
-			    *stats, group.columns[column_reader.ColumnIndex()].meta_data.statistics, filter);
-		}
+		prune_result = CheckParquetStringFilter(
+		    *stats, group.columns[column_reader.ColumnIndex()].meta_data.statistics, filter);
+	} else if (!is_generated_column && has_min_max &&
+	           (column_reader.Type().id() == LogicalTypeId::FLOAT ||
+	            column_reader.Type().id() == LogicalTypeId::DOUBLE)) {
+		// floating point columns can have NaN values in addition to the min/max bounds defined in the file
+		// in order to do optimal pruning - we prune based on the [min, max] of the file followed by pruning
+		// based on nan
+		prune_result = CheckParquetFloatFilter(
+		    column_reader, group.columns[column_reader.ColumnIndex()].meta_data.statistics, filter);
 	} else {
 		prune_result = filter.CheckStatistics(*stats);
 	}
+	if (prune_result == FilterPropagateResult::NO_PRUNING_POSSIBLE && !column_reader.Type().IsNested() &&
+	    !is_generated_column && ParquetStatisticsUtils::BloomFilterSupported(column_reader.Type().id()) &&
+	    ParquetStatisticsUtils::BloomFilterExcludes(filter,
+	                                                group.columns[column_reader.ColumnIndex()].meta_data,
+	                                                *state.thrift_file_proto, allocator)) {
+		prune_result = FilterPropagateResult::FILTER_ALWAYS_FALSE;
+	}
 
 	if (prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) {
 		// this effectively will skip this chunk
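CheckParquetFloatFilter only prunes a row group when the verdict from the file's [min, max] statistics and the verdict from a NaN-only domain agree. A minimal standalone sketch of that combine rule (illustrative names, not DuckDB code):

#include <cassert>

// Stand-in for DuckDB's FilterPropagateResult.
enum class Propagate { ALWAYS_TRUE, ALWAYS_FALSE, NO_PRUNING };

// Pruning is only safe when both domains agree; any uncertainty or
// disagreement degrades to NO_PRUNING, mirroring the function above.
static Propagate Combine(Propagate min_max, Propagate nan) {
	if (min_max == Propagate::NO_PRUNING || nan == Propagate::NO_PRUNING) {
		return Propagate::NO_PRUNING;
	}
	return min_max == nan ? min_max : Propagate::NO_PRUNING;
}

int main() {
	// Filter `x > 100` on a row group with file stats [1, 50] that may also
	// hold NaN: min/max says "always false", NaN says "always true", so the
	// row group cannot be skipped.
	assert(Combine(Propagate::ALWAYS_FALSE, Propagate::ALWAYS_TRUE) == Propagate::NO_PRUNING);
	// If both checks exclude the filter, the row group can be skipped safely.
	assert(Combine(Propagate::ALWAYS_FALSE, Propagate::ALWAYS_FALSE) == Propagate::ALWAYS_FALSE);
	return 0;
}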

src/duckdb/extension/parquet/parquet_statistics.cpp (+28 -4)

@@ -23,8 +23,9 @@ namespace duckdb {
 using duckdb_parquet::ConvertedType;
 using duckdb_parquet::Type;
 
-static unique_ptr<BaseStatistics> CreateNumericStats(const LogicalType &type, const ParquetColumnSchema &schema_ele,
-                                                     const duckdb_parquet::Statistics &parquet_stats) {
+unique_ptr<BaseStatistics> ParquetStatisticsUtils::CreateNumericStats(const LogicalType &type,
+                                                                      const ParquetColumnSchema &schema_ele,
+                                                                      const duckdb_parquet::Statistics &parquet_stats) {
 	auto stats = NumericStats::CreateUnknown(type);
 
 	// for reasons unknown to science, Parquet defines *both* `min` and `min_value` as well as `max` and
@@ -50,6 +51,27 @@ static unique_ptr<BaseStatistics> CreateNumericStats(const LogicalType &type, co
 	return stats.ToUnique();
 }
 
+static unique_ptr<BaseStatistics> CreateFloatingPointStats(const LogicalType &type,
+                                                           const ParquetColumnSchema &schema_ele,
+                                                           const duckdb_parquet::Statistics &parquet_stats) {
+	auto stats = NumericStats::CreateUnknown(type);
+
+	// floating point values can always have NaN values - hence we cannot use the max value from the file
+	Value min;
+	Value max;
+	if (parquet_stats.__isset.min_value) {
+		min = ParquetStatisticsUtils::ConvertValue(type, schema_ele, parquet_stats.min_value);
+	} else if (parquet_stats.__isset.min) {
+		min = ParquetStatisticsUtils::ConvertValue(type, schema_ele, parquet_stats.min);
+	} else {
+		min = Value(type);
+	}
+	max = Value("nan").DefaultCastAs(type);
+	NumericStats::SetMin(stats, min);
+	NumericStats::SetMax(stats, max);
+	return stats.ToUnique();
+}
+
 Value ParquetStatisticsUtils::ConvertValue(const LogicalType &type, const ParquetColumnSchema &schema_ele,
                                            const std::string &stats) {
 	Value result;
@@ -328,8 +350,6 @@ unique_ptr<BaseStatistics> ParquetStatisticsUtils::TransformColumnStatistics(con
 	case LogicalTypeId::SMALLINT:
 	case LogicalTypeId::INTEGER:
 	case LogicalTypeId::BIGINT:
-	case LogicalTypeId::FLOAT:
-	case LogicalTypeId::DOUBLE:
 	case LogicalTypeId::DATE:
 	case LogicalTypeId::TIME:
 	case LogicalTypeId::TIME_TZ:
@@ -341,6 +361,10 @@ unique_ptr<BaseStatistics> ParquetStatisticsUtils::TransformColumnStatistics(con
 	case LogicalTypeId::DECIMAL:
 		row_group_stats = CreateNumericStats(type, schema, parquet_stats);
 		break;
+	case LogicalTypeId::FLOAT:
+	case LogicalTypeId::DOUBLE:
+		row_group_stats = CreateFloatingPointStats(type, schema, parquet_stats);
+		break;
 	case LogicalTypeId::VARCHAR: {
 		auto string_stats = StringStats::CreateEmpty(type);
 		if (parquet_stats.__isset.min_value) {
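The reason the max is widened to NaN: Parquet writers generally leave NaN out of min/max statistics, while DuckDB's total order places NaN above every other value, so a filter such as `x > 100` can still match a NaN row even when the file reports a small max. A self-contained illustration (GreaterThan is a hypothetical comparator mimicking that order, not DuckDB's):

#include <cassert>
#include <cmath>
#include <limits>

// Hypothetical comparator for a total order where NaN sorts above all other values.
static bool GreaterThan(double a, double b) {
	if (std::isnan(a)) {
		return !std::isnan(b);
	}
	if (std::isnan(b)) {
		return false;
	}
	return a > b;
}

int main() {
	double nan = std::numeric_limits<double>::quiet_NaN();
	// A row group holding {1.0, 50.0, NaN} is typically written with stats
	// min=1.0, max=50.0. Against the filter `x > 100`:
	assert(!GreaterThan(50.0, 100.0)); // a zonemap of [1, 50] alone would prune
	assert(GreaterThan(nan, 100.0));   // ...yet the NaN row satisfies the filter
	// Widening the max to NaN keeps the zonemap conservative.
	return 0;
}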

src/duckdb/src/common/multi_file/multi_file_column_mapper.cpp (+1 -1)

@@ -583,6 +583,7 @@ static bool EvaluateFilterAgainstConstant(TableFilter &filter, const Value &cons
 		//! No filter_data assigned (does this mean the DynamicFilter is broken??)
 		return true;
 	}
+	lock_guard<mutex> lock(dynamic_filter.filter_data->lock);
 	if (!dynamic_filter.filter_data->initialized) {
 		//! Not initialized
 		return true;
@@ -591,7 +592,6 @@ static bool EvaluateFilterAgainstConstant(TableFilter &filter, const Value &cons
 		//! No filter present
 		return true;
 	}
-	lock_guard<mutex> lock(dynamic_filter.filter_data->lock);
 	return EvaluateFilterAgainstConstant(*dynamic_filter.filter_data->filter, constant);
 }
 default:
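The fix is purely an ordering change: the lock is now taken before the `initialized` and `filter` fields are read, rather than just before the final dereference, so those reads no longer race with writers of the dynamic filter. The resulting flow, condensed from the two hunks above:

// After this change, the dynamic-filter branch reads all shared state under the lock:
lock_guard<mutex> lock(dynamic_filter.filter_data->lock); // acquire first
if (!dynamic_filter.filter_data->initialized) {
	return true; // not initialized yet: conservatively keep everything
}
if (!dynamic_filter.filter_data->filter) {
	return true; // no filter present
}
// only now is it safe to dereference the shared filter
return EvaluateFilterAgainstConstant(*dynamic_filter.filter_data->filter, constant);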

src/duckdb/src/execution/physical_plan_generator.cpp (+1 -1)

@@ -50,7 +50,7 @@ PhysicalOperator &PhysicalPlanGenerator::ResolveAndPlan(unique_ptr<LogicalOperat
 
 unique_ptr<PhysicalPlan> PhysicalPlanGenerator::PlanInternal(LogicalOperator &op) {
 	if (!physical_plan) {
-		physical_plan = make_uniq<PhysicalPlan>();
+		physical_plan = make_uniq<PhysicalPlan>(Allocator::Get(context));
 	}
 	op.estimated_cardinality = op.EstimateCardinality(context);
 	physical_plan->SetRoot(CreatePlan(op));

src/duckdb/src/function/table/version/pragma_version.cpp (+3 -3)

@@ -1,5 +1,5 @@
 #ifndef DUCKDB_PATCH_VERSION
-#define DUCKDB_PATCH_VERSION "0-dev2171"
+#define DUCKDB_PATCH_VERSION "0-dev2195"
 #endif
 #ifndef DUCKDB_MINOR_VERSION
 #define DUCKDB_MINOR_VERSION 3
@@ -8,10 +8,10 @@
 #define DUCKDB_MAJOR_VERSION 1
 #endif
 #ifndef DUCKDB_VERSION
-#define DUCKDB_VERSION "v1.3.0-dev2171"
+#define DUCKDB_VERSION "v1.3.0-dev2195"
 #endif
 #ifndef DUCKDB_SOURCE_ID
-#define DUCKDB_SOURCE_ID "422538e013"
+#define DUCKDB_SOURCE_ID "cf02bffeb0"
 #endif
 #include "duckdb/function/table/system_functions.hpp"
 #include "duckdb/main/database.hpp"

src/duckdb/src/include/duckdb/execution/physical_plan_generator.hpp (+20 -7)

@@ -22,14 +22,25 @@ class ClientContext;
 class ColumnDataCollection;
 
 class PhysicalPlan {
+public:
+	explicit PhysicalPlan(Allocator &allocator) : arena(allocator) {};
+
+	~PhysicalPlan() {
+		// Call the destructor of each physical operator.
+		for (auto &op : ops) {
+			auto &op_ref = op.get();
+			op_ref.~PhysicalOperator();
+		}
+	}
+
 public:
 	template <class T, class... ARGS>
 	PhysicalOperator &Make(ARGS &&... args) {
-		auto op = make_uniq_base<PhysicalOperator, T>(std::forward<ARGS>(args)...);
-		D_ASSERT(op);
-		auto &op_ref = *op;
-		ops.push_back(std::move(op));
-		return op_ref;
+		static_assert(std::is_base_of<PhysicalOperator, T>::value, "T must be a physical operator");
+		auto mem = arena.AllocateAligned(sizeof(T));
+		auto ptr = new (mem) T(std::forward<ARGS>(args)...);
+		ops.push_back(*ptr);
+		return *ptr;
 	}
 
 	PhysicalOperator &Root() {
@@ -41,8 +52,10 @@ class PhysicalPlan {
 	}
 
 private:
-	//! Contains the memory of the physical plan.
-	vector<unique_ptr<PhysicalOperator>> ops;
+	//! The arena allocator storing the physical operator memory.
+	ArenaAllocator arena;
+	//! References to the physical operators.
+	vector<reference<PhysicalOperator>> ops;
 	//! The root of the physical plan.
 	optional_ptr<PhysicalOperator> root;
 };
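Make now placement-constructs each operator in arena memory and keeps a reference; since an arena frees its memory in bulk without running destructors, ~PhysicalPlan calls ~PhysicalOperator() explicitly. A self-contained sketch of the pattern with toy Arena/Op types (not DuckDB's ArenaAllocator API):

#include <cstddef>
#include <new>
#include <type_traits>
#include <utility>
#include <vector>

struct Op {
	virtual ~Op() = default;
};

// Toy arena: hands out aligned memory and releases everything at once.
class Arena {
public:
	~Arena() {
		for (auto &block : blocks) {
			::operator delete(block.first, std::align_val_t {block.second});
		}
	}
	void *Allocate(std::size_t size, std::size_t align) {
		void *mem = ::operator new(size, std::align_val_t {align});
		blocks.emplace_back(mem, align);
		return mem;
	}

private:
	std::vector<std::pair<void *, std::size_t>> blocks;
};

class Plan {
public:
	template <class T, class... ARGS>
	T &Make(ARGS &&... args) {
		static_assert(std::is_base_of<Op, T>::value, "T must be an Op");
		auto ptr = new (arena.Allocate(sizeof(T), alignof(T))) T(std::forward<ARGS>(args)...);
		ops.push_back(ptr);
		return *ptr;
	}
	// Run the destructors by hand; the arena reclaims the raw memory afterwards
	// (members are destroyed in reverse declaration order, so `arena` outlives `ops`).
	~Plan() {
		for (auto *op : ops) {
			op->~Op();
		}
	}

private:
	Arena arena;
	std::vector<Op *> ops;
};

The payoff is one bulk deallocation instead of a heap allocation and unique_ptr per operator; the cost is that destruction becomes the plan's responsibility, which is exactly what the new ~PhysicalPlan takes on.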

src/duckdb/src/include/duckdb/storage/checkpoint/string_checkpoint_state.hpp (+5 -11)

@@ -43,6 +43,8 @@ struct UncompressedStringSegmentState : public CompressedSegmentState {
 	unordered_map<block_id_t, reference<StringBlock>> overflow_blocks;
 	//! Overflow string writer (if any), if not set overflow strings will be written to memory blocks
 	unique_ptr<OverflowStringWriter> overflow_writer;
+	//! The block manager with which to write
+	optional_ptr<BlockManager> block_manager;
 	//! The set of overflow blocks written to disk (if any)
 	vector<block_id_t> on_disk_blocks;
 
@@ -51,18 +53,10 @@
 
 	void RegisterBlock(BlockManager &manager, block_id_t block_id);
 
-	string GetSegmentInfo() const override {
-		if (on_disk_blocks.empty()) {
-			return "";
-		}
-		string result = StringUtil::Join(on_disk_blocks, on_disk_blocks.size(), ", ",
-		                                 [&](block_id_t block) { return to_string(block); });
-		return "Overflow String Block Ids: " + result;
-	}
+	string GetSegmentInfo() const override;
 
-	vector<block_id_t> GetAdditionalBlocks() const override {
-		return on_disk_blocks;
-	}
+	vector<block_id_t> GetAdditionalBlocks() const override;
+	void Cleanup(BlockManager &manager);
 
 private:
 	mutex block_lock;

src/duckdb/src/storage/checkpoint/write_overflow_strings_to_disk.cpp (+25 -2)

@@ -14,29 +14,52 @@ WriteOverflowStringsToDisk::~WriteOverflowStringsToDisk() {
 	D_ASSERT(Exception::UncaughtException() || offset == 0);
 }
 
-shared_ptr<BlockHandle> UncompressedStringSegmentState::GetHandle(BlockManager &manager, block_id_t block_id) {
+shared_ptr<BlockHandle> UncompressedStringSegmentState::GetHandle(BlockManager &manager_p, block_id_t block_id) {
 	lock_guard<mutex> lock(block_lock);
 	auto entry = handles.find(block_id);
 	if (entry != handles.end()) {
 		return entry->second;
 	}
+	auto &manager = block_manager ? *block_manager : manager_p;
 	auto result = manager.RegisterBlock(block_id);
 	handles.insert(make_pair(block_id, result));
 	return result;
 }
 
-void UncompressedStringSegmentState::RegisterBlock(BlockManager &manager, block_id_t block_id) {
+void UncompressedStringSegmentState::RegisterBlock(BlockManager &manager_p, block_id_t block_id) {
 	lock_guard<mutex> lock(block_lock);
 	auto entry = handles.find(block_id);
 	if (entry != handles.end()) {
 		throw InternalException("UncompressedStringSegmentState::RegisterBlock - block id %llu already exists",
 		                        block_id);
 	}
+	auto &manager = block_manager ? *block_manager : manager_p;
 	auto result = manager.RegisterBlock(block_id);
 	handles.insert(make_pair(block_id, std::move(result)));
 	on_disk_blocks.push_back(block_id);
 }
 
+string UncompressedStringSegmentState::GetSegmentInfo() const {
+	if (on_disk_blocks.empty()) {
+		return "";
+	}
+	string result = StringUtil::Join(on_disk_blocks, on_disk_blocks.size(), ", ",
+	                                 [&](block_id_t block) { return to_string(block); });
+	return "Overflow String Block Ids: " + result;
+}
+
+vector<block_id_t> UncompressedStringSegmentState::GetAdditionalBlocks() const {
+	return on_disk_blocks;
+}
+
+void UncompressedStringSegmentState::Cleanup(BlockManager &manager_p) {
+	auto &manager = block_manager ? *block_manager : manager_p;
+	for (auto &block_id : on_disk_blocks) {
+		manager.MarkBlockAsModified(block_id);
+	}
+	on_disk_blocks.clear();
+}
+
 void WriteOverflowStringsToDisk::WriteString(UncompressedStringSegmentState &state, string_t string,
                                              block_id_t &result_block, int32_t &result_offset) {
 	auto &block_manager = partial_block_manager.GetBlockManager();
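GetHandle, RegisterBlock, and the new Cleanup all share one resolution rule: prefer the BlockManager pinned on the segment state (set in fixed_size_uncompressed.cpp below, when the overflow writer is created at checkpoint time), falling back to the caller's manager otherwise. As a hypothetical helper (the commit inlines this expression at each call site), the rule is just:

// Hypothetical helper, not part of the commit; optional_ptr is DuckDB's
// nullable non-owning pointer wrapper.
static BlockManager &ResolveBlockManager(optional_ptr<BlockManager> pinned, BlockManager &fallback) {
	return pinned ? *pinned : fallback;
}

The effect is that overflow-string blocks written through the checkpoint's block manager are also registered, looked up, and marked as modified through that same manager, rather than whichever manager the caller happens to pass.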

src/duckdb/src/storage/compression/fixed_size_uncompressed.cpp (+3 -2)

@@ -72,8 +72,9 @@ void UncompressedCompressState::CreateEmptySegment(idx_t row_start) {
 	    ColumnSegment::CreateTransientSegment(db, function, type, row_start, info.GetBlockSize(), info.GetBlockSize());
 	if (type.InternalType() == PhysicalType::VARCHAR) {
 		auto &state = compressed_segment->GetSegmentState()->Cast<UncompressedStringSegmentState>();
-		state.overflow_writer =
-		    make_uniq<WriteOverflowStringsToDisk>(checkpoint_data.GetCheckpointState().GetPartialBlockManager());
+		auto &partial_block_manager = checkpoint_data.GetCheckpointState().GetPartialBlockManager();
+		state.block_manager = partial_block_manager.GetBlockManager();
+		state.overflow_writer = make_uniq<WriteOverflowStringsToDisk>(partial_block_manager);
 	}
 	current_segment = std::move(compressed_segment);
 	current_segment->InitializeAppend(append_state);

src/duckdb/src/storage/compression/string_uncompressed.cpp (+1 -3)

@@ -260,9 +260,7 @@ unique_ptr<ColumnSegmentState> UncompressedStringStorage::DeserializeState(Deser
 void UncompressedStringStorage::CleanupState(ColumnSegment &segment) {
 	auto &state = segment.GetSegmentState()->Cast<UncompressedStringSegmentState>();
 	auto &block_manager = segment.GetBlockManager();
-	for (auto &block_id : state.on_disk_blocks) {
-		block_manager.MarkBlockAsModified(block_id);
-	}
+	state.Cleanup(block_manager);
 }
 
 //===--------------------------------------------------------------------===//
