
Commit 971fd3f

Update vendored DuckDB sources to 3f01673
1 parent 3f01673 · commit 971fd3f

39 files changed: +533 -388 lines changed

src/duckdb/extension/parquet/parquet_reader.cpp

Lines changed: 2 additions & 1 deletion
@@ -558,7 +558,8 @@ ParquetColumnSchema ParquetReader::ParseSchemaRecursive(idx_t depth, idx_t max_d
     } else { // leaf node
         if (!s_ele.__isset.type) {
             throw InvalidInputException(
-                "Node has neither num_children nor type set - this violates the Parquet spec (corrupted file)");
+                "Node '%s' has neither num_children nor type set - this violates the Parquet spec (corrupted file)",
+                s_ele.name.c_str());
         }
         auto result = ParseColumnSchema(s_ele, max_define, max_repeat, this_idx, next_file_idx++);
         if (s_ele.repetition_type == FieldRepetitionType::REPEATED) {
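
The change here is purely diagnostic: the error now names the offending schema element, so a corrupted file can be traced to a specific column. A minimal standalone sketch of the same idea (hypothetical names, plain std::runtime_error rather than DuckDB's InvalidInputException):

#include <stdexcept>
#include <string>

// Hypothetical helper: include the schema node's name in the spec-violation message
// so the user can locate the offending column in the corrupted Parquet file.
[[noreturn]] void ThrowMissingTypeError(const std::string &node_name) {
    throw std::runtime_error("Node '" + node_name +
                             "' has neither num_children nor type set - "
                             "this violates the Parquet spec (corrupted file)");
}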

src/duckdb/src/common/radix_partitioning.cpp

Lines changed: 42 additions & 25 deletions
@@ -83,16 +83,34 @@ idx_t RadixPartitioning::Select(Vector &hashes, const SelectionVector *sel, cons

 struct ComputePartitionIndicesFunctor {
     template <idx_t radix_bits>
-    static void Operation(Vector &hashes, Vector &partition_indices, const SelectionVector &append_sel,
-                          const idx_t append_count) {
+    static void Operation(Vector &hashes, Vector &partition_indices, const idx_t original_count,
+                          const SelectionVector &append_sel, const idx_t append_count) {
         using CONSTANTS = RadixPartitioningConstants<radix_bits>;
-        if (append_sel.IsSet()) {
-            auto hashes_sliced = Vector(hashes, append_sel, append_count);
-            UnaryExecutor::Execute<hash_t, hash_t>(hashes_sliced, partition_indices, append_count,
-                                                   [&](hash_t hash) { return CONSTANTS::ApplyMask(hash); });
-        } else {
+        if (!append_sel.IsSet() || hashes.GetVectorType() == VectorType::CONSTANT_VECTOR) {
             UnaryExecutor::Execute<hash_t, hash_t>(hashes, partition_indices, append_count,
                                                    [&](hash_t hash) { return CONSTANTS::ApplyMask(hash); });
+        } else {
+            // We could just slice the "hashes" vector and use the UnaryExecutor
+            // But slicing a dictionary vector causes SelectionData to be allocated
+            // Instead, we just directly compute the partition indices using the selection vectors
+            UnifiedVectorFormat format;
+            hashes.ToUnifiedFormat(original_count, format);
+            const auto source_data = UnifiedVectorFormat::GetData<hash_t>(format);
+            const auto &source_sel = *format.sel;
+
+            const auto target = FlatVector::GetData<hash_t>(partition_indices);
+
+            if (source_sel.IsSet()) {
+                for (idx_t i = 0; i < append_count; i++) {
+                    const auto source_idx = source_sel.get_index(append_sel[i]);
+                    target[i] = CONSTANTS::ApplyMask(source_data[source_idx]);
+                }
+            } else {
+                for (idx_t i = 0; i < append_count; i++) {
+                    const auto source_idx = append_sel[i];
+                    target[i] = CONSTANTS::ApplyMask(source_data[source_idx]);
+                }
+            }
         }
     }
 };
@@ -143,24 +161,20 @@ void RadixPartitionedColumnData::ComputePartitionIndices(PartitionedColumnDataAp
     D_ASSERT(partitions.size() == RadixPartitioning::NumberOfPartitions(radix_bits));
     D_ASSERT(state.partition_buffers.size() == RadixPartitioning::NumberOfPartitions(radix_bits));
     RadixBitsSwitch<ComputePartitionIndicesFunctor, void>(radix_bits, input.data[hash_col_idx], state.partition_indices,
-                                                          *FlatVector::IncrementalSelectionVector(), input.size());
+                                                          input.size(), *FlatVector::IncrementalSelectionVector(),
+                                                          input.size());
 }

 //===--------------------------------------------------------------------===//
 // Tuple Data Partitioning
 //===--------------------------------------------------------------------===//
-RadixPartitionedTupleData::RadixPartitionedTupleData(BufferManager &buffer_manager, const TupleDataLayout &layout_p,
-                                                     const idx_t radix_bits_p, const idx_t hash_col_idx_p)
-    : PartitionedTupleData(PartitionedTupleDataType::RADIX, buffer_manager, layout_p.Copy()), radix_bits(radix_bits_p),
+RadixPartitionedTupleData::RadixPartitionedTupleData(BufferManager &buffer_manager,
+                                                     shared_ptr<TupleDataLayout> layout_ptr, const idx_t radix_bits_p,
+                                                     const idx_t hash_col_idx_p)
+    : PartitionedTupleData(PartitionedTupleDataType::RADIX, buffer_manager, layout_ptr), radix_bits(radix_bits_p),
       hash_col_idx(hash_col_idx_p) {
     D_ASSERT(radix_bits <= RadixPartitioning::MAX_RADIX_BITS);
     D_ASSERT(hash_col_idx < layout.GetTypes().size());
-    const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);
-    allocators->allocators.reserve(num_partitions);
-    for (idx_t i = 0; i < num_partitions; i++) {
-        CreateAllocator();
-    }
-    D_ASSERT(allocators->allocators.size() == num_partitions);
     Initialize();
 }

@@ -186,8 +200,8 @@ void RadixPartitionedTupleData::InitializeAppendStateInternal(PartitionedTupleDa
     const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);
     state.partition_pin_states.reserve(num_partitions);
     for (idx_t i = 0; i < num_partitions; i++) {
-        state.partition_pin_states.emplace_back(make_unsafe_uniq<TupleDataPinState>());
-        partitions[i]->InitializeAppend(*state.partition_pin_states[i], properties);
+        state.partition_pin_states.emplace_back();
+        partitions[i]->InitializeAppend(state.partition_pin_states[i], properties);
     }

     // Init single chunk state
@@ -207,15 +221,18 @@ void RadixPartitionedTupleData::ComputePartitionIndices(PartitionedTupleDataAppe
                                                         const SelectionVector &append_sel, const idx_t append_count) {
     D_ASSERT(partitions.size() == RadixPartitioning::NumberOfPartitions(radix_bits));
     RadixBitsSwitch<ComputePartitionIndicesFunctor, void>(radix_bits, input.data[hash_col_idx], state.partition_indices,
-                                                          append_sel, append_count);
+                                                          input.size(), append_sel, append_count);
 }

-void RadixPartitionedTupleData::ComputePartitionIndices(Vector &row_locations, idx_t count,
-                                                        Vector &partition_indices) const {
-    Vector intermediate(LogicalType::HASH);
+void RadixPartitionedTupleData::ComputePartitionIndices(Vector &row_locations, idx_t count, Vector &partition_indices,
+                                                        unique_ptr<Vector> &utility_vector) const {
+    if (!utility_vector) {
+        utility_vector = make_uniq<Vector>(LogicalType::HASH);
+    }
+    Vector &intermediate = *utility_vector;
     partitions[0]->Gather(row_locations, *FlatVector::IncrementalSelectionVector(), count, hash_col_idx, intermediate,
                           *FlatVector::IncrementalSelectionVector(), nullptr);
-    RadixBitsSwitch<ComputePartitionIndicesFunctor, void>(radix_bits, intermediate, partition_indices,
+    RadixBitsSwitch<ComputePartitionIndicesFunctor, void>(radix_bits, intermediate, partition_indices, count,
                                                           *FlatVector::IncrementalSelectionVector(), count);
 }

@@ -240,7 +257,7 @@ void RadixPartitionedTupleData::RepartitionFinalizeStates(PartitionedTupleData &
     auto &partitions = new_partitioned_data.GetPartitions();
     for (idx_t partition_index = from_idx; partition_index < to_idx; partition_index++) {
         auto &partition = *partitions[partition_index];
-        auto &partition_pin_state = *state.partition_pin_states[partition_index];
+        auto &partition_pin_state = state.partition_pin_states[partition_index];
         partition.FinalizePinState(partition_pin_state);
     }
 }
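
For context on the functor above (a minimal sketch, not the DuckDB implementation): radix partitioning maps each row to one of NumberOfPartitions(radix_bits) buckets by applying a mask to its hash, and the new branch does that directly through the selection vector instead of slicing a dictionary vector. Hypothetical ApplyMask and plain arrays stand in for RadixPartitioningConstants and Vector:

#include <cstddef>
#include <cstdint>

using hash_t = uint64_t;

// Hypothetical stand-in for RadixPartitioningConstants<radix_bits>::ApplyMask:
// keep radix_bits bits of the hash as the partition index.
template <int RADIX_BITS>
inline hash_t ApplyMask(hash_t hash) {
    return hash & ((hash_t(1) << RADIX_BITS) - 1);
}

// Compute partition indices through a selection vector without materializing a sliced copy,
// mirroring the non-sliced path added in this commit (plain arrays instead of DuckDB vectors).
template <int RADIX_BITS>
void ComputePartitionIndices(const hash_t *hashes, const uint32_t *append_sel, size_t append_count,
                             hash_t *partition_indices) {
    for (size_t i = 0; i < append_count; i++) {
        partition_indices[i] = ApplyMask<RADIX_BITS>(hashes[append_sel[i]]);
    }
}

With RADIX_BITS = 4, for example, every index lands in [0, 16).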

src/duckdb/src/common/row_operations/row_aggregate.cpp

Lines changed: 4 additions & 1 deletion
@@ -102,7 +102,10 @@ void RowOperations::CombineStates(RowOperationsState &state, TupleDataLayout &la
 void RowOperations::FinalizeStates(RowOperationsState &state, TupleDataLayout &layout, Vector &addresses,
                                    DataChunk &result, idx_t aggr_idx) {
     // Copy the addresses
-    Vector addresses_copy(LogicalType::POINTER);
+    if (!state.addresses) {
+        state.addresses = make_uniq<Vector>(LogicalType::POINTER);
+    }
+    auto &addresses_copy = *state.addresses;
     VectorOperations::Copy(addresses, addresses_copy, result.size(), 0, 0);

     // Move to the first aggregate state
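
The point of the change is to stop allocating a fresh POINTER vector on every FinalizeStates call and instead cache one in RowOperationsState. A minimal sketch of that lazy-reuse pattern, with std types and hypothetical names in place of DuckDB's Vector:

#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

struct FinalizeState {
    // Scratch buffer created on first use and reused across calls.
    std::unique_ptr<std::vector<uintptr_t>> addresses;
};

void FinalizeSketch(FinalizeState &state, const uintptr_t *addresses_in, size_t count) {
    if (!state.addresses) {
        state.addresses = std::make_unique<std::vector<uintptr_t>>();
    }
    auto &addresses_copy = *state.addresses;
    // Copy so the caller's address list stays untouched while we advance through aggregate states.
    addresses_copy.assign(addresses_in, addresses_in + count);
    // ... offset each entry to the first aggregate state and finalize ...
}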

src/duckdb/src/common/sort/partition_state.cpp

Lines changed: 8 additions & 28 deletions
@@ -1,12 +1,9 @@
 #include "duckdb/common/sort/partition_state.hpp"

-#include "duckdb/common/types/column/column_data_consumer.hpp"
 #include "duckdb/common/row_operations/row_operations.hpp"
 #include "duckdb/main/config.hpp"
 #include "duckdb/parallel/executor_task.hpp"

-#include <numeric>
-
 namespace duckdb {

 PartitionGlobalHashGroup::PartitionGlobalHashGroup(ClientContext &context, const Orders &partitions,
@@ -99,16 +96,17 @@ PartitionGlobalSinkState::PartitionGlobalSinkState(ClientContext &context,
         ++max_bits;
     }

+    grouping_types_ptr = make_shared_ptr<TupleDataLayout>();
     if (!orders.empty()) {
         if (partitions.empty()) {
             // Sort early into a dedicated hash group if we only sort.
-            grouping_types.Initialize(payload_types);
+            grouping_types_ptr->Initialize(payload_types);
             auto new_group = make_uniq<PartitionGlobalHashGroup>(context, partitions, orders, payload_types, external);
             hash_groups.emplace_back(std::move(new_group));
         } else {
             auto types = payload_types;
             types.push_back(LogicalType::HASH);
-            grouping_types.Initialize(types);
+            grouping_types_ptr->Initialize(types);
             ResizeGroupingData(estimated_cardinality);
         }
     }
@@ -132,13 +130,14 @@ void PartitionGlobalSinkState::SyncPartitioning(const PartitionGlobalSinkState &
     const auto old_bits = grouping_data ? grouping_data->GetRadixBits() : 0;
     if (fixed_bits != old_bits) {
         const auto hash_col_idx = payload_types.size();
-        grouping_data = make_uniq<RadixPartitionedTupleData>(buffer_manager, grouping_types, fixed_bits, hash_col_idx);
+        grouping_data =
+            make_uniq<RadixPartitionedTupleData>(buffer_manager, grouping_types_ptr, fixed_bits, hash_col_idx);
     }
 }

 unique_ptr<RadixPartitionedTupleData> PartitionGlobalSinkState::CreatePartition(idx_t new_bits) const {
     const auto hash_col_idx = payload_types.size();
-    return make_uniq<RadixPartitionedTupleData>(buffer_manager, grouping_types, new_bits, hash_col_idx);
+    return make_uniq<RadixPartitionedTupleData>(buffer_manager, grouping_types_ptr, new_bits, hash_col_idx);
 }

 void PartitionGlobalSinkState::ResizeGroupingData(idx_t cardinality) {
@@ -476,7 +475,7 @@ void PartitionLocalMergeState::ExecuteTask() {
 bool PartitionGlobalMergeState::AssignTask(PartitionLocalMergeState &local_state) {
     lock_guard<mutex> guard(lock);

-    if (tasks_assigned >= total_tasks) {
+    if (tasks_assigned >= total_tasks && !TryPrepareNextStage()) {
         return false;
     }

@@ -495,15 +494,13 @@ void PartitionGlobalMergeState::CompleteTask() {
 }

 bool PartitionGlobalMergeState::TryPrepareNextStage() {
-    lock_guard<mutex> guard(lock);
-
     if (tasks_completed < total_tasks) {
         return false;
     }

     tasks_assigned = tasks_completed = 0;

-    switch (stage) {
+    switch (stage.load()) {
     case PartitionSortStage::INIT:
         // If the partitions are unordered, don't scan in parallel
         // because it produces non-deterministic orderings.
@@ -632,23 +629,6 @@ bool PartitionGlobalMergeStates::ExecuteTask(PartitionLocalMergeState &local_sta
             break;
         }

-        // Hash group global state couldn't assign a task to this thread
-        // Try to prepare the next stage
-        if (!global_state->TryPrepareNextStage()) {
-            // This current hash group is not yet done
-            // But we were not able to assign a task for it to this thread
-            // See if the next hash group is better
-            continue;
-        }
-
-        // We were able to prepare the next stage for this hash group!
-        // Try to assign a task once more
-        if (global_state->AssignTask(local_state)) {
-            // We assigned a task to this thread!
-            // Break out of this loop to re-enter the top-level loop and execute the task
-            break;
-        }
-
         // We were able to prepare the next merge round,
         // but we were not able to assign a task for it to this thread
        // The tasks were assigned to other threads while this thread waited for the lock
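
The restructuring above moves stage preparation inside AssignTask: TryPrepareNextStage now assumes the caller already holds the lock, so a thread that finds the current stage exhausted can advance it and take a task in a single critical section, and the separate retry path deleted here is no longer needed. A minimal sketch of the pattern with hypothetical names:

#include <cstddef>
#include <mutex>

struct MergeStateSketch {
    std::mutex lock;
    size_t total_tasks = 0;
    size_t tasks_assigned = 0;
    size_t tasks_completed = 0;

    // Caller must hold 'lock'. Advances to the next stage once the current one is fully completed.
    bool TryPrepareNextStage() {
        if (tasks_completed < total_tasks) {
            return false;
        }
        tasks_assigned = tasks_completed = 0;
        total_tasks = 4; // hypothetical task count for the next stage
        return true;
    }

    bool AssignTask() {
        std::lock_guard<std::mutex> guard(lock);
        // If the current stage has no tasks left, try to prepare the next one under the same lock.
        if (tasks_assigned >= total_tasks && !TryPrepareNextStage()) {
            return false;
        }
        ++tasks_assigned;
        return true;
    }
};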

src/duckdb/src/common/types/hash.cpp

Lines changed: 49 additions & 39 deletions
@@ -4,7 +4,6 @@
 #include "duckdb/common/types/string_type.hpp"
 #include "duckdb/common/types/interval.hpp"
 #include "duckdb/common/types/uhugeint.hpp"
-#include "duckdb/common/fast_mem.hpp"

 #include <functional>
 #include <cmath>
@@ -76,6 +75,43 @@ hash_t Hash(const char *str) {
     return Hash(str, strlen(str));
 }

+template <bool AT_LEAST_8_BYTES = false>
+hash_t HashBytes(const_data_ptr_t ptr, const idx_t len) noexcept {
+    // This seed slightly improves bit distribution, taken from here:
+    // https://github.com/martinus/robin-hood-hashing/blob/3.11.5/LICENSE
+    // MIT License Copyright (c) 2018-2021 Martin Ankerl
+    hash_t h = 0xe17a1465U ^ (len * 0xc6a4a7935bd1e995U);
+
+    // Hash/combine in blocks of 8 bytes
+    const auto remainder = len & 7U;
+    for (const auto end = ptr + len - remainder; ptr != end; ptr += 8U) {
+        h ^= Load<hash_t>(ptr);
+        h *= 0xd6e8feb86659fd93U;
+    }
+
+    if (remainder != 0) {
+        if (AT_LEAST_8_BYTES) {
+            D_ASSERT(len >= 8);
+            // Load remaining (<8) bytes (with a Load instead of a memcpy)
+            const auto inv_rem = 8U - remainder;
+            const auto hr = Load<hash_t>(ptr - inv_rem) >> (inv_rem * 8U);
+
+            h ^= hr;
+            h *= 0xd6e8feb86659fd93U;
+        } else {
+            // Load remaining (<8) bytes (with a memcpy)
+            hash_t hr = 0;
+            memcpy(&hr, ptr, remainder);
+
+            h ^= hr;
+            h *= 0xd6e8feb86659fd93U;
+        }
+    }
+
+    // Finalize
+    return Hash(h);
+}
+
 template <>
 hash_t Hash(string_t val) {
     // If the string is inlined, we can do a branchless hash
@@ -86,64 +122,38 @@ hash_t Hash(string_t val) {
         hash_t h = 0xe17a1465U ^ (val.GetSize() * 0xc6a4a7935bd1e995U);

         // Hash/combine the first 8-byte block
-        const bool not_an_empty_string = !val.Empty();
-        h ^= Load<hash_t>(const_data_ptr_cast(val.GetPrefix()));
-        h *= 0xd6e8feb86659fd93U * not_an_empty_string + (1 - not_an_empty_string);
+        if (!val.Empty()) {
+            h ^= Load<hash_t>(const_data_ptr_cast(val.GetPrefix()));
+            h *= 0xd6e8feb86659fd93U;
+        }

         // Load remaining 4 bytes
-        hash_t hr = 0;
-        memcpy(&hr, const_data_ptr_cast(val.GetPrefix()) + sizeof(hash_t), 4U);
+        if (val.GetSize() > sizeof(hash_t)) {
+            hash_t hr = 0;
+            memcpy(&hr, const_data_ptr_cast(val.GetPrefix()) + sizeof(hash_t), 4U);

-        // Process the remainder the same an 8-byte block
-        // This operation is a NOP if the string is <= 8 bytes
-        const bool not_a_nop = val.GetSize() > sizeof(hash_t);
-        h ^= hr;
-        h *= 0xd6e8feb86659fd93U * not_a_nop + (1 - not_a_nop);
+            h ^= hr;
+            h *= 0xd6e8feb86659fd93U;
+        }

         // Finalize
         h = Hash(h);

         // This is just an optimization. It should not change the result
         // This property is important for verification (e.g., DUCKDB_DEBUG_NO_INLINE)
-        // We achieved this with the NOP trick above (and in HashBytes)
         D_ASSERT(h == Hash(val.GetData(), val.GetSize()));

         return h;
     }
-    return Hash(val.GetData(), val.GetSize());
+    // Required for DUCKDB_DEBUG_NO_INLINE
+    return HashBytes<string_t::INLINE_LENGTH >= sizeof(hash_t)>(const_data_ptr_cast(val.GetData()), val.GetSize());
 }

 template <>
 hash_t Hash(char *val) {
     return Hash<const char *>(val);
 }

-hash_t HashBytes(const_data_ptr_t ptr, const idx_t len) noexcept {
-    // This seed slightly improves bit distribution, taken from here:
-    // https://github.com/martinus/robin-hood-hashing/blob/3.11.5/LICENSE
-    // MIT License Copyright (c) 2018-2021 Martin Ankerl
-    hash_t h = 0xe17a1465U ^ (len * 0xc6a4a7935bd1e995U);
-
-    // Hash/combine in blocks of 8 bytes
-    for (const auto end = ptr + len - (len & 7U); ptr != end; ptr += 8U) {
-        h ^= Load<hash_t>(ptr);
-        h *= 0xd6e8feb86659fd93U;
-    }
-
-    // Load remaining (<8) bytes
-    hash_t hr = 0;
-    memcpy(&hr, ptr, len & 7U);
-
-    // Process the remainder same as an 8-byte block
-    // This operation is a NOP if the number of remaining bytes is 0
-    const bool not_a_nop = len & 7U;
-    h ^= hr;
-    h *= 0xd6e8feb86659fd93U * not_a_nop + (1 - not_a_nop);
-
-    // Finalize
-    return Hash(h);
-}
-
 hash_t Hash(const char *val, size_t size) {
     return HashBytes(const_data_ptr_cast(val), size);
 }
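
For reference, a standalone sketch of the block scheme used by the templated HashBytes above (memcpy tail path only; the Murmur-style finalizer stands in for duckdb's Hash(uint64_t) and is an assumption, not the actual source): bytes are folded in 8-byte xor-multiply steps, the sub-8-byte tail is read with memcpy into a zero-padded word, and the result is mixed once at the end.

#include <cstddef>
#include <cstdint>
#include <cstring>

// Murmur-style 64-bit finalizer, standing in for duckdb's Hash(uint64_t).
inline uint64_t FinalizeHash(uint64_t h) {
    h ^= h >> 33;
    h *= 0xff51afd7ed558ccdULL;
    h ^= h >> 33;
    h *= 0xc4ceb9fe1a85ec53ULL;
    h ^= h >> 33;
    return h;
}

// Sketch of the 8-byte block scheme from HashBytes above (memcpy tail path only).
uint64_t HashBytesSketch(const uint8_t *ptr, size_t len) {
    uint64_t h = 0xe17a1465ULL ^ (len * 0xc6a4a7935bd1e995ULL);

    const size_t remainder = len & 7U;
    for (const uint8_t *end = ptr + len - remainder; ptr != end; ptr += 8) {
        uint64_t block;
        memcpy(&block, ptr, 8); // unaligned-safe 8-byte load
        h ^= block;
        h *= 0xd6e8feb86659fd93ULL;
    }
    if (remainder != 0) {
        uint64_t tail = 0;
        memcpy(&tail, ptr, remainder); // zero-padded tail block
        h ^= tail;
        h *= 0xd6e8feb86659fd93ULL;
    }
    return FinalizeHash(h);
}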
