
Commit c4f8102

Merge pull request #599 from pdet/merge
V1.4 -> Main
2 parents 42763ee + ef7b362 commit c4f8102

14 files changed: +297 -31 lines changed

duckdb

Submodule duckdb updated 2165 files

src/functions/ducklake_cleanup_files.cpp

Lines changed: 3 additions & 2 deletions
@@ -64,9 +64,10 @@ static unique_ptr<FunctionData> CleanupBind(ClientContext &context, TableFunctio
 	bool cleanup_all = false;
 	for (auto &entry : input.named_parameters) {
 		if (StringUtil::CIEquals(entry.first, "dry_run")) {
-			result->dry_run = true;
+			result->dry_run = entry.second.GetValue<bool>();
+			;
 		} else if (StringUtil::CIEquals(entry.first, "cleanup_all")) {
-			cleanup_all = true;
+			cleanup_all = entry.second.GetValue<bool>();
 		} else if (StringUtil::CIEquals(entry.first, "older_than")) {
 			from_timestamp = entry.second.GetValue<timestamp_tz_t>();
 			has_timestamp = true;
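
Judging from this hunk, dry_run and cleanup_all used to act as presence flags: merely passing them set the flag to true, regardless of the supplied value. They now read the boolean argument itself. A minimal usage sketch, assuming an attached DuckLake catalog named 'ducklake' (the same function and parameters exercised by the new cleanup test further down):

    -- preview the files that would be removed
    SELECT COUNT(*) FROM ducklake_cleanup_old_files('ducklake', dry_run => true, cleanup_all => true);
    -- actually remove them; dry_run => false is now honored instead of behaving like true
    CALL ducklake_cleanup_old_files('ducklake', dry_run => false, cleanup_all => true);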

src/functions/ducklake_compaction_functions.cpp

Lines changed: 33 additions & 9 deletions
@@ -141,7 +141,7 @@ class DuckLakeLogicalCompaction : public LogicalExtensionOperator {
 class DuckLakeCompactor {
 public:
 	DuckLakeCompactor(ClientContext &context, DuckLakeCatalog &catalog, DuckLakeTransaction &transaction,
-	                  Binder &binder, TableIndex table_id);
+	                  Binder &binder, TableIndex table_id, uint64_t max_files);
 	DuckLakeCompactor(ClientContext &context, DuckLakeCatalog &catalog, DuckLakeTransaction &transaction,
 	                  Binder &binder, TableIndex table_id, double delete_threshold);
 	void GenerateCompactions(DuckLakeTableEntry &table, vector<unique_ptr<LogicalOperator>> &compactions);
@@ -154,14 +154,15 @@ class DuckLakeCompactor {
 	Binder &binder;
 	TableIndex table_id;
 	double delete_threshold = 0.95;
+	optional_idx max_files;

 	CompactionType type;
 };

 DuckLakeCompactor::DuckLakeCompactor(ClientContext &context, DuckLakeCatalog &catalog, DuckLakeTransaction &transaction,
-                                     Binder &binder, TableIndex table_id)
+                                     Binder &binder, TableIndex table_id, uint64_t max_files)
     : context(context), catalog(catalog), transaction(transaction), binder(binder), table_id(table_id),
-      type(CompactionType::MERGE_ADJACENT_TABLES) {
+      max_files(max_files), type(CompactionType::MERGE_ADJACENT_TABLES) {
 }

 DuckLakeCompactor::DuckLakeCompactor(ClientContext &context, DuckLakeCatalog &catalog, DuckLakeTransaction &transaction,
@@ -249,6 +250,7 @@ void DuckLakeCompactor::GenerateCompactions(DuckLakeTableEntry &table,
 	}
 	// we have gathered all the candidate files per compaction group
 	// iterate over them to generate actual compaction commands
+	uint64_t compacted_files = 0;
 	for (auto &entry : candidates) {
 		auto &candidate_list = entry.second.candidate_files;
 		if (candidate_list.size() <= 1) {
@@ -285,13 +287,21 @@

 		if (start_idx < compaction_idx) {
 			idx_t compaction_file_count = compaction_idx - start_idx;
+			if (compaction_file_count == 1) {
+				// If we only have one file to compact, we have nothing to compact
+				continue;
+			}
 			vector<DuckLakeCompactionFileEntry> compaction_files;
 			for (idx_t i = start_idx; i < compaction_idx; i++) {
 				compaction_files.push_back(std::move(files[candidate_list[i]]));
 			}
 			compactions.push_back(GenerateCompactionCommand(std::move(compaction_files)));
 			start_idx += compaction_file_count - 1;
 		}
+		compacted_files++;
+		if (compacted_files >= max_files.GetIndex()) {
+			break;
+		}
 	}
 }
 }
@@ -494,10 +504,11 @@ static unique_ptr<LogicalOperator> GenerateCompactionOperator(TableFunctionBindI
 static void GenerateCompaction(ClientContext &context, DuckLakeTransaction &transaction,
                                DuckLakeCatalog &ducklake_catalog, TableFunctionBindInput &input,
                                DuckLakeTableEntry &cur_table, CompactionType type, double delete_threshold,
-                               vector<unique_ptr<LogicalOperator>> &compactions) {
+                               uint64_t max_files, vector<unique_ptr<LogicalOperator>> &compactions) {
 	switch (type) {
 	case CompactionType::MERGE_ADJACENT_TABLES: {
-		DuckLakeCompactor compactor(context, ducklake_catalog, transaction, *input.binder, cur_table.GetTableId());
+		DuckLakeCompactor compactor(context, ducklake_catalog, transaction, *input.binder, cur_table.GetTableId(),
+		                            max_files);
 		compactor.GenerateCompactions(cur_table, compactions);
 		break;
 	}
@@ -532,6 +543,17 @@ unique_ptr<LogicalOperator> BindCompaction(ClientContext &context, TableFunction
 	}

 	vector<unique_ptr<LogicalOperator>> compactions;
+	uint64_t max_files = NumericLimits<uint64_t>::Maximum() - 1;
+	auto max_files_entry = input.named_parameters.find("max_compacted_files");
+	if (max_files_entry != input.named_parameters.end()) {
+		if (max_files_entry->second.IsNull()) {
+			throw BinderException("The max_compacted_files option must be a non-null integer.");
+		}
+		max_files = UBigIntValue::Get(max_files_entry->second);
+		if (max_files == 0) {
+			throw BinderException("The max_compacted_files option must be greater than zero.");
+		}
+	}
 	if (input.inputs.size() == 1) {
 		if (schema.empty() && table.empty()) {
 			// No default schema/table, we will perform rewrites on deletes in the whole database
@@ -541,7 +563,7 @@ unique_ptr<LogicalOperator> BindCompaction(ClientContext &context, TableFunction
 				if (entry.type == CatalogType::TABLE_ENTRY) {
 					auto &cur_table = entry.Cast<DuckLakeTableEntry>();
 					GenerateCompaction(context, transaction, ducklake_catalog, input, cur_table, type,
-					                   delete_threshold, compactions);
+					                   delete_threshold, max_files, compactions);
 				}
 			});
 		}
@@ -554,7 +576,7 @@ unique_ptr<LogicalOperator> BindCompaction(ClientContext &context, TableFunction
 			if (entry.type == CatalogType::TABLE_ENTRY) {
 				auto &cur_table = entry.Cast<DuckLakeTableEntry>();
 				GenerateCompaction(context, transaction, ducklake_catalog, input, cur_table, type, delete_threshold,
-				                   compactions);
+				                   max_files, compactions);
 			}
 		});
 		return GenerateCompactionOperator(input, bind_index, compactions);
@@ -568,10 +590,11 @@ unique_ptr<LogicalOperator> BindCompaction(ClientContext &context, TableFunction
 	if (schema_entry != input.named_parameters.end()) {
 		schema = StringValue::Get(schema_entry->second);
 	}
-	EntryLookupInfo table_lookup(CatalogType::TABLE_ENTRY, table, nullptr, QueryErrorContext());
+
+	const EntryLookupInfo table_lookup(CatalogType::TABLE_ENTRY, table, nullptr, QueryErrorContext());
 	auto table_entry = catalog.GetEntry(context, schema, table_lookup, OnEntryNotFound::THROW_EXCEPTION);
 	auto &ducklake_table = table_entry->Cast<DuckLakeTableEntry>();
-	GenerateCompaction(context, transaction, ducklake_catalog, input, ducklake_table, type, delete_threshold,
+	GenerateCompaction(context, transaction, ducklake_catalog, input, ducklake_table, type, delete_threshold, max_files,
 	                   compactions);

 	return GenerateCompactionOperator(input, bind_index, compactions);
@@ -591,6 +614,7 @@ TableFunctionSet DuckLakeMergeAdjacentFilesFunction::GetFunctions() {
 		function.bind_operator = MergeAdjacentFilesBind;
 		if (type.size() == 2) {
 			function.named_parameters["schema"] = LogicalType::VARCHAR;
+			function.named_parameters["max_compacted_files"] = LogicalType::UBIGINT;
 		}
 		set.AddFunction(function);
 	}
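
Taken together, these hunks add a max_compacted_files named parameter (UBIGINT) to ducklake_merge_adjacent_files: the binder rejects NULL and 0, and the compactor stops emitting merge commands once the limit is reached. A hedged usage sketch; the table name 't' is made up, and the two-argument (catalog, table) form is assumed since that is the overload on which the diff registers the parameter:

    -- merge adjacent files of table t, producing at most 10 compaction commands in this call
    CALL ducklake_merge_adjacent_files('ducklake', 't', max_compacted_files => 10);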

src/include/common/ducklake_options.hpp

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ struct DuckLakeOptions {
 	bool create_if_not_exists = true;
 	bool migrate_if_required = true;
 	unique_ptr<BoundAtClause> at_clause;
-	unordered_map<string, Value> metadata_parameters;
+	case_insensitive_map_t<Value> metadata_parameters;
 	option_map_t config_options;
 	map<SchemaIndex, option_map_t> schema_options;
 	map<TableIndex, option_map_t> table_options;

src/storage/ducklake_table_entry.cpp

Lines changed: 3 additions & 1 deletion
@@ -629,7 +629,7 @@ static bool TypePromotionIsAllowedUSmallint(const LogicalType &to) {
 	}
 }
 bool TypePromotionIsAllowed(const LogicalType &source, const LogicalType &target) {
-	// FIXME: support DECIMAL, and DATE -> TIMESTAMP
+	// FIXME: Rework to use DUCKDB_API static LogicalType MaxLogicalType
 	switch (source.id()) {
 	case LogicalTypeId::TINYINT:
 		return TypePromotionIsAllowedTinyint(target);
@@ -649,6 +649,8 @@ bool TypePromotionIsAllowed(const LogicalType &source, const LogicalType &target
 		return false;
 	case LogicalTypeId::FLOAT:
 		return target.id() == LogicalTypeId::DOUBLE;
+	case LogicalTypeId::TIMESTAMP:
+		return target.id() == LogicalTypeId::TIMESTAMP_TZ;
 	default:
 		return false;
 	}

src/storage/ducklake_update.cpp

Lines changed: 29 additions & 10 deletions
@@ -67,20 +67,24 @@ unique_ptr<LocalSinkState> DuckLakeUpdate::GetLocalSinkState(ExecutionContext &c
 	delete_types.emplace_back(LogicalType::UBIGINT);
 	delete_types.emplace_back(LogicalType::BIGINT);

-	vector<LogicalType> insert_types;
+	vector<LogicalType> expression_types;
 	result->expression_executor = make_uniq<ExpressionExecutor>(context.client, expressions);
 	for (auto &expr : result->expression_executor->expressions) {
-		insert_types.push_back(expr->return_type);
+		expression_types.push_back(expr->return_type);
 	}

-	for (auto &type : insert_types) {
+	for (auto &type : expression_types) {
 		if (DuckLakeTypes::RequiresCast(type)) {
 			type = DuckLakeTypes::GetCastedType(type);
 		}
 	}
-	result->update_expression_chunk.Initialize(context.client, insert_types);
-	// updates also write the row id to the file, so the final version needs the row_id
-	insert_types.push_back(LogicalType::BIGINT);
+	result->update_expression_chunk.Initialize(context.client, expression_types);
+	// updates also write the row id to the file, so the final version needs the row_id placed
+	// right after the physical columns (before computed partition columns).
+	vector<LogicalType> insert_types = expression_types;
+	auto physical_column_count = columns.size();
+	D_ASSERT(physical_column_count <= insert_types.size());
+	insert_types.insert(insert_types.begin() + physical_column_count, LogicalType::BIGINT);
 	result->insert_chunk.Initialize(context.client, insert_types);

 	result->delete_chunk.Initialize(context.client, delete_types);
@@ -101,12 +105,27 @@ SinkResultType DuckLakeUpdate::Sink(ExecutionContext &context, DataChunk &chunk,
 	insert_chunk.SetCardinality(chunk.size());
 	lstate.expression_executor->Execute(chunk, update_expression_chunk);

-	// We reference all columns we created in our updates
-	for (idx_t i = 0; i < update_expression_chunk.ColumnCount(); i++) {
+	const idx_t physical_column_count = columns.size();
+	const idx_t expression_column_count = update_expression_chunk.ColumnCount();
+	D_ASSERT(expression_column_count >= physical_column_count);
+	const idx_t partition_column_count = expression_column_count - physical_column_count;
+	const idx_t insert_column_count = insert_chunk.ColumnCount();
+	D_ASSERT(insert_column_count >= expression_column_count);
+	// virtual columns (PlanUpdate sets WRITE_ROW_ID, so there is exactly one: the row id) must sit between the physical and partition columns
+	const idx_t virtual_column_count = insert_column_count - expression_column_count;
+	D_ASSERT(virtual_column_count == 1);
+
+	// copy the physical columns directly
+	for (idx_t i = 0; i < physical_column_count; i++) {
 		insert_chunk.data[i].Reference(update_expression_chunk.data[i]);
 	}
-
-	insert_chunk.data[insert_chunk.data.size() - 1].Reference(chunk.data[row_id_index]);
+	// reference the row id right after the physical columns
+	insert_chunk.data[physical_column_count].Reference(chunk.data[row_id_index]);
+	// place computed partition columns after the virtual columns
+	for (idx_t part_idx = 0; part_idx < partition_column_count; part_idx++) {
+		insert_chunk.data[physical_column_count + virtual_column_count + part_idx]
+		    .Reference(update_expression_chunk.data[physical_column_count + part_idx]);
+	}

 	OperatorSinkInput copy_input {*copy_op.sink_state, *lstate.copy_local_state, input.interrupt_state};
 	copy_op.Sink(context, insert_chunk, copy_input);
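
The layout change above places the row id between the physical columns and any computed partition columns when an UPDATE rewrites data files, instead of appending it at the very end. A scenario that would exercise this path, sketched under the assumption of DuckLake's ALTER TABLE ... SET PARTITIONED BY syntax and with made-up table and column names:

    CREATE TABLE ducklake.events(id INTEGER, ts TIMESTAMP, payload VARCHAR);
    ALTER TABLE ducklake.events SET PARTITIONED BY (id);
    INSERT INTO ducklake.events VALUES (1, '2025-01-15 12:30:45', 'hello');
    -- the rewrite emits the physical columns, then the row id, then the computed partition column
    UPDATE ducklake.events SET payload = 'world' WHERE id = 1;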

test/configs/postgres.json

Lines changed: 2 additions & 1 deletion
@@ -8,7 +8,8 @@
         "ducklake",
         "httpfs",
         "postgres_scanner",
-        "icu"
+        "icu",
+        "httpfs"
     ],
     "test_env": [{
         "env_name": "DUCKLAKE_CONNECTION",

test/configs/sqlite.json

Lines changed: 2 additions & 1 deletion
@@ -7,7 +7,8 @@
         "ducklake",
         "httpfs",
         "sqlite_scanner",
-        "icu"
+        "icu",
+        "httpfs"
     ],
     "test_env": [{
         "env_name": "DUCKLAKE_CONNECTION",
test/sql/alter/alter_timestamptz_promotion.test

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+# name: test/sql/alter/alter_timestamptz_promotion.test
+# description: test ducklake timestamp to timestamptz promotion
+# group: [alter]
+
+require ducklake
+
+require parquet
+
+test-env DUCKLAKE_CONNECTION __TEST_DIR__/{UUID}.db
+
+test-env DATA_PATH __TEST_DIR__
+
+
+statement ok
+ATTACH 'ducklake:${DUCKLAKE_CONNECTION}' AS ducklake (DATA_PATH '${DATA_PATH}/alter_timestamptz_promotion')
+
+statement ok
+CREATE TABLE ducklake.test(col1 TIMESTAMP);
+
+statement ok
+INSERT INTO ducklake.test VALUES ('2025-01-15 12:30:45'::TIMESTAMP)
+
+statement ok
+ALTER TABLE ducklake.test ALTER COLUMN col1 TYPE TIMESTAMPTZ;
+
+statement ok
+INSERT INTO ducklake.test VALUES ('2025-01-15 12:30:45'::TIMESTAMPTZ)
+
+query I
+FROM ducklake.test
+----
+2025-01-15 12:30:45+00
+2025-01-15 12:30:45+00
test/sql/cleanup/cleanup_old_files.test

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+# name: test/sql/cleanup/cleanup_old_files.test
+# description: Cleanup files when creating and dropping a table
+# group: [cleanup]
+
+require ducklake
+
+require parquet
+
+test-env DUCKLAKE_CONNECTION __TEST_DIR__/{UUID}.db
+
+test-env DATA_PATH __TEST_DIR__
+
+statement ok
+ATTACH 'ducklake:${DUCKLAKE_CONNECTION}' AS ducklake (DATA_PATH '${DATA_PATH}/cleanup_old_files')
+
+statement ok
+USE ducklake;
+
+statement ok
+CREATE TABLE t (x INT);
+
+statement ok
+INSERT INTO t VALUES (1), (2), (3);
+
+statement ok
+INSERT INTO t VALUES (4), (5);
+
+statement ok
+DELETE FROM t WHERE x <= 2;
+
+statement ok
+INSERT INTO t VALUES (6), (7);
+
+statement ok
+CALL ducklake_rewrite_data_files('ducklake', 't');
+
+statement ok
+CALL ducklake_merge_adjacent_files('ducklake');
+
+statement ok
+CALL ducklake_expire_snapshots('ducklake', older_than => now());
+
+query I
+SELECT COUNT(*) FROM ducklake_cleanup_old_files('ducklake', dry_run => true, cleanup_all => true);
+----
+2
+
+statement ok
+CALL ducklake_cleanup_old_files('ducklake', dry_run => false, cleanup_all => true);
+
+query I
+SELECT COUNT(*) FROM ducklake_cleanup_old_files('ducklake', dry_run => true, cleanup_all => true);
+----
+0
