-
Notifications
You must be signed in to change notification settings - Fork 52
/
Copy pathpostgres_table_entry.cpp
137 lines (123 loc) · 4.5 KB
/
postgres_table_entry.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include "storage/postgres_catalog.hpp"
#include "storage/postgres_table_entry.hpp"
#include "storage/postgres_transaction.hpp"
#include "duckdb/storage/statistics/base_statistics.hpp"
#include "duckdb/storage/table_storage_info.hpp"
#include "postgres_scanner.hpp"
namespace duckdb {
PostgresTableEntry::PostgresTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info)
: TableCatalogEntry(catalog, schema, info) {
for (idx_t c = 0; c < columns.LogicalColumnCount(); c++) {
auto &col = columns.GetColumnMutable(LogicalIndex(c));
if (col.GetType().HasAlias()) {
col.TypeMutable() = PostgresUtils::RemoveAlias(col.GetType());
}
postgres_types.push_back(PostgresUtils::CreateEmptyPostgresType(col.GetType()));
D_ASSERT(!col.GetName().empty());
postgres_names.push_back(col.GetName());
}
approx_num_pages = 0;
}
PostgresTableEntry::PostgresTableEntry(Catalog &catalog, SchemaCatalogEntry &schema, PostgresTableInfo &info)
: TableCatalogEntry(catalog, schema, *info.create_info), postgres_types(std::move(info.postgres_types)),
postgres_names(std::move(info.postgres_names)) {
#ifdef DEBUG
for (auto &result_name : postgres_names) {
D_ASSERT(!result_name.empty());
}
#endif
D_ASSERT(postgres_types.size() == columns.LogicalColumnCount());
approx_num_pages = info.approx_num_pages;
}
unique_ptr<BaseStatistics> PostgresTableEntry::GetStatistics(ClientContext &context, column_t column_id) {
return nullptr;
}
void PostgresTableEntry::BindUpdateConstraints(LogicalGet &, LogicalProjection &, LogicalUpdate &, ClientContext &) {
}
TableFunction PostgresTableEntry::GetScanFunction(ClientContext &context, unique_ptr<FunctionData> &bind_data) {
auto &pg_catalog = catalog.Cast<PostgresCatalog>();
auto &transaction = Transaction::Get(context, catalog).Cast<PostgresTransaction>();
auto result = make_uniq<PostgresBindData>();
result->schema_name = schema.name;
result->table_name = name;
result->dsn = transaction.GetDSN();
result->SetCatalog(pg_catalog);
for (auto &col : columns.Logical()) {
result->types.push_back(col.GetType());
}
result->names = postgres_names;
#ifdef DEBUG
for (auto &result_name : result->names) {
D_ASSERT(!result_name.empty());
}
#endif
result->postgres_types = postgres_types;
result->read_only = transaction.IsReadOnly();
PostgresScanFunction::PrepareBind(pg_catalog.GetPostgresVersion(), context, *result, approx_num_pages);
bind_data = std::move(result);
auto function = PostgresScanFunction();
Value filter_pushdown;
if (context.TryGetCurrentSetting("pg_experimental_filter_pushdown", filter_pushdown)) {
function.filter_pushdown = BooleanValue::Get(filter_pushdown);
}
return function;
}
TableStorageInfo PostgresTableEntry::GetStorageInfo(ClientContext &context) {
auto &transaction = Transaction::Get(context, catalog).Cast<PostgresTransaction>();
auto &db = transaction.GetConnection();
TableStorageInfo result;
result.cardinality = 0;
result.index_info = db.GetIndexInfo(name);
return result;
}
static bool CopyRequiresText(const LogicalType &type, const PostgresType &pg_type) {
if (pg_type.info != PostgresTypeAnnotation::STANDARD) {
return true;
}
switch (type.id()) {
case LogicalTypeId::LIST: {
D_ASSERT(pg_type.children.size() == 1);
auto &child_type = ListType::GetChildType(type);
if (child_type.id() != LogicalTypeId::LIST && !PostgresUtils::SupportedPostgresOid(child_type)) {
return true;
}
if (CopyRequiresText(child_type, pg_type.children[0])) {
return true;
}
return false;
}
case LogicalTypeId::STRUCT: {
auto &children = StructType::GetChildTypes(type);
D_ASSERT(children.size() == pg_type.children.size());
for (idx_t c = 0; c < pg_type.children.size(); c++) {
if (!PostgresUtils::SupportedPostgresOid(children[c].second)) {
return true;
}
if (CopyRequiresText(children[c].second, pg_type.children[c])) {
return true;
}
}
return false;
}
default:
return false;
}
}
PostgresCopyFormat PostgresTableEntry::GetCopyFormat(ClientContext &context) {
Value use_binary_copy;
if (context.TryGetCurrentSetting("pg_use_binary_copy", use_binary_copy)) {
if (!BooleanValue::Get(use_binary_copy)) {
return PostgresCopyFormat::TEXT;
}
}
D_ASSERT(postgres_types.size() == columns.LogicalColumnCount());
for (idx_t c = 0; c < postgres_types.size(); c++) {
if (CopyRequiresText(columns.GetColumn(LogicalIndex(c)).GetType(), postgres_types[c])) {
return PostgresCopyFormat::TEXT;
}
}
return PostgresCopyFormat::BINARY;
}
PostgresTableEntry::~PostgresTableEntry() {
}
} // namespace duckdb