1
1
#include " duckdb/function/window/window_merge_sort_tree.hpp"
2
+ #include " duckdb/planner/expression/bound_constant_expression.hpp"
2
3
3
4
#include < thread>
4
5
#include < utility>
5
6
6
7
namespace duckdb {
7
8
8
9
WindowMergeSortTree::WindowMergeSortTree (ClientContext &context, const vector<BoundOrderByNode> &orders,
9
- const vector<column_t > &sort_idx, const idx_t count)
10
+ const vector<column_t > &sort_idx, const idx_t count, bool unique )
10
11
: context(context), memory_per_thread(PhysicalOperator::GetMaxThreadMemory(context)), sort_idx(sort_idx),
11
12
build_stage (PartitionSortStage::INIT), tasks_completed(0 ) {
12
13
// Sort the unfiltered indices by the orders
@@ -26,7 +27,19 @@ WindowMergeSortTree::WindowMergeSortTree(ClientContext &context, const vector<Bo
26
27
payload_layout.Initialize (payload_types);
27
28
28
29
auto &buffer_manager = BufferManager::GetBufferManager (context);
29
- global_sort = make_uniq<GlobalSortState>(buffer_manager, orders, payload_layout);
30
+ if (unique) {
31
+ vector<BoundOrderByNode> unique_orders;
32
+ for (const auto &order : orders) {
33
+ unique_orders.emplace_back (order.Copy ());
34
+ }
35
+ auto unique_expr = make_uniq<BoundConstantExpression>(Value (index_type));
36
+ const auto order_type = OrderType::ASCENDING;
37
+ const auto order_by_type = OrderByNullType::NULLS_LAST;
38
+ unique_orders.emplace_back (BoundOrderByNode (order_type, order_by_type, std::move (unique_expr)));
39
+ global_sort = make_uniq<GlobalSortState>(buffer_manager, unique_orders, payload_layout);
40
+ } else {
41
+ global_sort = make_uniq<GlobalSortState>(buffer_manager, orders, payload_layout);
42
+ }
30
43
global_sort->external = ClientConfig::GetConfig (context).force_external ;
31
44
}
32
45
@@ -48,18 +61,22 @@ WindowMergeSortTreeLocalState::WindowMergeSortTreeLocalState(WindowMergeSortTree
48
61
49
62
void WindowMergeSortTreeLocalState::SinkChunk (DataChunk &chunk, const idx_t row_idx,
50
63
optional_ptr<SelectionVector> filter_sel, idx_t filtered) {
64
+ // Sequence the payload column
65
+ auto &indices = payload_chunk.data [0 ];
66
+ payload_chunk.SetCardinality (chunk);
67
+ indices.Sequence (int64_t (row_idx), 1 , payload_chunk.size ());
68
+
51
69
// Reference the sort columns
52
70
auto &sort_idx = window_tree.sort_idx ;
53
71
for (column_t c = 0 ; c < sort_idx.size (); ++c) {
54
72
sort_chunk.data [c].Reference (chunk.data [sort_idx[c]]);
55
73
}
74
+ // Add the row numbers if we are uniquifying
75
+ if (sort_idx.size () < sort_chunk.ColumnCount ()) {
76
+ sort_chunk.data [sort_idx.size ()].Reference (indices);
77
+ }
56
78
sort_chunk.SetCardinality (chunk);
57
79
58
- // Sequence the payload column
59
- auto &indices = payload_chunk.data [0 ];
60
- payload_chunk.SetCardinality (sort_chunk);
61
- indices.Sequence (int64_t (row_idx), 1 , payload_chunk.size ());
62
-
63
80
// Apply FILTER clause, if any
64
81
if (filter_sel) {
65
82
sort_chunk.Slice (*filter_sel, filtered);
0 commit comments