Skip to content

Commit 9ea169b

Browse files
authored
Merge pull request #346 from Mytherin/connectionisbusy
Fix #344: correctly finish binary copy in Postgres result reading to reset the connection back to idle
2 parents 03f7c18 + 0b2860c commit 9ea169b

10 files changed

+73
-40
lines changed

src/include/postgres_binary_reader.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ struct PostgresBinaryReader : public PostgresResultReader {
2424

2525
protected:
2626
bool Next();
27-
void CheckResult();
2827

2928
void Reset();
3029
bool Ready();

src/include/postgres_scanner.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ class PostgresTransaction;
2222
struct PostgresBindData : public FunctionData {
2323
static constexpr const idx_t DEFAULT_PAGES_PER_TASK = 1000;
2424

25+
public:
26+
PostgresBindData(ClientContext &context);
27+
2528
PostgresVersion version;
2629
string schema_name;
2730
string table_name;

src/postgres_binary_reader.cpp

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,16 @@ PostgresReadResult PostgresBinaryReader::Read(DataChunk &output) {
2525
while (!Ready()) {
2626
if (!Next()) {
2727
// finished this batch
28-
CheckResult();
2928
return PostgresReadResult::FINISHED;
3029
}
3130
}
3231

3332
// read a row
3433
auto tuple_count = ReadInteger<int16_t>();
35-
if (tuple_count <= 0) { // done here, lets try to get more
34+
if (tuple_count <= 0) {
35+
// tuple_count of -1 signifies the file trailer (i.e. footer) - reset and skip
3636
Reset();
37-
return PostgresReadResult::FINISHED;
37+
continue;
3838
}
3939

4040
D_ASSERT(tuple_count == column_ids.size());
@@ -69,12 +69,17 @@ bool PostgresBinaryReader::Next() {
6969

7070
// len -1 signals end
7171
if (len == -1) {
72-
auto final_result = PQgetResult(con.GetConn());
73-
if (!final_result || PQresultStatus(final_result) != PGRES_COMMAND_OK) {
74-
PQclear(final_result);
75-
throw IOException("Failed to fetch header for COPY: %s", string(PQresultErrorMessage(final_result)));
72+
// consume all available results
73+
while (true) {
74+
PostgresResult pg_res(PQgetResult(con.GetConn()));
75+
auto final_result = pg_res.res;
76+
if (!final_result) {
77+
break;
78+
}
79+
if (PQresultStatus(final_result) != PGRES_COMMAND_OK) {
80+
throw IOException("Failed to fetch header for COPY: %s", string(PQresultErrorMessage(final_result)));
81+
}
7682
}
77-
PQclear(final_result);
7883
return false;
7984
}
8085

@@ -89,15 +94,6 @@ bool PostgresBinaryReader::Next() {
8994
return true;
9095
}
9196

92-
void PostgresBinaryReader::CheckResult() {
93-
auto result = PQgetResult(con.GetConn());
94-
if (!result || PQresultStatus(result) != PGRES_COMMAND_OK) {
95-
PQclear(result);
96-
throw std::runtime_error("Failed to execute COPY: " + string(PQresultErrorMessage(result)));
97-
}
98-
PQclear(result);
99-
}
100-
10197
void PostgresBinaryReader::Reset() {
10298
if (buffer) {
10399
PQfreemem(buffer);

src/postgres_connection.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,16 +107,14 @@ vector<unique_ptr<PostgresResult>> PostgresConnection::ExecuteQueries(const stri
107107
if (!res) {
108108
break;
109109
}
110+
auto result = make_uniq<PostgresResult>(res);
110111
if (ResultHasError(res)) {
111-
PQclear(res);
112112
throw std::runtime_error("Failed to execute query \"" + queries +
113113
"\": " + string(PQresultErrorMessage(res)));
114114
}
115115
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
116-
PQclear(res);
117116
continue;
118117
}
119-
auto result = make_uniq<PostgresResult>(res);
120118
results.push_back(std::move(result));
121119
}
122120
return results;

src/postgres_copy_from.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,11 @@
44
namespace duckdb {
55

66
void PostgresConnection::BeginCopyFrom(const string &query, ExecStatusType expected_result) {
7-
auto result = PQExecute(query.c_str());
7+
PostgresResult pg_res(PQExecute(query.c_str()));
8+
auto result = pg_res.res;
89
if (!result || PQresultStatus(result) != expected_result) {
9-
PQclear(result);
1010
throw std::runtime_error("Failed to prepare COPY \"" + query + "\": " + string(PQresultErrorMessage(result)));
1111
}
12-
PQclear(result);
1312
}
1413

1514
} // namespace duckdb

src/postgres_copy_to.cpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,11 @@ void PostgresConnection::BeginCopyTo(ClientContext &context, PostgresCopyState &
5656
}
5757
query += ")";
5858

59-
auto result = PQExecute(query.c_str());
59+
PostgresResult pg_res(PQExecute(query.c_str()));
60+
auto result = pg_res.res;
6061
if (!result || PQresultStatus(result) != PGRES_COPY_IN) {
61-
PQclear(result);
6262
throw std::runtime_error("Failed to prepare COPY \"" + query + "\": " + string(PQresultErrorMessage(result)));
6363
}
64-
PQclear(result);
6564
if (state.format == PostgresCopyFormat::BINARY) {
6665
// binary copy requires a header
6766
PostgresBinaryWriter writer(state);
@@ -106,12 +105,12 @@ void PostgresConnection::FinishCopyTo(PostgresCopyState &state) {
106105
throw InternalException("Error during PQputCopyEnd: %s", PQerrorMessage(GetConn()));
107106
}
108107
// fetch the query result to check for errors
109-
auto result = PQgetResult(GetConn());
108+
PostgresResult pg_res(PQgetResult(GetConn()));
109+
auto result = pg_res.res;
110110
if (!result || PQresultStatus(result) != PGRES_COMMAND_OK) {
111-
PQclear(result);
112-
throw std::runtime_error("Failed to copy data: " + string(PQresultErrorMessage(result)));
111+
string error_msg(PQresultErrorMessage(result));
112+
throw std::runtime_error("Failed to copy data: " + error_msg);
113113
}
114-
PQclear(result);
115114
}
116115

117116
bool NeedsQuotes(const string &to_quote, idx_t size) {

src/postgres_query.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace duckdb {
1111

1212
static unique_ptr<FunctionData> PGQueryBind(ClientContext &context, TableFunctionBindInput &input,
1313
vector<LogicalType> &return_types, vector<string> &names) {
14-
auto result = make_uniq<PostgresBindData>();
14+
auto result = make_uniq<PostgresBindData>(context);
1515

1616
if (input.inputs[0].IsNull() || input.inputs[1].IsNull()) {
1717
throw BinderException("Parameters to postgres_query cannot be NULL");

src/postgres_scanner.cpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,9 @@ void PostgresScanFunction::PrepareBind(PostgresVersion version, ClientContext &c
115115
if (context.TryGetCurrentSetting("pg_use_ctid_scan", pg_use_ctid_scan)) {
116116
use_ctid_scan = BooleanValue::Get(pg_use_ctid_scan);
117117
}
118-
Value use_text_protocol;
119-
if (context.TryGetCurrentSetting("pg_use_text_protocol", use_text_protocol)) {
120-
if (BooleanValue::Get(use_text_protocol)) {
121-
bind_data.use_text_protocol = true;
122-
use_ctid_scan = false;
123-
}
118+
if (bind_data.use_text_protocol) {
119+
// ctid scan is only supported for binary copy
120+
use_ctid_scan = false;
124121
}
125122

126123
if (version.major_v < 14) {
@@ -135,6 +132,15 @@ void PostgresScanFunction::PrepareBind(PostgresVersion version, ClientContext &c
135132
bind_data.version = version;
136133
}
137134

135+
PostgresBindData::PostgresBindData(ClientContext &context) {
136+
Value text_protocol;
137+
if (context.TryGetCurrentSetting("pg_use_text_protocol", text_protocol)) {
138+
if (BooleanValue::Get(text_protocol)) {
139+
use_text_protocol = true;
140+
}
141+
}
142+
}
143+
138144
void PostgresBindData::SetTablePages(idx_t approx_num_pages) {
139145
this->pages_approx = approx_num_pages;
140146
if (!read_only || use_text_protocol) {
@@ -166,7 +172,7 @@ void PostgresBindData::SetTable(PostgresTableEntry &table) {
166172

167173
static unique_ptr<FunctionData> PostgresBind(ClientContext &context, TableFunctionBindInput &input,
168174
vector<LogicalType> &return_types, vector<string> &names) {
169-
auto bind_data = make_uniq<PostgresBindData>();
175+
auto bind_data = make_uniq<PostgresBindData>(context);
170176

171177
bind_data->dsn = input.inputs[0].GetValue<string>();
172178
bind_data->schema_name = input.inputs[1].GetValue<string>();

src/storage/postgres_table_entry.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ TableFunction PostgresTableEntry::GetScanFunction(ClientContext &context, unique
3939
auto &pg_catalog = catalog.Cast<PostgresCatalog>();
4040
auto &transaction = Transaction::Get(context, catalog).Cast<PostgresTransaction>();
4141

42-
auto result = make_uniq<PostgresBindData>();
42+
auto result = make_uniq<PostgresBindData>(context);
4343

4444
result->schema_name = schema.name;
4545
result->table_name = name;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# name: test/sql/scanner/postgres_query_error.test
2+
# description: Test running postgres_query
3+
# group: [scanner]
4+
5+
require postgres_scanner
6+
7+
require-env POSTGRES_TEST_DATABASE_AVAILABLE
8+
9+
statement ok
10+
ATTACH 'dbname=postgresscanner' AS s1 (TYPE POSTGRES)
11+
12+
statement ok
13+
BEGIN
14+
15+
statement ok
16+
CALL pg_clear_cache();
17+
18+
query III
19+
select * from postgres_query('s1', 'SELECT * FROM cars');
20+
----
21+
ferari testarosa red
22+
aston martin db2 blue
23+
bentley mulsanne gray
24+
ford T black
25+
26+
query III
27+
SELECT * FROM s1.cars
28+
----
29+
ferari testarosa red
30+
aston martin db2 blue
31+
bentley mulsanne gray
32+
ford T black
33+

0 commit comments

Comments
 (0)