From 7503d6771905f81573a6dbc863afc9252ea15946 Mon Sep 17 00:00:00 2001 From: Noah Isaksen Date: Wed, 26 Mar 2025 17:51:43 -0400 Subject: [PATCH 1/6] implement limit pushdown for postgres scanning --- src/include/postgres_scanner.hpp | 1 + src/postgres_scanner.cpp | 8 ++-- src/storage/postgres_optimizer.cpp | 66 ++++++++++++++++++++++++++++++ test/sql/storage/limit.test | 31 ++++++++++++++ 4 files changed, 102 insertions(+), 4 deletions(-) create mode 100644 test/sql/storage/limit.test diff --git a/src/include/postgres_scanner.hpp b/src/include/postgres_scanner.hpp index e7f3f993..52d6d0d2 100644 --- a/src/include/postgres_scanner.hpp +++ b/src/include/postgres_scanner.hpp @@ -26,6 +26,7 @@ struct PostgresBindData : public FunctionData { string schema_name; string table_name; string sql; + string limit; idx_t pages_approx = 0; vector postgres_types; diff --git a/src/postgres_scanner.cpp b/src/postgres_scanner.cpp index 766a3b39..db0d51a5 100644 --- a/src/postgres_scanner.cpp +++ b/src/postgres_scanner.cpp @@ -252,14 +252,14 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData if (bind_data->table_name.empty()) { D_ASSERT(!bind_data->sql.empty()); lstate.sql = StringUtil::Format( - R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s) TO STDOUT (FORMAT "binary");)", - col_names, bind_data->sql, filter); + R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s%s) TO STDOUT (FORMAT "binary");)", + col_names, bind_data->sql, filter, bind_data->limit); } else { lstate.sql = StringUtil::Format( - R"(COPY (SELECT %s FROM %s.%s %s) TO STDOUT (FORMAT "binary");)", + R"(COPY (SELECT %s FROM %s.%s %s%s) TO STDOUT (FORMAT "binary");)", col_names, KeywordHelper::WriteQuoted(bind_data->schema_name, '"'), - KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter); + KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter, bind_data->limit); } lstate.exec = false; lstate.done = false; diff --git a/src/storage/postgres_optimizer.cpp b/src/storage/postgres_optimizer.cpp index 0c83cdcb..fc399c1d 100644 --- a/src/storage/postgres_optimizer.cpp +++ b/src/storage/postgres_optimizer.cpp @@ -3,15 +3,79 @@ #include "storage/postgres_transaction.hpp" #include "storage/postgres_optimizer.hpp" #include "duckdb/planner/operator/logical_get.hpp" +#include "duckdb/planner/operator/logical_limit.hpp" #include "storage/postgres_catalog.hpp" #include "postgres_scanner.hpp" + namespace duckdb { struct PostgresOperators { reference_map_t>> scans; }; +static void OptimizePostgresScanLimitPushdown(unique_ptr &op) { + if (op->type == LogicalOperatorType::LOGICAL_LIMIT) { + auto &limit = op->Cast(); + reference child = *op->children[0]; + + while (child.get().type == LogicalOperatorType::LOGICAL_PROJECTION) { + child = *child.get().children[0]; + } + + if (child.get().type != LogicalOperatorType::LOGICAL_GET) { + OptimizePostgresScanLimitPushdown(op->children[0]); + return; + } + + auto &get = child.get().Cast(); + if (!PostgresCatalog::IsPostgresScan(get.function.name)) { + OptimizePostgresScanLimitPushdown(op->children[0]); + return; + } + + switch (limit.limit_val.Type()) { + case LimitNodeType::CONSTANT_VALUE: + case LimitNodeType::UNSET: + break; + default: + // not a constant or unset limit + OptimizePostgresScanLimitPushdown(op->children[0]); + return; + } + switch (limit.offset_val.Type()) { + case LimitNodeType::CONSTANT_VALUE: + case LimitNodeType::UNSET: + break; + default: + // not a constant or unset offset + OptimizePostgresScanLimitPushdown(op->children[0]); + return; + } + + auto &bind_data = get.bind_data->Cast(); + + string generated_limit_clause = ""; + if (limit.limit_val.Type() != LimitNodeType::UNSET) { + generated_limit_clause += " LIMIT " + to_string(limit.limit_val.GetConstantValue()); + } + if (limit.offset_val.Type() != LimitNodeType::UNSET) { + generated_limit_clause += " OFFSET " + to_string(limit.offset_val.GetConstantValue()); + } + + if (!generated_limit_clause.empty()) { + bind_data.limit = generated_limit_clause; + + op = std::move(op->children[0]); + return; + } + } + + for (auto &child : op->children) { + OptimizePostgresScanLimitPushdown(child); + } +} + void GatherPostgresScans(LogicalOperator &op, PostgresOperators &result) { if (op.type == LogicalOperatorType::LOGICAL_GET) { auto &get = op.Cast(); @@ -35,6 +99,8 @@ void GatherPostgresScans(LogicalOperator &op, PostgresOperators &result) { } void PostgresOptimizer::Optimize(OptimizerExtensionInput &input, unique_ptr &plan) { + // look at query plan and check if we can find LIMIT/OFFSET to pushdown + OptimizePostgresScanLimitPushdown(plan); // look at the query plan and check if we can enable streaming query scans PostgresOperators operators; GatherPostgresScans(*plan, operators); diff --git a/test/sql/storage/limit.test b/test/sql/storage/limit.test new file mode 100644 index 00000000..9854bcd8 --- /dev/null +++ b/test/sql/storage/limit.test @@ -0,0 +1,31 @@ +# name: test/sql/storage/limit.test +# description: Test limit on an attached table +# group: [storage] + +require postgres_scanner + +require-env POSTGRES_TEST_DATABASE_AVAILABLE + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES) + +statement ok +CREATE OR REPLACE TABLE s.large_tbl AS FROM range(100000) t(i) + +query I +FROM s.large_tbl LIMIT 5 +---- +0 +1 +2 +3 +4 + +query I +FROM s.large_tbl LIMIT 5 OFFSET 5 +---- +5 +6 +7 +8 +9 From 17ee27ab6345e6da674180c0a225df4dbab34e73 Mon Sep 17 00:00:00 2001 From: Noah Isaksen Date: Thu, 27 Mar 2025 08:08:03 -0400 Subject: [PATCH 2/6] Remove deeper loop --- src/storage/postgres_optimizer.cpp | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/src/storage/postgres_optimizer.cpp b/src/storage/postgres_optimizer.cpp index fc399c1d..dbd95534 100644 --- a/src/storage/postgres_optimizer.cpp +++ b/src/storage/postgres_optimizer.cpp @@ -24,13 +24,11 @@ static void OptimizePostgresScanLimitPushdown(unique_ptr &op) { } if (child.get().type != LogicalOperatorType::LOGICAL_GET) { - OptimizePostgresScanLimitPushdown(op->children[0]); return; } auto &get = child.get().Cast(); if (!PostgresCatalog::IsPostgresScan(get.function.name)) { - OptimizePostgresScanLimitPushdown(op->children[0]); return; } @@ -39,8 +37,7 @@ static void OptimizePostgresScanLimitPushdown(unique_ptr &op) { case LimitNodeType::UNSET: break; default: - // not a constant or unset limit - OptimizePostgresScanLimitPushdown(op->children[0]); + // not a constant or unset limit return; } switch (limit.offset_val.Type()) { @@ -48,8 +45,7 @@ static void OptimizePostgresScanLimitPushdown(unique_ptr &op) { case LimitNodeType::UNSET: break; default: - // not a constant or unset offset - OptimizePostgresScanLimitPushdown(op->children[0]); + // not a constant or unset offset return; } @@ -63,12 +59,8 @@ static void OptimizePostgresScanLimitPushdown(unique_ptr &op) { generated_limit_clause += " OFFSET " + to_string(limit.offset_val.GetConstantValue()); } - if (!generated_limit_clause.empty()) { - bind_data.limit = generated_limit_clause; - - op = std::move(op->children[0]); - return; - } + op = std::move(op->children[0]); + return; } for (auto &child : op->children) { From d3e3d8a1da58d8f4f7252b7ae0785329a8c9e93e Mon Sep 17 00:00:00 2001 From: Noah Isaksen Date: Thu, 27 Mar 2025 08:51:24 -0400 Subject: [PATCH 3/6] Update postgres_optimizer.cpp --- src/storage/postgres_optimizer.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/storage/postgres_optimizer.cpp b/src/storage/postgres_optimizer.cpp index dbd95534..393eb7e2 100644 --- a/src/storage/postgres_optimizer.cpp +++ b/src/storage/postgres_optimizer.cpp @@ -24,11 +24,13 @@ static void OptimizePostgresScanLimitPushdown(unique_ptr &op) { } if (child.get().type != LogicalOperatorType::LOGICAL_GET) { + OptimizePostgresScanLimitPushdown(op->children[0]); return; } auto &get = child.get().Cast(); if (!PostgresCatalog::IsPostgresScan(get.function.name)) { + OptimizePostgresScanLimitPushdown(op->children[0]); return; } @@ -38,6 +40,7 @@ static void OptimizePostgresScanLimitPushdown(unique_ptr &op) { break; default: // not a constant or unset limit + OptimizePostgresScanLimitPushdown(op->children[0]); return; } switch (limit.offset_val.Type()) { @@ -46,6 +49,7 @@ static void OptimizePostgresScanLimitPushdown(unique_ptr &op) { break; default: // not a constant or unset offset + OptimizePostgresScanLimitPushdown(op->children[0]); return; } @@ -59,8 +63,12 @@ static void OptimizePostgresScanLimitPushdown(unique_ptr &op) { generated_limit_clause += " OFFSET " + to_string(limit.offset_val.GetConstantValue()); } - op = std::move(op->children[0]); - return; + if (!generated_limit_clause.empty()) { + bind_data.limit = generated_limit_clause; + + op = std::move(op->children[0]); + return; + } } for (auto &child : op->children) { From d14a77f4d40b6a9c5acb31af8b98d7916e45dfe3 Mon Sep 17 00:00:00 2001 From: Daniel Lietz Date: Tue, 1 Apr 2025 16:19:40 +0200 Subject: [PATCH 4/6] bump duckdb to v1.2.1 --- duckdb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/duckdb b/duckdb index c8fa9aee..8e52ec43 160000 --- a/duckdb +++ b/duckdb @@ -1 +1 @@ -Subproject commit c8fa9aee7858909c625b5c3abcc3a257c5d9d934 +Subproject commit 8e52ec43959ab363643d63cb78ee214577111da4 From 1d9e8a0307cd7fa092520f04bb16939ad845b6b4 Mon Sep 17 00:00:00 2001 From: Daniel Lietz Date: Thu, 3 Apr 2025 16:05:30 +0200 Subject: [PATCH 5/6] bump extension-ci-tools to v1.2.1 --- extension-ci-tools | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension-ci-tools b/extension-ci-tools index f4735531..58970c53 160000 --- a/extension-ci-tools +++ b/extension-ci-tools @@ -1 +1 @@ -Subproject commit f473553168fd1db490aaa9f440b8f812af0568da +Subproject commit 58970c538d35919db875096460c05806056f4de0 From 71c46ff9f6ad591f61973f6d55118ef42a4586ae Mon Sep 17 00:00:00 2001 From: Daniel Lietz Date: Thu, 3 Apr 2025 16:17:36 +0200 Subject: [PATCH 6/6] Fix MainDistributionPipeline at v1.2.1 --- .github/workflows/MainDistributionPipeline.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/MainDistributionPipeline.yml b/.github/workflows/MainDistributionPipeline.yml index 9fbf6de8..6da920f2 100644 --- a/.github/workflows/MainDistributionPipeline.yml +++ b/.github/workflows/MainDistributionPipeline.yml @@ -16,7 +16,7 @@ jobs: name: Build extension binaries uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main with: - duckdb_version: main + duckdb_version: v1.2.1 ci_tools_version: main extension_name: postgres_scanner exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_mingw' @@ -27,7 +27,7 @@ jobs: uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@main secrets: inherit with: - duckdb_version: main + duckdb_version: v1.2.1 ci_tools_version: main extension_name: postgres_scanner exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_mingw'