diff --git a/src/include/postgres_scanner.hpp b/src/include/postgres_scanner.hpp index e7f3f993..52d6d0d2 100644 --- a/src/include/postgres_scanner.hpp +++ b/src/include/postgres_scanner.hpp @@ -26,6 +26,7 @@ struct PostgresBindData : public FunctionData { string schema_name; string table_name; string sql; + string limit; idx_t pages_approx = 0; vector postgres_types; diff --git a/src/postgres_scanner.cpp b/src/postgres_scanner.cpp index 766a3b39..db0d51a5 100644 --- a/src/postgres_scanner.cpp +++ b/src/postgres_scanner.cpp @@ -252,14 +252,14 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData if (bind_data->table_name.empty()) { D_ASSERT(!bind_data->sql.empty()); lstate.sql = StringUtil::Format( - R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s) TO STDOUT (FORMAT "binary");)", - col_names, bind_data->sql, filter); + R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s%s) TO STDOUT (FORMAT "binary");)", + col_names, bind_data->sql, filter, bind_data->limit); } else { lstate.sql = StringUtil::Format( - R"(COPY (SELECT %s FROM %s.%s %s) TO STDOUT (FORMAT "binary");)", + R"(COPY (SELECT %s FROM %s.%s %s%s) TO STDOUT (FORMAT "binary");)", col_names, KeywordHelper::WriteQuoted(bind_data->schema_name, '"'), - KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter); + KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter, bind_data->limit); } lstate.exec = false; lstate.done = false; diff --git a/src/storage/postgres_optimizer.cpp b/src/storage/postgres_optimizer.cpp index 0c83cdcb..393eb7e2 100644 --- a/src/storage/postgres_optimizer.cpp +++ b/src/storage/postgres_optimizer.cpp @@ -3,15 +3,79 @@ #include "storage/postgres_transaction.hpp" #include "storage/postgres_optimizer.hpp" #include "duckdb/planner/operator/logical_get.hpp" +#include "duckdb/planner/operator/logical_limit.hpp" #include "storage/postgres_catalog.hpp" #include "postgres_scanner.hpp" + namespace duckdb { struct PostgresOperators { reference_map_t>> scans; }; +static void OptimizePostgresScanLimitPushdown(unique_ptr &op) { + if (op->type == LogicalOperatorType::LOGICAL_LIMIT) { + auto &limit = op->Cast(); + reference child = *op->children[0]; + + while (child.get().type == LogicalOperatorType::LOGICAL_PROJECTION) { + child = *child.get().children[0]; + } + + if (child.get().type != LogicalOperatorType::LOGICAL_GET) { + OptimizePostgresScanLimitPushdown(op->children[0]); + return; + } + + auto &get = child.get().Cast(); + if (!PostgresCatalog::IsPostgresScan(get.function.name)) { + OptimizePostgresScanLimitPushdown(op->children[0]); + return; + } + + switch (limit.limit_val.Type()) { + case LimitNodeType::CONSTANT_VALUE: + case LimitNodeType::UNSET: + break; + default: + // not a constant or unset limit + OptimizePostgresScanLimitPushdown(op->children[0]); + return; + } + switch (limit.offset_val.Type()) { + case LimitNodeType::CONSTANT_VALUE: + case LimitNodeType::UNSET: + break; + default: + // not a constant or unset offset + OptimizePostgresScanLimitPushdown(op->children[0]); + return; + } + + auto &bind_data = get.bind_data->Cast(); + + string generated_limit_clause = ""; + if (limit.limit_val.Type() != LimitNodeType::UNSET) { + generated_limit_clause += " LIMIT " + to_string(limit.limit_val.GetConstantValue()); + } + if (limit.offset_val.Type() != LimitNodeType::UNSET) { + generated_limit_clause += " OFFSET " + to_string(limit.offset_val.GetConstantValue()); + } + + if (!generated_limit_clause.empty()) { + bind_data.limit = generated_limit_clause; + + op = std::move(op->children[0]); + return; + } + } + + for (auto &child : op->children) { + OptimizePostgresScanLimitPushdown(child); + } +} + void GatherPostgresScans(LogicalOperator &op, PostgresOperators &result) { if (op.type == LogicalOperatorType::LOGICAL_GET) { auto &get = op.Cast(); @@ -35,6 +99,8 @@ void GatherPostgresScans(LogicalOperator &op, PostgresOperators &result) { } void PostgresOptimizer::Optimize(OptimizerExtensionInput &input, unique_ptr &plan) { + // look at query plan and check if we can find LIMIT/OFFSET to pushdown + OptimizePostgresScanLimitPushdown(plan); // look at the query plan and check if we can enable streaming query scans PostgresOperators operators; GatherPostgresScans(*plan, operators); diff --git a/test/sql/storage/limit.test b/test/sql/storage/limit.test new file mode 100644 index 00000000..9854bcd8 --- /dev/null +++ b/test/sql/storage/limit.test @@ -0,0 +1,31 @@ +# name: test/sql/storage/limit.test +# description: Test limit on an attached table +# group: [storage] + +require postgres_scanner + +require-env POSTGRES_TEST_DATABASE_AVAILABLE + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES) + +statement ok +CREATE OR REPLACE TABLE s.large_tbl AS FROM range(100000) t(i) + +query I +FROM s.large_tbl LIMIT 5 +---- +0 +1 +2 +3 +4 + +query I +FROM s.large_tbl LIMIT 5 OFFSET 5 +---- +5 +6 +7 +8 +9