Skip to content

Commit 2353f98

Browse files
committed
Fix sort for unpartitioned window with order by clause
There is an optimization to remove sorts if the data is already sorted and single node. This applies if there is an unpartitioned window function with an order by in the window function and the query has a general order by on the same columns. However, this optimization was being applied even if there was a local exchange before the sort that repartitioned (and therefore reordered) the data. In particular, this kind of plan occurs if there is a filter or project after a window function. This PR makes removing sorts aware of local exchanges and does not remove a sort when a local exchange changes the ordering of the data. Example affected query: SELECT regionkey, count(name) OVER (order by regionkey) FROM region ORDER BY regionkey; Previous plan: -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Output[regionkey, _col1] => [regionkey:bigint, count:bigint] _col1 := count (1:27) - Project[projectLocality = LOCAL] => [regionkey:bigint, count:bigint] - LocalExchange[ROUND_ROBIN] () => [regionkey:bigint, name:varchar(25), count:bigint] - Window[order by (regionkey ASC_NULLS_LAST)] => [regionkey:bigint, name:varchar(25), count:bigint] count := count(name) RANGE UNBOUNDED_PRECEDING CURRENT_ROW (1:27) - LocalExchange[SINGLE] () => [regionkey:bigint, name:varchar(25)] Estimates: {rows: 5 (104B), cpu: 104.00, memory: 0.00, network: 104.00} - RemoteStreamingExchange[GATHER] => [regionkey:bigint, name:varchar(25)] Estimates: {rows: 5 (104B), cpu: 104.00, memory: 0.00, network: 104.00} - TableScan[TableHandle {connectorId='tpch', connectorHandle='region:sf1.0', layout='Optional[region:sf1.0]'}] => [regionkey:bigint, name:varchar(25)] Estimates: {rows: 5 (104B), cpu: 104.00, memory: 0.00, network: 0.00} name := tpch:name (1:70) regionkey := tpch:regionkey (1:70) New Plan: -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Output[regionkey, _col1] => [regionkey:bigint, count:bigint] _col1 := count (1:27) - LocalMerge[regionkey ASC_NULLS_LAST] => [regionkey:bigint, count:bigint] - Sort[regionkey ASC_NULLS_LAST] => [regionkey:bigint, count:bigint] - Project[projectLocality = LOCAL] => [regionkey:bigint, count:bigint] - LocalExchange[ROUND_ROBIN] () => [regionkey:bigint, name:varchar(25), count:bigint] - Window[order by (regionkey ASC_NULLS_LAST)] => [regionkey:bigint, name:varchar(25), count:bigint] count := count(name) RANGE UNBOUNDED_PRECEDING CURRENT_ROW (1:27) - LocalExchange[SINGLE] () => [regionkey:bigint, name:varchar(25)] Estimates: {rows: 5 (104B), cpu: 104.00, memory: 0.00, network: 104.00} - RemoteStreamingExchange[GATHER] => [regionkey:bigint, name:varchar(25)] Estimates: {rows: 5 (104B), cpu: 104.00, memory: 0.00, network: 104.00} - TableScan[TableHandle {connectorId='tpch', connectorHandle='region:sf1.0', layout='Optional[region:sf1.0]'}] => [regionkey:bigint, name:varchar(25)] Estimates: {rows: 5 (104B), cpu: 104.00, memory: 0.00, network: 0.00} name := tpch:name (2:6) regionkey := tpch:regionkey (2:6)
1 parent 6873c7e commit 2353f98

File tree

4 files changed

+81
-8
lines changed

4 files changed

+81
-8
lines changed

presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -497,16 +497,16 @@ public PlanWithProperties visitSort(SortNode node, PreferredProperties preferred
497497

498498
if (child.getProperties().isSingleNode()) {
499499
// current plan so far is single node, so local properties are effectively global properties
500-
// skip the SortNode if the local properties guarantee ordering on Sort keys
501-
// TODO: This should be extracted as a separate optimizer once the planner is able to reason about the ordering of each operator
500+
// don't need an extra exchange if the node is already sorted on the desired columns
501+
// a later optimization will remove this sort node
502502
List<LocalProperty<VariableReferenceExpression>> desiredProperties = new ArrayList<>();
503503
for (VariableReferenceExpression variable : node.getOrderingScheme().getOrderByVariables()) {
504504
desiredProperties.add(new SortingProperty<>(variable, node.getOrderingScheme().getOrdering(variable)));
505505
}
506506

507507
if (LocalProperties.match(child.getProperties().getLocalProperties(), desiredProperties).stream()
508508
.noneMatch(Optional::isPresent)) {
509-
return child;
509+
return rebaseAndDeriveProperties(node, child);
510510
}
511511
}
512512

presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.facebook.presto.spi.plan.DistinctLimitNode;
2525
import com.facebook.presto.spi.plan.LimitNode;
2626
import com.facebook.presto.spi.plan.MarkDistinctNode;
27+
import com.facebook.presto.spi.plan.OrderingScheme;
2728
import com.facebook.presto.spi.plan.PlanNode;
2829
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
2930
import com.facebook.presto.spi.plan.TopNNode;
@@ -187,6 +188,20 @@ public PlanWithProperties visitExplainAnalyze(ExplainAnalyzeNode node, StreamPre
187188
@Override
188189
public PlanWithProperties visitSort(SortNode node, StreamPreferredProperties parentPreferences)
189190
{
191+
// Remove sort if the child is already sorted and in a single stream
192+
// TODO: extract to its own optimization after AddLocalExchanges once the
193+
// constraint optimization framework is in a better state to be extended
194+
PlanWithProperties childPlan = planAndEnforce(node.getSource(), any(), singleStream());
195+
if (childPlan.getProperties().isSingleStream() && childPlan.getProperties().isOrdered()) {
196+
OrderingScheme orderingScheme = node.getOrderingScheme();
197+
List<LocalProperty<VariableReferenceExpression>> desiredProperties = orderingScheme.getOrderByVariables().stream()
198+
.map(variable -> new SortingProperty<>(variable, orderingScheme.getOrdering(variable)))
199+
.collect(toImmutableList());
200+
if (LocalProperties.match(childPlan.getProperties().getLocalProperties(), desiredProperties).stream().noneMatch(Optional::isPresent)) {
201+
return childPlan;
202+
}
203+
}
204+
190205
if (isDistributedSortEnabled(session)) {
191206
PlanWithProperties sortPlan = planAndEnforceChildren(node, fixedParallelism(), fixedParallelism());
192207

presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,10 @@ public StreamProperties visitMarkDistinct(MarkDistinctNode node, List<StreamProp
535535
@Override
536536
public StreamProperties visitWindow(WindowNode node, List<StreamProperties> inputProperties)
537537
{
538+
StreamProperties childProperties = Iterables.getOnlyElement(inputProperties);
539+
if (childProperties.isSingleStream() && node.getPartitionBy().isEmpty() && node.getOrderingScheme().isPresent()) {
540+
return StreamProperties.ordered();
541+
}
538542
return Iterables.getOnlyElement(inputProperties);
539543
}
540544

presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestEliminateSorts.java

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package com.facebook.presto.sql.planner.optimizations;
1515

16+
import com.facebook.presto.Session;
1617
import com.facebook.presto.common.block.SortOrder;
1718
import com.facebook.presto.sql.parser.SqlParser;
1819
import com.facebook.presto.sql.planner.PartitioningProviderManager;
@@ -33,30 +34,42 @@
3334
import java.util.List;
3435
import java.util.Optional;
3536

37+
import static com.facebook.presto.SystemSessionProperties.TASK_CONCURRENCY;
38+
import static com.facebook.presto.sql.planner.LogicalPlanner.Stage.OPTIMIZED;
3639
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
40+
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange;
41+
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter;
3742
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.functionCall;
3843
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.output;
3944
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.sort;
4045
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.specification;
4146
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan;
4247
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.window;
48+
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL;
49+
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER;
50+
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION;
4351

4452
public class TestEliminateSorts
4553
extends BasePlanTest
4654
{
4755
private static final String QUANTITY_ALIAS = "QUANTITY";
56+
private static final String TAX_ALIAS = "TAX";
4857

4958
private static final ExpectedValueProvider<WindowNode.Specification> windowSpec = specification(
5059
ImmutableList.of(),
5160
ImmutableList.of(QUANTITY_ALIAS),
5261
ImmutableMap.of(QUANTITY_ALIAS, SortOrder.ASC_NULLS_LAST));
5362

54-
private static final PlanMatchPattern LINEITEM_TABLESCAN_Q = tableScan(
63+
private static final PlanMatchPattern LINEITEM_TABLESCAN_Q_BASIC = tableScan(
5564
"lineitem",
5665
ImmutableMap.of(QUANTITY_ALIAS, "quantity"));
5766

67+
private static final PlanMatchPattern LINEITEM_TABLESCAN_Q = tableScan(
68+
"lineitem",
69+
ImmutableMap.of(QUANTITY_ALIAS, "quantity", TAX_ALIAS, "tax"));
70+
5871
@Test
59-
public void testEliminateSorts()
72+
public void testEliminateSortsBasic()
6073
{
6174
@Language("SQL") String sql = "SELECT quantity, row_number() OVER (ORDER BY quantity) FROM lineitem ORDER BY quantity";
6275

@@ -65,7 +78,38 @@ public void testEliminateSorts()
6578
window(windowMatcherBuilder -> windowMatcherBuilder
6679
.specification(windowSpec)
6780
.addFunction(functionCall("row_number", Optional.empty(), ImmutableList.of())),
68-
anyTree(LINEITEM_TABLESCAN_Q)));
81+
anyTree(LINEITEM_TABLESCAN_Q_BASIC)));
82+
83+
assertUnitPlan(sql, pattern);
84+
}
85+
86+
/**
87+
* Cannot eliminate sorts when there is a filter or project above the window as a local exchange
88+
* will be added to repartition the data before the filter or project node
89+
*/
90+
@Test
91+
public void testDoesNotEliminateSortsWithFilter()
92+
{
93+
@Language("SQL") String sql = "SELECT * FROM " +
94+
"(SELECT quantity, count(tax) OVER (ORDER BY quantity) AS c FROM lineitem) " +
95+
"WHERE c > 3 ORDER BY quantity";
96+
97+
PlanMatchPattern pattern =
98+
output(
99+
exchange(
100+
LOCAL,
101+
GATHER,
102+
sort(
103+
filter("C > 3",
104+
exchange(
105+
LOCAL,
106+
REPARTITION,
107+
window(windowMatcherBuilder -> windowMatcherBuilder
108+
.specification(windowSpec)
109+
.addFunction(
110+
"C",
111+
functionCall("count", Optional.empty(), ImmutableList.of(TAX_ALIAS))),
112+
anyTree(LINEITEM_TABLESCAN_Q)))))));
69113

70114
assertUnitPlan(sql, pattern);
71115
}
@@ -95,15 +139,25 @@ public void assertUnitPlan(@Language("SQL") String sql, PlanMatchPattern pattern
95139
getQueryRunner().getStatsCalculator(),
96140
getQueryRunner().getCostCalculator(),
97141
new TranslateExpressions(getMetadata(), new SqlParser()).rules()),
142+
new UnaliasSymbolReferences(getMetadata().getFunctionAndTypeManager()),
143+
new PruneUnreferencedOutputs(),
144+
new IterativeOptimizer(
145+
new RuleStatsRecorder(),
146+
getQueryRunner().getStatsCalculator(),
147+
getQueryRunner().getCostCalculator(),
148+
ImmutableSet.of(new RemoveRedundantIdentityProjections())),
98149
new AddExchanges(getQueryRunner().getMetadata(), new SqlParser(), new PartitioningProviderManager()),
150+
new AddLocalExchanges(getMetadata(), new SqlParser()),
99151
new UnaliasSymbolReferences(getMetadata().getFunctionAndTypeManager()),
100152
new PruneUnreferencedOutputs(),
101153
new IterativeOptimizer(
102154
new RuleStatsRecorder(),
103155
getQueryRunner().getStatsCalculator(),
104156
getQueryRunner().getCostCalculator(),
105157
ImmutableSet.of(new RemoveRedundantIdentityProjections())));
106-
107-
assertPlan(sql, pattern, optimizers);
158+
Session session = Session.builder(getQueryRunner().getDefaultSession())
159+
.setSystemProperty(TASK_CONCURRENCY, "4")
160+
.build();
161+
assertPlan(sql, session, OPTIMIZED, pattern, optimizers);
108162
}
109163
}

0 commit comments

Comments
 (0)