Skip to content

Commit ae4f660

Browse files
Update vendored DuckDB sources to 9507d49
1 parent 9507d49 commit ae4f660

File tree

16 files changed

+130
-39
lines changed

16 files changed

+130
-39
lines changed

src/duckdb/extension/parquet/include/parquet_reader.hpp

+6
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ struct ParquetUnionData : public BaseUnionData {
132132
};
133133

134134
class ParquetReader : public BaseFileReader {
135+
public:
136+
// Reserved field id used for the "ord" field according to the iceberg spec (used for file_row_number)
137+
static constexpr int32_t ORDINAL_FIELD_ID = 2147483645;
138+
135139
public:
136140
ParquetReader(ClientContext &context, string file_name, ParquetOptions parquet_options,
137141
shared_ptr<ParquetFileMetadataCache> metadata = nullptr);
@@ -174,6 +178,8 @@ class ParquetReader : public BaseFileReader {
174178
return "Parquet";
175179
}
176180

181+
void AddVirtualColumn(column_t virtual_column_id) override;
182+
177183
private:
178184
//! Construct a parquet reader but **do not** open a file, used in ReadStatistics only
179185
ParquetReader(ClientContext &context, ParquetOptions parquet_options,

src/duckdb/extension/parquet/parquet_extension.cpp

+11-8
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ static void ParseFileRowNumberOption(MultiFileReaderBindData &bind_data, Parquet
134134
"Using file_row_number option on file with column named file_row_number is not supported");
135135
}
136136

137-
bind_data.file_row_number_idx = names.size();
138137
return_types.emplace_back(LogicalType::BIGINT);
139138
names.emplace_back("file_row_number");
140139
}
@@ -186,6 +185,14 @@ static void BindSchema(ClientContext &context, vector<LogicalType> &return_types
186185
res.default_expression = make_uniq<ConstantExpression>(column.default_value);
187186
reader_bind.schema.emplace_back(std::move(res));
188187
}
188+
ParseFileRowNumberOption(reader_bind, options, return_types, names);
189+
if (options.file_row_number) {
190+
MultiFileColumnDefinition res("file_row_number", LogicalType::BIGINT);
191+
res.identifier = Value::INTEGER(ParquetReader::ORDINAL_FIELD_ID);
192+
schema_col_names.push_back(res.name);
193+
schema_col_types.push_back(res.type);
194+
reader_bind.schema.emplace_back(std::move(res));
195+
}
189196

