Skip to content

Commit 9565239

Browse files
Update vendored DuckDB sources to 971fd3f
1 parent 971fd3f commit 9565239

File tree

21 files changed

+137
-69
lines changed

21 files changed

+137
-69
lines changed

src/duckdb/extension/parquet/decoder/dictionary_decoder.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,11 @@ void DictionaryDecoder::Filter(uint8_t *defines, const idx_t read_count, Vector
186186
D_ASSERT(filter_count > 0);
187187
// read the dictionary values
188188
const auto valid_count = Read(defines, read_count, result, 0);
189+
if (valid_count == 0) {
190+
// all values are NULL
191+
approved_tuple_count = 0;
192+
return;
193+
}
189194

190195
// apply the filter by checking the dictionary offsets directly
191196
uint32_t *offsets;

src/duckdb/extension/parquet/include/parquet_writer.hpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "duckdb/common/encryption_state.hpp"
1414
#include "duckdb/common/exception.hpp"
1515
#include "duckdb/common/mutex.hpp"
16+
#include "duckdb/common/atomic.hpp"
1617
#include "duckdb/common/serializer/buffered_file_writer.hpp"
1718
#include "duckdb/common/types/column/column_data_collection.hpp"
1819
#include "duckdb/function/copy_function.hpp"
@@ -113,8 +114,7 @@ class ParquetWriter {
113114
return *writer;
114115
}
115116
idx_t FileSize() {
116-
lock_guard<mutex> glock(lock);
117-
return writer->total_written;
117+
return total_written;
118118
}
119119
idx_t DictionarySizeLimit() const {
120120
return dictionary_size_limit;
@@ -129,8 +129,7 @@ class ParquetWriter {
129129
return compression_level;
130130
}
131131
idx_t NumberOfRowGroups() {
132-
lock_guard<mutex> glock(lock);
133-
return file_meta_data.row_groups.size();
132+
return num_row_groups;
134133
}
135134
ParquetVersion GetParquetVersion() const {
136135
return parquet_version;
@@ -170,6 +169,9 @@ class ParquetWriter {
170169
vector<ParquetColumnSchema> column_schemas;
171170

172171
unique_ptr<BufferedFileWriter> writer;
172+
//! Atomics to reduce contention when rotating writes to multiple Parquet files
173+
atomic<idx_t> total_written;
174+
atomic<idx_t> num_row_groups;
173175
std::shared_ptr<duckdb_apache::thrift::protocol::TProtocol> protocol;
174176
duckdb_parquet::FileMetaData file_meta_data;
175177
std::mutex lock;

src/duckdb/extension/parquet/parquet_writer.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,11 +352,12 @@ ParquetWriter::ParquetWriter(ClientContext &context, FileSystem &fs, string file
352352
encryption_config(std::move(encryption_config_p)), dictionary_size_limit(dictionary_size_limit_p),
353353
string_dictionary_page_size_limit(string_dictionary_page_size_limit_p),
354354
bloom_filter_false_positive_ratio(bloom_filter_false_positive_ratio_p), compression_level(compression_level_p),
355-
debug_use_openssl(debug_use_openssl_p), parquet_version(parquet_version) {
355+
debug_use_openssl(debug_use_openssl_p), parquet_version(parquet_version), total_written(0), num_row_groups(0) {
356356

357357
// initialize the file writer
358358
writer = make_uniq<BufferedFileWriter>(fs, file_name.c_str(),
359359
FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE_NEW);
360+
360361
if (encryption_config) {
361362
auto &config = DBConfig::GetConfig(context);
362363
if (config.encryption_util && debug_use_openssl) {
@@ -373,6 +374,7 @@ ParquetWriter::ParquetWriter(ClientContext &context, FileSystem &fs, string file
373374
// parquet files start with the string "PAR1"
374375
writer->WriteData(const_data_ptr_cast("PAR1"), 4);
375376
}
377+
376378
TCompactProtocolFactoryT<MyTransport> tproto_factory;
377379
protocol = tproto_factory.getProtocol(std::make_shared<MyTransport>(*writer));
378380

@@ -542,6 +544,9 @@ void ParquetWriter::FlushRowGroup(PreparedRowGroup &prepared) {
542544
// append the row group to the file meta data
543545
file_meta_data.row_groups.push_back(row_group);
544546
file_meta_data.num_rows += row_group.num_rows;
547+
548+
total_written = writer->GetTotalWritten();
549+
num_row_groups++;
545550
}
546551

547552
void ParquetWriter::Flush(ColumnDataCollection &buffer) {

src/duckdb/extension/parquet/reader/list_column_reader.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ ListColumnReader::ListColumnReader(ParquetReader &reader, const ParquetColumnSch
184184

185185
void ListColumnReader::ApplyPendingSkips(data_ptr_t define_out, data_ptr_t repeat_out) {
186186
ReadInternal<TemplatedListSkipper>(pending_skips, nullptr, nullptr, nullptr);
187+
pending_skips = 0;
187188
}
188189

189190
} // namespace duckdb

src/duckdb/src/execution/operator/csv_scanner/scanner/string_value_scanner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1894,6 +1894,7 @@ void StringValueScanner::FinalizeChunkProcess() {
18941894
}
18951895
if (states.IsQuotedCurrent() && !found_error &&
18961896
state_machine->dialect_options.state_machine_options.strict_mode.GetValue()) {
1897+
type = UNTERMINATED_QUOTES;
18971898
// If we finish the execution of a buffer, and we end in a quoted state, it means we have unterminated
18981899
// quotes
18991900
result.current_errors.Insert(type, result.cur_col_id, result.chunk_col_id, result.last_position);

src/duckdb/src/execution/operator/csv_scanner/sniffer/dialect_detection.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,8 @@ void CSVSniffer::AnalyzeDialectCandidate(unique_ptr<ColumnCountScanner> scanner,
391391
return;
392392
} else {
393393
// Give preference to one that got escaped
394-
if (!scanner->ever_escaped && candidates.front()->ever_escaped) {
394+
if (!scanner->ever_escaped && candidates.front()->ever_escaped &&
395+
sniffing_state_machine.dialect_options.state_machine_options.strict_mode.GetValue()) {
395396
return;
396397
}
397398
if (best_consistent_rows == consistent_rows && num_cols >= max_columns_found) {
@@ -413,9 +414,19 @@ void CSVSniffer::AnalyzeDialectCandidate(unique_ptr<ColumnCountScanner> scanner,
413414
return;
414415
}
415416
if (quoted && num_cols < max_columns_found) {
416-
for (auto &candidate : candidates) {
417-
if (candidate->ever_quoted) {
418-
return;
417+
if (scanner->ever_escaped &&
418+
sniffing_state_machine.dialect_options.state_machine_options.strict_mode.GetValue()) {
419+
for (auto &candidate : candidates) {
420+
if (candidate->ever_quoted && candidate->ever_escaped) {
421+
return;
422+
}
423+
}
424+
425+
} else {
426+
for (auto &candidate : candidates) {
427+
if (candidate->ever_quoted) {
428+
return;
429+
}
419430
}
420431
}
421432
}
@@ -434,7 +445,6 @@ void CSVSniffer::AnalyzeDialectCandidate(unique_ptr<ColumnCountScanner> scanner,
434445
} else if (!options.null_padding) {
435446
sniffing_state_machine.dialect_options.skip_rows = dirty_notes;
436447
}
437-
438448
candidates.clear();
439449
sniffing_state_machine.dialect_options.num_cols = num_cols;
440450
lines_sniffed = sniffed_column_counts.result_position;

src/duckdb/src/execution/operator/persistent/physical_copy_to_file.cpp

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ using vector_of_value_map_t = unordered_map<vector<Value>, T, VectorOfValuesHash
4848
class CopyToFunctionGlobalState : public GlobalSinkState {
4949
public:
5050
explicit CopyToFunctionGlobalState(ClientContext &context)
51-
: initialized(false), rows_copied(0), last_file_offset(0) {
51+
: initialized(false), rows_copied(0), last_file_offset(0),
52+
file_write_lock_if_rotating(make_uniq<StorageLock>()) {
5253
max_open_files = ClientConfig::GetConfig(context).partitioned_write_max_open_files;
5354
}
5455
StorageLock lock;
@@ -64,6 +65,8 @@ class CopyToFunctionGlobalState : public GlobalSinkState {
6465
vector<unique_ptr<CopyToFileInfo>> written_files;
6566
//! Max open files
6667
idx_t max_open_files;
68+
//! If rotate is true, this lock is used
69+
unique_ptr<StorageLock> file_write_lock_if_rotating;
6770

6871
void Initialize(ClientContext &context, const PhysicalCopyToFile &op) {
6972
if (initialized) {
@@ -472,6 +475,43 @@ PhysicalCopyToFile::PhysicalCopyToFile(vector<LogicalType> types, CopyFunction f
472475
function(std::move(function_p)), bind_data(std::move(bind_data)), parallel(false) {
473476
}
474477

478+
void PhysicalCopyToFile::WriteRotateInternal(ExecutionContext &context, GlobalSinkState &global_state,
479+
const std::function<void(GlobalFunctionData &)> &fun) const {
480+
auto &g = global_state.Cast<CopyToFunctionGlobalState>();
481+
482+
// Loop until we can write (synchronize using locks when using parallel writes to the same files and "rotate")
483+
while (true) {
484+
// Grab global lock and dereference the current file state (and corresponding lock)
485+
auto global_guard = g.lock.GetExclusiveLock();
486+
auto &file_state = *g.global_state;
487+
auto &file_lock = *g.file_write_lock_if_rotating;
488+
if (rotate && function.rotate_next_file(file_state, *bind_data, file_size_bytes)) {
489+
// Global state must be rotated. Move to local scope, create an new one, and immediately release global lock
490+
auto owned_gstate = std::move(g.global_state);
491+
g.global_state = CreateFileState(context.client, *sink_state, *global_guard);
492+
auto owned_lock = std::move(g.file_write_lock_if_rotating);
493+
g.file_write_lock_if_rotating = make_uniq<StorageLock>();
494+
global_guard.reset();
495+
496+
// This thread now waits for the exclusive lock on this file while other threads complete their writes
497+
// Note that new writes can still start, as there is already a new global state
498+
auto file_guard = owned_lock->GetExclusiveLock();
499+
function.copy_to_finalize(context.client, *bind_data, *owned_gstate);
500+
} else {
501+
// Get shared file write lock while holding global lock,
502+
// so file can't be rotated before we get the write lock
503+
auto file_guard = file_lock.GetSharedLock();
504+
505+
// Because we got the shared lock on the file, we're sure that it will keep existing until we release it
506+
global_guard.reset();
507+
508+
// Sink/Combine!
509+
fun(file_state);
510+
break;
511+
}
512+
}
513+
}
514+
475515
SinkResultType PhysicalCopyToFile::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
476516
auto &g = input.global_state.Cast<CopyToFunctionGlobalState>();
477517
auto &l = input.local_state.Cast<CopyToFunctionLocalState>();
@@ -507,20 +547,9 @@ SinkResultType PhysicalCopyToFile::Sink(ExecutionContext &context, DataChunk &ch
507547
return SinkResultType::NEED_MORE_INPUT;
508548
}
509549

510-
// FILE_SIZE_BYTES/rotate is set, but threads write to the same file, synchronize using lock
511-
auto &gstate = g.global_state;
512-
auto global_lock = g.lock.GetExclusiveLock();
513-
if (rotate && function.rotate_next_file(*gstate, *bind_data, file_size_bytes)) {
514-
auto owned_gstate = std::move(gstate);
515-
gstate = CreateFileState(context.client, *sink_state, *global_lock);
516-
global_lock.reset();
517-
function.copy_to_finalize(context.client, *bind_data, *owned_gstate);
518-
} else {
519-
global_lock.reset();
520-
}
521-
522-
global_lock = g.lock.GetSharedLock();
523-
function.copy_to_sink(context, *bind_data, *gstate, *l.local_state, chunk);
550+
WriteRotateInternal(context, input.global_state, [&](GlobalFunctionData &gstate) {
551+
function.copy_to_sink(context, *bind_data, gstate, *l.local_state, chunk);
552+
});
524553

525554
return SinkResultType::NEED_MORE_INPUT;
526555
}
@@ -540,9 +569,9 @@ SinkCombineResultType PhysicalCopyToFile::Combine(ExecutionContext &context, Ope
540569
function.copy_to_finalize(context.client, *bind_data, *l.global_state);
541570
}
542571
} else if (rotate) {
543-
// File in global state may change with FILE_SIZE_BYTES/rotate, need to grab lock
544-
auto lock = g.lock.GetSharedLock();
545-
function.copy_to_combine(context, *bind_data, *g.global_state, *l.local_state);
572+
WriteRotateInternal(context, input.global_state, [&](GlobalFunctionData &gstate) {
573+
function.copy_to_combine(context, *bind_data, gstate, *l.local_state);
574+
});
546575
} else if (g.global_state) {
547576
function.copy_to_combine(context, *bind_data, *g.global_state, *l.local_state);
548577
}

src/duckdb/src/execution/operator/projection/physical_unnest.cpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,8 @@ OperatorResultType PhysicalUnnest::ExecuteInternal(ExecutionContext &context, Da
186186
list_offset = list_entry.offset;
187187
}
188188
// unnest any entries we can
189-
idx_t unnest_length = MinValue<idx_t>(list_length - state.list_position, current_row_length);
189+
idx_t unnest_length = MinValue<idx_t>(
190+
list_length - MinValue<idx_t>(list_length, state.list_position), current_row_length);
190191
auto &unnest_sel = state.unnest_sels[col_idx];
191192
for (idx_t r = 0; r < unnest_length; r++) {
192193
unnest_sel.set_index(result_length + r, list_offset + state.list_position + r);
@@ -228,14 +229,12 @@ OperatorResultType PhysicalUnnest::ExecuteInternal(ExecutionContext &context, Da
228229
col_offset = input.ColumnCount();
229230
}
230231
for (idx_t col_idx = 0; col_idx < state.list_data.ColumnCount(); col_idx++) {
231-
if (state.list_data.data[col_idx].GetType() == LogicalType::SQLNULL) {
232-
// UNNEST(NULL)
233-
chunk.SetCardinality(0);
234-
break;
235-
}
236232
auto &list_vector = state.list_data.data[col_idx];
237233
auto &result_vector = chunk.data[col_offset + col_idx];
238-
if (ListVector::GetListSize(list_vector) == 0) {
234+
if (state.list_data.data[col_idx].GetType() == LogicalType::SQLNULL ||
235+
ListType::GetChildType(state.list_data.data[col_idx].GetType()) == LogicalType::SQLNULL ||
236+
ListVector::GetListSize(list_vector) == 0) {
237+
// UNNEST(NULL) or UNNEST([])
239238
// we cannot slice empty lists - but if our child list is empty we can only return NULL anyway
240239
result_vector.SetVectorType(VectorType::CONSTANT_VECTOR);
241240
ConstantVector::SetNull(result_vector, true);

src/duckdb/src/function/scalar/string/concat.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,9 +335,9 @@ static unique_ptr<BaseStatistics> ListConcatStats(ClientContext &context, Functi
335335

336336
ScalarFunction ListConcatFun::GetFunction() {
337337
// The arguments and return types are set in the binder function.
338-
auto fun = ScalarFunction({LogicalType::LIST(LogicalType::ANY), LogicalType::LIST(LogicalType::ANY)},
339-
LogicalType::LIST(LogicalType::ANY), ConcatFunction, BindConcatFunction, nullptr,
338+
auto fun = ScalarFunction({}, LogicalType::LIST(LogicalType::ANY), ConcatFunction, BindConcatFunction, nullptr,
340339
ListConcatStats);
340+
fun.varargs = LogicalType::LIST(LogicalType::ANY);
341341
fun.null_handling = FunctionNullHandling::SPECIAL_HANDLING;
342342
return fun;
343343
}

src/duckdb/src/function/table/version/pragma_version.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#ifndef DUCKDB_PATCH_VERSION
2-
#define DUCKDB_PATCH_VERSION "0-dev2049"
2+
#define DUCKDB_PATCH_VERSION "0-dev2110"
33
#endif
44
#ifndef DUCKDB_MINOR_VERSION
55
#define DUCKDB_MINOR_VERSION 3
@@ -8,10 +8,10 @@
88
#define DUCKDB_MAJOR_VERSION 1
99
#endif
1010
#ifndef DUCKDB_VERSION
11-
#define DUCKDB_VERSION "v1.3.0-dev2049"
11+
#define DUCKDB_VERSION "v1.3.0-dev2110"
1212
#endif
1313
#ifndef DUCKDB_SOURCE_ID
14-
#define DUCKDB_SOURCE_ID "daf2fa3674"
14+
#define DUCKDB_SOURCE_ID "7912713493"
1515
#endif
1616
#include "duckdb/function/table/system_functions.hpp"
1717
#include "duckdb/main/database.hpp"

0 commit comments

Comments
 (0)