Skip to content

Commit 4c92b74

Browse files
Update vendored DuckDB sources to 1265bd0
1 parent 1265bd0 commit 4c92b74

File tree

28 files changed

+187
-63
lines changed

28 files changed

+187
-63
lines changed

src/duckdb/extension/parquet/column_reader.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -318,12 +318,14 @@ void ColumnReader::PreparePageV2(PageHeader &page_hdr) {
318318

319319
auto compressed_bytes = page_hdr.compressed_page_size - uncompressed_bytes;
320320

321-
ResizeableBuffer compressed_buffer;
322-
compressed_buffer.resize(GetAllocator(), compressed_bytes);
323-
reader.ReadData(*protocol, compressed_buffer.ptr, compressed_bytes);
321+
if (compressed_bytes > 0) {
322+
ResizeableBuffer compressed_buffer;
323+
compressed_buffer.resize(GetAllocator(), compressed_bytes);
324+
reader.ReadData(*protocol, compressed_buffer.ptr, compressed_bytes);
324325

325-
DecompressInternal(chunk->meta_data.codec, compressed_buffer.ptr, compressed_bytes, block->ptr + uncompressed_bytes,
326-
page_hdr.uncompressed_page_size - uncompressed_bytes);
326+
DecompressInternal(chunk->meta_data.codec, compressed_buffer.ptr, compressed_bytes,
327+
block->ptr + uncompressed_bytes, page_hdr.uncompressed_page_size - uncompressed_bytes);
328+
}
327329
}
328330