190197
if (match_by_field_id) {
191198
reader_bind.mapping = MultiFileColumnMappingMode::BY_FIELD_ID;
@@ -200,8 +207,6 @@ static void BindSchema(ClientContext &context, vector<LogicalType> &return_types
200207
names = schema_col_names;
201208
return_types = schema_col_types;
202209
D_ASSERT(names.size() == return_types.size());
203-
204-
ParseFileRowNumberOption(reader_bind, options, return_types, names);
205210
}
206211

207212
void ParquetMultiFileInfo::BindReader(ClientContext &context, vector<LogicalType> &return_types, vector<string> &names,
@@ -465,10 +470,6 @@ optional_idx ParquetMultiFileInfo::MaxThreads(const MultiFileBindData &bind_data
465470

466471
void ParquetMultiFileInfo::FinalizeBindData(MultiFileBindData &multi_file_data) {
467472
auto &bind_data = multi_file_data.bind_data->Cast<ParquetReadBindData>();
468-
// Enable the parquet file_row_number on the parquet options if the file_row_number_idx was set
469-
if (multi_file_data.reader_bind.file_row_number_idx != DConstants::INVALID_INDEX) {
470-
bind_data.parquet_options.file_row_number = true;
471-
}
472473
if (multi_file_data.initial_reader) {
473474
auto &initial_reader = multi_file_data.initial_reader->Cast<ParquetReader>();
474475
bind_data.initial_file_cardinality = initial_reader.NumRows();
@@ -498,7 +499,9 @@ double ParquetMultiFileInfo::GetProgressInFile(ClientContext &context, const Bas
498499
return 100.0 * (static_cast<double>(read_rows) / static_cast<double>(parquet_reader.NumRows()));
499500
}
500501

501-
void ParquetMultiFileInfo::GetVirtualColumns(ClientContext &, MultiFileBindData &, virtual_column_map_t &) {
502+
//! Registers the virtual columns the parquet reader can synthesize.
//! Currently only file_row_number (the row's ordinal within its file) is exposed.
void ParquetMultiFileInfo::GetVirtualColumns(ClientContext &, MultiFileBindData &, virtual_column_map_t &result) {
	TableColumn row_number_column("file_row_number", LogicalType::BIGINT);
	result.insert(make_pair(MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER, std::move(row_number_column)));
}
503506

504507
shared_ptr<BaseFileReader> ParquetMultiFileInfo::CreateReader(ClientContext &context, GlobalTableFunctionState &,

src/duckdb/extension/parquet/parquet_reader.cpp

+21-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "duckdb/storage/object_cache.hpp"
2828
#include "duckdb/optimizer/statistics_propagator.hpp"
2929
#include "duckdb/planner/table_filter_state.hpp"
30+
#include "duckdb/common/multi_file/multi_file_reader.hpp"
3031

3132
#include <cassert>
3233
#include <chrono>
@@ -35,6 +36,8 @@
3536

3637
namespace duckdb {
3738

39+
constexpr int32_t ParquetReader::ORDINAL_FIELD_ID;
40+
3841
using duckdb_parquet::ColumnChunk;
3942
using duckdb_parquet::ConvertedType;
4043
using duckdb_parquet::FieldRepetitionType;
@@ -573,6 +576,11 @@ ParquetColumnSchema ParquetReader::ParseSchemaRecursive(idx_t depth, idx_t max_d
573576
}
574577
}
575578

579+
//! Builds the synthetic schema entry used for the generated file_row_number column.
ParquetColumnSchema FileRowNumberSchema() {
	ParquetColumnSchema row_number_schema("file_row_number", LogicalType::BIGINT, 0, 0, 0, 0,
	                                      ParquetColumnSchemaType::FILE_ROW_NUMBER);
	return row_number_schema;
}
583+
576584
unique_ptr<ParquetColumnSchema> ParquetReader::ParseSchema() {
577585
auto file_meta_data = GetFileMetadata();
578586
idx_t next_schema_idx = 0;
@@ -598,16 +606,18 @@ unique_ptr<ParquetColumnSchema> ParquetReader::ParseSchema() {
598606
"Using file_row_number option on file with column named file_row_number is not supported");
599607
}
600608
}
601-
ParquetColumnSchema file_row_number("file_row_number", LogicalType::BIGINT, 0, 0, 0, 0,
602-
ParquetColumnSchemaType::FILE_ROW_NUMBER);
603-
root.children.push_back(std::move(file_row_number));
609+
root.children.push_back(FileRowNumberSchema());
604610
}
605611
return make_uniq<ParquetColumnSchema>(root);
606612
}
607613

608614
MultiFileColumnDefinition ParquetReader::ParseColumnDefinition(const FileMetaData &file_meta_data,
609615
ParquetColumnSchema &element) {
610616
MultiFileColumnDefinition result(element.name, element.type);
617+
if (element.schema_type == ParquetColumnSchemaType::FILE_ROW_NUMBER) {
618+
result.identifier = Value::INTEGER(ORDINAL_FIELD_ID);
619+
return result;
620+
}
611621
auto &column_schema = file_meta_data.schema[element.schema_index];
612622

613623
if (column_schema.__isset.field_id) {
@@ -645,6 +655,14 @@ void ParquetReader::InitializeSchema(ClientContext &context) {
645655
}
646656
}
647657

658+
//! Appends a virtual column to the reader's root schema so it is projected from this file.
//! Only file_row_number is supported; any other id is an internal error.
void ParquetReader::AddVirtualColumn(column_t virtual_column_id) {
	if (virtual_column_id != MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER) {
		throw InternalException("Unsupported virtual column id %d for parquet reader", virtual_column_id);
	}
	// append the synthetic file_row_number entry to the parsed schema tree
	root_schema->children.push_back(FileRowNumberSchema());
}
665+
648666
ParquetOptions::ParquetOptions(ClientContext &context) {
649667
Value binary_as_string_val;
650668
if (context.TryGetCurrentSetting("binary_as_string", binary_as_string_val)) {

src/duckdb/src/common/multi_file/multi_file_column_mapper.cpp

+33-18
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,14 @@ unique_ptr<Expression> ConstructMapExpression(ClientContext &context, idx_t loca
365365
std::move(bind_data));
366366
}
367367

368+
//! Returns true if the given virtual column has a single constant value for an
//! entire file (e.g. the empty column or the filename), meaning it can be
//! emitted as a constant expression instead of being projected from the file.
bool VirtualColumnIsConstant(column_t column_id) {
	// BUG FIX: the second comparison previously tested
	// COLUMN_IDENTIFIER_EMPTY == MultiFileReader::COLUMN_IDENTIFIER_FILENAME
	// (two constants, always false), so the filename column was never
	// recognized as constant. Compare against column_id instead.
	if (column_id == COLUMN_IDENTIFIER_EMPTY || column_id == MultiFileReader::COLUMN_IDENTIFIER_FILENAME) {
		return true;
	}
	return false;
}
375+
368376
ResultColumnMapping MultiFileColumnMapper::CreateColumnMappingByMapper(const ColumnMapper &mapper) {
369377
auto &reader = *reader_data.reader;
370378
auto &local_columns = reader.GetColumns();
@@ -397,28 +405,36 @@ ResultColumnMapping MultiFileColumnMapper::CreateColumnMappingByMapper(const Col
397405
auto &global_id = global_column_ids[i];
398406
auto global_column_id = global_id.GetPrimaryIndex();
399407

408+
auto local_idx = MultiFileLocalIndex(reader.column_ids.size());
400409
if (IsVirtualColumn(global_column_id)) {
401-
// virtual column - these are emitted for every file
410+
// virtual column - look it up in the virtual column entry map
402411
auto virtual_entry = virtual_columns.find(global_column_id);
403412
if (virtual_entry == virtual_columns.end()) {
404413
throw InternalException("Virtual column id %d not found in virtual columns map", global_column_id);
405414
}
406-
expressions.push_back(make_uniq<BoundConstantExpression>(Value(virtual_entry->second.type)));
407-
continue;
408-
}
409-
410-
auto local_idx = MultiFileLocalIndex(reader.column_ids.size());
411-
if (global_column_id >= global_columns.size()) {
412-
if (bind_data.file_row_number_idx == global_column_id) {
413-
// FIXME: this needs a more extensible solution
414-
auto new_column_id = MultiFileLocalColumnId(mapper.MapCount());
415-
reader.column_ids.push_back(new_column_id);
416-
reader.column_indexes.emplace_back(mapper.MapCount());
417-
//! FIXME: what to do here???
418-
expressions.push_back(make_uniq<BoundReferenceExpression>(LogicalType::BIGINT, local_idx));
419-
} else {
420-
throw InternalException("Unexpected generated column");
415+
auto &virtual_column_type = virtual_entry->second.type;
416+
// check if this column is constant for the file
417+
if (VirtualColumnIsConstant(global_column_id)) {
418+
// the column is constant (e.g. filename or empty) - no need to project from the file
419+
expressions.push_back(make_uniq<BoundConstantExpression>(Value(virtual_column_type)));
420+
continue;
421421
}
422+
// the column is not constant for the file - need to push into the projection list
423+
auto expr = make_uniq<BoundReferenceExpression>(virtual_column_type, local_idx.GetIndex());
424+
reader_data.expressions.push_back(std::move(expr));
425+
426+
MultiFileLocalColumnId local_id(reader.columns.size());
427+
ColumnIndex local_index(local_id.GetId());
428+
429+
// add the virtual column to the reader
430+
reader.columns.emplace_back(virtual_entry->second.name, virtual_column_type);
431+
reader.AddVirtualColumn(global_column_id);
432+
433+
// set it as being projected in this spot
434+
MultiFileColumnMap index_mapping(local_idx, virtual_column_type, virtual_column_type);
435+
result.global_to_local.insert(make_pair(global_idx.GetIndex(), std::move(index_mapping)));
436+
reader.column_ids.push_back(local_id);
437+
reader.column_indexes.push_back(std::move(local_index));
422438
continue;
423439
}
424440

@@ -433,8 +449,7 @@ ResultColumnMapping MultiFileColumnMapper::CreateColumnMappingByMapper(const Col
433449
ColumnIndex local_index(local_id.GetId());
434450
auto &local_type = local_columns[local_id.GetId()].type;
435451
auto &global_type = global_column.type;
436-
auto local_idx = reader.column_ids.size();
437-
auto expr = make_uniq<BoundReferenceExpression>(global_type, local_idx);
452+
auto expr = make_uniq<BoundReferenceExpression>(global_type, local_idx.GetIndex());
438453
if (global_type != local_type) {
439454
reader.cast_map[local_id.GetId()] = global_type;
440455
} else {

src/duckdb/src/common/multi_file/multi_file_reader.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
namespace duckdb {
1717

1818
constexpr column_t MultiFileReader::COLUMN_IDENTIFIER_FILENAME;
19+
constexpr column_t MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER;
1920

2021
MultiFileReaderGlobalState::~MultiFileReaderGlobalState() {
2122
}

src/duckdb/src/function/table/version/pragma_version.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#ifndef DUCKDB_PATCH_VERSION
2-
#define DUCKDB_PATCH_VERSION "0-dev2195"
2+
#define DUCKDB_PATCH_VERSION "0-dev2212"
33
#endif
44
#ifndef DUCKDB_MINOR_VERSION
55
#define DUCKDB_MINOR_VERSION 3
@@ -8,10 +8,10 @@
88
#define DUCKDB_MAJOR_VERSION 1
99
#endif
1010
#ifndef DUCKDB_VERSION
11-
#define DUCKDB_VERSION "v1.3.0-dev2195"
11+
#define DUCKDB_VERSION "v1.3.0-dev2212"
1212
#endif
1313
#ifndef DUCKDB_SOURCE_ID
14-
#define DUCKDB_SOURCE_ID "cf02bffeb0"
14+
#define DUCKDB_SOURCE_ID "946a2bd1bf"
1515
#endif
1616
#include "duckdb/function/table/system_functions.hpp"
1717
#include "duckdb/main/database.hpp"

src/duckdb/src/include/duckdb/common/multi_file/base_file_reader.hpp

+4
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ class BaseFileReader {
5656
//! Whether or not to push casts into the cast map
5757
return false;
5858
}
59+
//! Adds a virtual column to be projected at the end
60+
virtual void AddVirtualColumn(column_t virtual_column_id) {
61+
throw InternalException("Reader %s does not support AddVirtualColumn", GetReaderType());
62+
}
5963

6064
public:
6165
template <class TARGET>

src/duckdb/src/include/duckdb/common/multi_file/multi_file_reader.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ enum class ReaderInitializeType { INITIALIZED, SKIP_READING_FILE };
3333
struct MultiFileReader {
3434
public:
3535
static constexpr column_t COLUMN_IDENTIFIER_FILENAME = UINT64_C(9223372036854775808);
36+
static constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = UINT64_C(9223372036854775809);
3637

3738
public:
3839
virtual ~MultiFileReader();

src/duckdb/src/include/duckdb/common/multi_file/multi_file_states.hpp

-2
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ struct MultiFileReaderBindData {
2121
column_t filename_idx = DConstants::INVALID_INDEX;
2222
//! The set of hive partitioning indexes (if any)
2323
vector<HivePartitioningIndex> hive_partitioning_indexes;
24-
//! The (global) column id of the file_row_number column (if any)
25-
column_t file_row_number_idx = DConstants::INVALID_INDEX;
2624
//! (optional) The schema set by the multi file reader
2725
vector<MultiFileColumnDefinition> schema;
2826
//! The method used to map local -> global columns

src/duckdb/src/include/duckdb/main/config.hpp

+6
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,12 @@ struct DBConfigOptions {
290290
set<string> allowed_directories;
291291
//! The log configuration
292292
LogConfig log_config = LogConfig();
293+
//! Partially process tasks before rescheduling - allows for more scheduler fairness between separate queries
294+
#ifdef DUCKDB_ALTERNATIVE_VERIFY
295+
bool scheduler_process_partial = true;
296+
#else
297+
bool scheduler_process_partial = false;
298+
#endif
293299

294300
bool operator==(const DBConfigOptions &other) const;
295301
};

src/duckdb/src/include/duckdb/main/settings.hpp

+11
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,17 @@ struct ScalarSubqueryErrorOnMultipleRowsSetting {
11221122
static Value GetSetting(const ClientContext &context);
11231123
};
11241124

1125+
struct SchedulerProcessPartialSetting {
1126+
using RETURN_TYPE = bool;
1127+
static constexpr const char *Name = "scheduler_process_partial";
1128+
static constexpr const char *Description =
1129+
"Partially process tasks before rescheduling - allows for more scheduler fairness between separate queries";
1130+
static constexpr const char *InputType = "BOOLEAN";
1131+
static void SetGlobal(DatabaseInstance *db, DBConfig &config, const Value &parameter);
1132+
static void ResetGlobal(DatabaseInstance *db, DBConfig &config);
1133+
static Value GetSetting(const ClientContext &context);
1134+
};
1135+
11251136
struct SchemaSetting {
11261137
using RETURN_TYPE = string;
11271138
static constexpr const char *Name = "schema";

src/duckdb/src/include/duckdb/parallel/task.hpp

+4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#pragma once
1010

1111
#include "duckdb/common/common.hpp"
12+
#include "duckdb/common/optional_ptr.hpp"
1213

1314
namespace duckdb {
1415
class ClientContext;
@@ -50,6 +51,9 @@ class Task : public enable_shared_from_this<Task> {
5051
virtual bool TaskBlockedOnResult() const {
5152
return false;
5253
}
54+
55+
public:
56+
optional_ptr<ProducerToken> token;
5357
};
5458

5559
} // namespace duckdb

src/duckdb/src/main/config.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ static const ConfigurationOption internal_options[] = {
161161
DUCKDB_LOCAL(ProfilingModeSetting),
162162
DUCKDB_LOCAL(ProgressBarTimeSetting),
163163
DUCKDB_LOCAL(ScalarSubqueryErrorOnMultipleRowsSetting),
164+
DUCKDB_GLOBAL(SchedulerProcessPartialSetting),
164165
DUCKDB_LOCAL(SchemaSetting),
165166
DUCKDB_LOCAL(SearchPathSetting),
166167
DUCKDB_GLOBAL(SecretDirectorySetting),

src/duckdb/src/main/settings/autogenerated_settings.cpp

+16
Original file line numberDiff line numberDiff line change
@@ -1117,6 +1117,22 @@ Value ScalarSubqueryErrorOnMultipleRowsSetting::GetSetting(const ClientContext &
11171117
return Value::BOOLEAN(config.scalar_subquery_error_on_multiple_rows);
11181118
}
11191119

1120+
//===----------------------------------------------------------------------===//
1121+
// Scheduler Process Partial
1122+
//===----------------------------------------------------------------------===//
1123+
//! Applies the scheduler_process_partial setting to the global configuration.
void SchedulerProcessPartialSetting::SetGlobal(DatabaseInstance *db, DBConfig &config, const Value &input) {
	const auto enable_partial = input.GetValue<bool>();
	config.options.scheduler_process_partial = enable_partial;
}
1126+
1127+
//! Restores scheduler_process_partial to its compiled-in default value.
void SchedulerProcessPartialSetting::ResetGlobal(DatabaseInstance *db, DBConfig &config) {
	const DBConfig default_config;
	config.options.scheduler_process_partial = default_config.options.scheduler_process_partial;
}
1130+
1131+
//! Reads the current scheduler_process_partial value as a BOOLEAN Value.
Value SchedulerProcessPartialSetting::GetSetting(const ClientContext &context) {
	const auto &db_config = DBConfig::GetConfig(context);
	return Value::BOOLEAN(db_config.options.scheduler_process_partial);
}
1135+
11201136
//===----------------------------------------------------------------------===//
11211137
// Zstd Min String Length
11221138
//===----------------------------------------------------------------------===//

src/duckdb/src/parallel/task_executor.cpp

-2
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@ BaseExecutorTask::BaseExecutorTask(TaskExecutor &executor) : executor(executor)
6363
}
6464

6565
TaskExecutionResult BaseExecutorTask::Execute(TaskExecutionMode mode) {
66-
(void)mode;
67-
D_ASSERT(mode == TaskExecutionMode::PROCESS_ALL);
6866
if (executor.HasError()) {
6967
// another task encountered an error - bailout
7068
executor.FinishTask();

0 commit comments

Comments
 (0)