Skip to content

Commit 9dd1208

Browse files
rjujurdunklau
authored andcommitted
Implement sort pushdown.
1 parent bc5fdcf commit 9dd1208

File tree

11 files changed

+645
-33
lines changed

11 files changed

+645
-33
lines changed

Makefile

+2-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ TESTS = test-$(PYTHON_TEST_VERSION)/sql/multicorn_cache_invalidation.sql
109109
test-$(PYTHON_TEST_VERSION)/sql/multicorn_sequence_test.sql \
110110
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_date.sql \
111111
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_dict.sql \
112-
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_list.sql
112+
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_list.sql \
113+
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_sort.sql
113114

114115
ifeq (${UNSUPPORTS_SQLALCHEMY}, 0)
115116
TESTS += test-$(PYTHON_TEST_VERSION)/sql/multicorn_alchemy_test.sql

python/multicorn/__init__.py

+37-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
"""
1212

1313
import sys
14+
from collections import namedtuple
1415

1516
__version__ = '__VERSION__'
1617

@@ -19,6 +20,11 @@
1920
UNBOUND = object()
2021

2122

23+
SortKey = namedtuple("SortKey",
24+
["attname", "attnum", "is_reversed",
25+
"nulls_first", "collate"])
26+
27+
2228
class Qual(object):
2329
"""A Qual describes a postgresql qualifier.
2430
@@ -152,6 +158,32 @@ def get_rel_size(self, quals, columns):
152158
"""
153159
return (100000000, len(columns) * 100)
154160

161+
def can_sort(self, pathkeys):
162+
"""
163+
Method called from the planner to ask the FDW what are the sorts it can
164+
enforced, to avoid PostgreSQL to sort the data after retreiving all the
165+
rows. These sorts can come from explicit ORDER BY clauses, but also GROUP
166+
BY and DISTINCT clauses.
167+
168+
The FDW has to inspect every sort, and respond which one are handled.
169+
The sorts are cumulatives. For example::
170+
171+
col1 ASC
172+
col2 DESC
173+
174+
means that the FDW must render the tuples sorted by col1 ascending and
175+
col2 descending.
176+
177+
Args:
178+
pathkeys (list): A list of :class:`SortKey`
179+
representing all the sorts the query must enforce.
180+
181+
Return:
182+
The list of cumulative SortKey, for which the FDW can
183+
enforce the sort.
184+
"""
185+
return []
186+
155187
def get_path_keys(self):
156188
u"""
157189
Method called from the planner to add additional Path to the planner.
@@ -209,7 +241,7 @@ def get_path_keys(self):
209241
"""
210242
return []
211243

212-
def execute(self, quals, columns):
244+
def execute(self, quals, columns, pathkeys=[]):
213245
"""Execute a query in the foreign data wrapper.
214246
215247
This method is called at the first iteration.
@@ -240,13 +272,17 @@ def execute(self, quals, columns):
240272
You should return AT LEAST those columns when returning a
241273
dict. If returning a sequence, every column from the table
242274
should be in the sequence.
275+
pathkeys (list): A list of :class:`MulticornDeparsedSortGroup`
276+
that the FDW said it can enforce.
243277
244278
Returns:
245279
An iterable of python objects which can be converted back to PostgreSQL.
246280
Currently, such objects are:
247281
- sequences containing exactly as much columns as the
248282
underlying tables
249283
- dictionaries mapping column names to their values.
284+
If the pathkeys wasn't empty, the FDW has to return the data in the
285+
expected order.
250286
251287
"""
252288
pass

python/multicorn/testfdw.py

+22-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from .utils import log_to_postgres, WARNING, ERROR
55
from itertools import cycle
66
from datetime import datetime
7+
from operator import itemgetter
78

89

910
class TestForeignDataWrapper(ForeignDataWrapper):
@@ -70,14 +71,30 @@ def _as_generator(self, quals, columns):
7071
index)
7172
yield line
7273

73-
def execute(self, quals, columns):
74+
def execute(self, quals, columns, pathkeys=[]):
7475
log_to_postgres(str(sorted(quals)))
7576
log_to_postgres(str(sorted(columns)))
77+
if (len(pathkeys)) > 0:
78+
log_to_postgres("requested sort(s): ")
79+
for k in pathkeys:
80+
log_to_postgres(k)
7681
if self.test_type == 'None':
7782
return None
7883
elif self.test_type == 'iter_none':
7984
return [None, None]
8085
else:
86+
if (len(pathkeys) > 0):
87+
# testfdw don't have tables with more than 2 fields, without
88+
# duplicates, so we only need to worry about sorting on 1st
89+
# asked column
90+
k = pathkeys[0];
91+
res = self._as_generator(quals, columns)
92+
if (self.test_type == 'sequence'):
93+
return sorted(res, key=itemgetter(k.attnum - 1),
94+
reverse=k.is_reversed)
95+
else:
96+
return sorted(res, key=itemgetter(k.attname),
97+
reverse=k.is_reversed)
8198
return self._as_generator(quals, columns)
8299

83100
def get_rel_size(self, quals, columns):
@@ -90,6 +107,10 @@ def get_path_keys(self):
90107
return [(('test1',), 1)]
91108
return []
92109

110+
def can_sort(self, pathkeys):
111+
# assume sort pushdown ok for all cols, in any order, any collation
112+
return pathkeys
113+
93114
def update(self, rowid, newvalues):
94115
if self.test_type == 'nowrite':
95116
super(TestForeignDataWrapper, self).update(rowid, newvalues)

src/multicorn.c

+63-13
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "catalog/pg_type.h"
2323
#include "utils/memutils.h"
2424
#include "miscadmin.h"
25+
#include "utils/lsyscache.h"
2526
#include "utils/rel.h"
2627
#include "parser/parsetree.h"
2728

@@ -304,23 +305,65 @@ multicornGetForeignPaths(PlannerInfo *root,
304305
RelOptInfo *baserel,
305306
Oid foreigntableid)
306307
{
307-
Path *path;
308-
MulticornPlanState *planstate = baserel->fdw_private;
308+
List *pathes; /* List of ForeignPath */
309+
MulticornPlanState *planstate = baserel->fdw_private;
310+
ListCell *lc;
311+
312+
/* These lists are used to handle sort pushdown */
313+
List *apply_pathkeys = NULL;
314+
List *deparsed_pathkeys = NULL;
309315

310316
/* Extract a friendly version of the pathkeys. */
311317
List *possiblePaths = pathKeys(planstate);
312318

313-
findPaths(root, baserel, possiblePaths, planstate->startupCost);
314-
/* Add a default path */
315-
path = (Path *) create_foreignscan_path(root, baserel,
316-
baserel->rows,
317-
planstate->startupCost,
318-
baserel->rows * baserel->width,
319-
NIL, /* no pathkeys */
320-
NULL, /* no outer rel either */
321-
(void *) baserel->fdw_private);
322-
323-
add_path(baserel, path);
319+
/* Try to find parameterized paths */
320+
pathes = findPaths(root, baserel, possiblePaths, planstate->startupCost,
321+
planstate, apply_pathkeys, deparsed_pathkeys);
322+
323+
/* Add a simple default path */
324+
pathes = lappend(pathes, create_foreignscan_path(root, baserel,
325+
baserel->rows,
326+
planstate->startupCost,
327+
baserel->rows * baserel->width,
328+
NIL, /* no pathkeys */
329+
NULL, /* no outer rel either */
330+
NULL));
331+
332+
/* Handle sort pushdown */
333+
if (root->query_pathkeys)
334+
{
335+
List *deparsed = deparse_sortgroup(root, foreigntableid, baserel);
336+
337+
if (deparsed)
338+
{
339+
/* Update the sort_*_pathkeys lists if needed */
340+
computeDeparsedSortGroup(deparsed, planstate, &apply_pathkeys,
341+
&deparsed_pathkeys);
342+
}
343+
}
344+
345+
/* Add each ForeignPath previously found */
346+
foreach(lc, pathes)
347+
{
348+
ForeignPath *path = (ForeignPath *) lfirst(lc);
349+
350+
/* Add the path without modification */
351+
add_path(baserel, (Path *) path);
352+
353+
/* Add the path with sort pusdown if possible */
354+
if (apply_pathkeys && deparsed_pathkeys)
355+
{
356+
ForeignPath *newpath;
357+
358+
newpath = create_foreignscan_path(root, baserel, path->path.rows,
359+
path->path.startup_cost, path->path.total_cost,
360+
apply_pathkeys, NULL,
361+
(void *) deparsed_pathkeys);
362+
363+
newpath->path.param_info = path->path.param_info;
364+
add_path(baserel, (Path *) newpath);
365+
}
366+
}
324367
errorCheck();
325368
}
326369

@@ -351,6 +394,7 @@ multicornGetForeignPlan(PlannerInfo *root,
351394
&planstate->qual_list);
352395
}
353396
}
397+
planstate->pathkeys = (List *) best_path->fdw_private;
354398
return make_foreignscan(tlist,
355399
scan_clauses,
356400
scan_relid,
@@ -921,6 +965,9 @@ serializePlanState(MulticornPlanState * state)
921965
result = lappend(result, makeConst(INT4OID,
922966
-1, InvalidOid, -1, state->foreigntableid, false, true));
923967
result = lappend(result, state->target_list);
968+
969+
result = lappend(result, serializeDeparsedSortGroup(state->pathkeys));
970+
924971
return result;
925972
}
926973

@@ -935,10 +982,13 @@ initializeExecState(void *internalstate)
935982
List *values = (List *) internalstate;
936983
AttrNumber attnum = ((Const *) linitial(values))->constvalue;
937984
Oid foreigntableid = ((Const *) lsecond(values))->constvalue;
985+
List *pathkeys;
938986

939987
/* Those list must be copied, because their memory context can become */
940988
/* invalid during the execution (in particular with the cursor interface) */
941989
execstate->target_list = copyObject(lthird(values));
990+
pathkeys = lfourth(values);
991+
execstate->pathkeys = deserializeDeparsedSortGroup(pathkeys);
942992
execstate->fdw_instance = getInstance(foreigntableid);
943993
execstate->buffer = makeStringInfo();
944994
execstate->cinfos = palloc0(sizeof(ConversionInfo *) * attnum);

src/multicorn.h

+33-6
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ typedef struct MulticornPlanState
5757
List *qual_list;
5858
int startupCost;
5959
ConversionInfo **cinfos;
60+
List *pathkeys; /* list of MulticornDeparsedSortGroup) */
6061
} MulticornPlanState;
6162

6263
typedef struct MulticornExecState
@@ -74,6 +75,7 @@ typedef struct MulticornExecState
7475
StringInfo buffer;
7576
AttrNumber rowidAttno;
7677
char *rowidAttrName;
78+
List *pathkeys; /* list of MulticornDeparsedSortGroup) */
7779
} MulticornExecState;
7880

7981
typedef struct MulticornModifyState
@@ -117,6 +119,16 @@ typedef struct MulticornParamQual
117119
Expr *expr;
118120
} MulticornParamQual;
119121

122+
typedef struct MulticornDeparsedSortGroup
123+
{
124+
char attname[NAMEDATALEN];
125+
int attnum;
126+
bool reversed;
127+
bool nulls_first;
128+
char collate[NAMEDATALEN];
129+
PathKey *key;
130+
} MulticornDeparsedSortGroup;
131+
120132
/* errors.c */
121133
void errorCheck(void);
122134

@@ -138,12 +150,14 @@ PyObject *optionsListToPyDict(List *options);
138150
const char *getPythonEncodingName(void);
139151

140152
void getRelSize(MulticornPlanState * state,
141-
PlannerInfo *root,
142-
double *rows,
143-
int *width);
153+
PlannerInfo *root,
154+
double *rows,
155+
int *width);
144156

145157
List *pathKeys(MulticornPlanState * state);
146158

159+
List *canSort(MulticornPlanState * state, List *deparsed);
160+
147161
CacheEntry *getCacheEntry(Oid foreigntableid);
148162
UserMapping *multicorn_GetUserMapping(Oid userid, Oid serverid);
149163

@@ -158,14 +172,27 @@ void extractRestrictions(Relids base_relids,
158172
List **quals);
159173
List *extractColumns(List *reltargetlist, List *restrictinfolist);
160174
void initConversioninfo(ConversionInfo ** cinfo,
161-
AttInMetadata *attinmeta);
175+
AttInMetadata *attinmeta);
162176

163177
Value *colnameFromVar(Var *var, PlannerInfo *root,
164-
MulticornPlanState * state);
178+
MulticornPlanState * state);
179+
180+
void computeDeparsedSortGroup(List *deparsed, MulticornPlanState *planstate,
181+
List **apply_pathkeys,
182+
List **deparsed_pathkeys);
165183

166-
void findPaths(PlannerInfo *root, RelOptInfo *baserel, List *possiblePaths, int startupCost);
184+
List *findPaths(PlannerInfo *root, RelOptInfo *baserel, List *possiblePaths,
185+
int startupCost,
186+
MulticornPlanState *state,
187+
List *apply_pathkeys, List *deparsed_pathkeys);
188+
189+
List *deparse_sortgroup(PlannerInfo *root, Oid foreigntableid, RelOptInfo *rel);
167190

168191
PyObject *datumToPython(Datum node, Oid typeoid, ConversionInfo * cinfo);
192+
193+
List *serializeDeparsedSortGroup(List *pathkeys);
194+
List *deserializeDeparsedSortGroup(List *items);
195+
169196
#endif /* PG_MULTICORN_H */
170197

171198
char *PyUnicode_AsPgString(PyObject *p_unicode);

0 commit comments

Comments
 (0)