329331
void ColumnReader::AllocateBlock(idx_t size) {

src/duckdb/extension/parquet/include/writer/templated_column_writer.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
224224

225225
auto &state = state_p.Cast<StandardColumnWriterState<SRC, TGT, OP>>();
226226
if (state.dictionary.GetSize() == 0 || state.dictionary.IsFull()) {
227+
state.dictionary.Reset();
227228
if (writer.GetParquetVersion() == ParquetVersion::V1) {
228229
// Can't do the cool stuff for V1
229230
state.encoding = duckdb_parquet::Encoding::PLAIN;

src/duckdb/extension/parquet/parquet_extension.cpp

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,9 @@ struct ParquetWriteBindData : public TableFunctionData {
781781

782782
struct ParquetWriteGlobalState : public GlobalFunctionData {
783783
unique_ptr<ParquetWriter> writer;
784+
785+
mutex lock;
786+
unique_ptr<ColumnDataCollection> combine_buffer;
784787
};
785788

786789
struct ParquetWriteLocalState : public LocalFunctionData {
@@ -986,16 +989,45 @@ void ParquetWriteSink(ExecutionContext &context, FunctionData &bind_data_p, Glob
986989
}
987990
}
988991

989-
void ParquetWriteCombine(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate,
992+
void ParquetWriteCombine(ExecutionContext &context, FunctionData &bind_data_p, GlobalFunctionData &gstate,
990993
LocalFunctionData &lstate) {
994+
auto &bind_data = bind_data_p.Cast<ParquetWriteBindData>();
991995
auto &global_state = gstate.Cast<ParquetWriteGlobalState>();
992996
auto &local_state = lstate.Cast<ParquetWriteLocalState>();
993-
// flush any data left in the local state to the file
994-
global_state.writer->Flush(local_state.buffer);
997+
998+
if (local_state.buffer.Count() >= bind_data.row_group_size / 2 ||
999+
local_state.buffer.SizeInBytes() >= bind_data.row_group_size_bytes / 2) {
1000+
// local state buffer is more than half of the row_group_size(_bytes), just flush it
1001+
global_state.writer->Flush(local_state.buffer);
1002+
return;
1003+
}
1004+
1005+
unique_lock<mutex> guard(global_state.lock);
1006+
if (global_state.combine_buffer) {
1007+
// There is still some data, combine it
1008+
global_state.combine_buffer->Combine(local_state.buffer);
1009+
if (global_state.combine_buffer->Count() >= bind_data.row_group_size / 2 ||
1010+
global_state.combine_buffer->SizeInBytes() >= bind_data.row_group_size_bytes / 2) {
1011+
// After combining, the combine buffer is more than half of the row_group_size(_bytes), so we flush
1012+
auto owned_combine_buffer = std::move(global_state.combine_buffer);
1013+
guard.unlock();
1014+
// Lock free, of course
1015+
global_state.writer->Flush(*owned_combine_buffer);
1016+
}
1017+
return;
1018+
}
1019+
1020+
global_state.combine_buffer = make_uniq<ColumnDataCollection>(context.client, local_state.buffer.Types());
1021+
global_state.combine_buffer->Combine(local_state.buffer);
9951022
}
9961023

9971024
void ParquetWriteFinalize(ClientContext &context, FunctionData &bind_data, GlobalFunctionData &gstate) {
9981025
auto &global_state = gstate.Cast<ParquetWriteGlobalState>();
1026+
// flush the combine buffer (if it's there)
1027+
if (global_state.combine_buffer) {
1028+
global_state.writer->Flush(*global_state.combine_buffer);
1029+
}
1030+
9991031
// finalize: write any additional metadata to the file here
10001032
global_state.writer->Finalize();
10011033
}

src/duckdb/extension/parquet/writer/primitive_column_writer.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,12 @@ void PrimitiveColumnWriter::WriteDictionary(PrimitiveColumnWriterState &state, u
391391
write_info.compressed_buf);
392392
hdr.compressed_page_size = UnsafeNumericCast<int32_t>(write_info.compressed_size);
393393

394+
if (write_info.compressed_buf) {
395+
// if the data has been compressed, we no longer need the uncompressed data
396+
D_ASSERT(write_info.compressed_buf.get() == write_info.compressed_data);
397+
write_info.temp_writer.reset();
398+
}
399+
394400
// insert the dictionary page as the first page to write for this column
395401
state.write_info.insert(state.write_info.begin(), std::move(write_info));
396402
}

src/duckdb/src/catalog/catalog_entry/table_catalog_entry.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,4 +343,10 @@ virtual_column_map_t TableCatalogEntry::GetVirtualColumns() const {
343343
return virtual_columns;
344344
}
345345

346+
vector<column_t> TableCatalogEntry::GetRowIdColumns() const {
347+
vector<column_t> result;
348+
result.push_back(COLUMN_IDENTIFIER_ROW_ID);
349+
return result;
350+
}
351+
346352
} // namespace duckdb

src/duckdb/src/common/exception.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,7 @@ FatalException::FatalException(ExceptionType type, const string &msg) : Exceptio
334334

335335
InternalException::InternalException(const string &msg) : Exception(ExceptionType::INTERNAL, msg) {
336336
#ifdef DUCKDB_CRASH_ON_ASSERT
337-
Printer::Print("ABORT THROWN BY INTERNAL EXCEPTION: " + msg);
338-
Printer::Print(StackTrace::GetStackTrace());
337+
Printer::Print("ABORT THROWN BY INTERNAL EXCEPTION: " + msg + "\n" + StackTrace::GetStackTrace());
339338
abort();
340339
#endif
341340
}

src/duckdb/src/common/file_system.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,11 @@ time_t FileSystem::GetLastModifiedTime(FileHandle &handle) {
399399
throw NotImplementedException("%s: GetLastModifiedTime is not implemented!", GetName());
400400
}
401401

402+
string FileSystem::GetVersionTag(FileHandle &handle) {
403+
// Used to check cache invalidation for httpfs files with an ETag in CachingFileSystem
404+
return "";
405+
}
406+
402407
FileType FileSystem::GetFileType(FileHandle &handle) {
403408
return FileType::FILE_TYPE_INVALID;
404409
}

src/duckdb/src/common/multi_file/multi_file_reader.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,8 @@ void MultiFileReader::FinalizeBind(MultiFileReaderData &reader_data, const Multi
275275
auto global_idx = MultiFileGlobalIndex(i);
276276
auto &col_id = global_column_ids[i];
277277
auto column_id = col_id.GetPrimaryIndex();
278-
if (options.filename_idx.IsValid() && column_id == options.filename_idx.GetIndex()) {
278+
if ((options.filename_idx.IsValid() && column_id == options.filename_idx.GetIndex()) ||
279+
column_id == MultiFileReader::COLUMN_IDENTIFIER_FILENAME) {
279280
// filename
280281
reader_data.constant_map.Add(global_idx, Value(filename));
281282
continue;

src/duckdb/src/common/printer.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,17 @@ void Printer::RawPrint(OutputStream stream, const string &str) {
3030
#endif
3131
}
3232

33-
// LCOV_EXCL_START
34-
void Printer::Print(OutputStream stream, const string &str) {
33+
void Printer::DefaultLinePrint(OutputStream stream, const string &str) {
3534
Printer::RawPrint(stream, str);
3635
Printer::RawPrint(stream, "\n");
3736
}
37+
38+
line_printer_f Printer::line_printer = Printer::DefaultLinePrint;
39+
40+
// LCOV_EXCL_START
41+
void Printer::Print(OutputStream stream, const string &str) {
42+
Printer::line_printer(stream, str);
43+
}
3844
void Printer::Flush(OutputStream stream) {
3945
#ifndef DUCKDB_DISABLE_PRINT
4046
fflush(stream == OutputStream::STREAM_STDERR ? stderr : stdout);

src/duckdb/src/execution/operator/persistent/physical_copy_to_file.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -580,9 +580,8 @@ SinkCombineResultType PhysicalCopyToFile::Combine(ExecutionContext &context, Ope
580580
return SinkCombineResultType::FINISHED;
581581
}
582582

583-
SinkFinalizeType PhysicalCopyToFile::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
584-
OperatorSinkFinalizeInput &input) const {
585-
auto &gstate = input.global_state.Cast<CopyToFunctionGlobalState>();
583+
SinkFinalizeType PhysicalCopyToFile::FinalizeInternal(ClientContext &context, GlobalSinkState &global_state) const {
584+
auto &gstate = global_state.Cast<CopyToFunctionGlobalState>();
586585
if (partition_output) {
587586
// finalize any outstanding partitions
588587
gstate.FinalizePartitions(context, *this);
@@ -612,6 +611,11 @@ SinkFinalizeType PhysicalCopyToFile::Finalize(Pipeline &pipeline, Event &event,
612611
return SinkFinalizeType::READY;
613612
}
614613

614+
SinkFinalizeType PhysicalCopyToFile::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
615+
OperatorSinkFinalizeInput &input) const {
616+
return FinalizeInternal(context, input.global_state);
617+
}
618+
615619
//===--------------------------------------------------------------------===//
616620
// Source
617621
//===--------------------------------------------------------------------===//

0 commit comments

Comments
 (0)