-
Notifications
You must be signed in to change notification settings - Fork 221
/
Copy pathtop_n.rs
76 lines (68 loc) · 2.33 KB
/
top_n.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0.
use std::cmp::Ordering;
use binary_heap_plus::BinaryHeap;
use super::*;
use crate::array::{DataChunk, DataChunkBuilder};
use crate::types::{DataType, Row};
/// The executor of a Top N operation.
pub struct TopNExecutor {
pub offset: usize,
pub limit: usize,
/// A list of expressions to order by.
///
/// e.g. `(list (+ #0 #1) (desc #0))`
pub order_keys: RecExpr,
pub types: Vec<DataType>,
}
impl TopNExecutor {
#[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
pub async fn execute(self, child: BoxedExecutor) {
// initialize heap
let heap_size = self.offset + self.limit;
let orders = Evaluator::new(&self.order_keys).orders();
let mut heap =
BinaryHeap::with_capacity_by(heap_size, |row1, row2| cmp(row1, row2, &orders));
// evaluate order keys and append the original rows
// chunks = keys || child
#[for_await]
for chunk in child {
let chunk = chunk?;
let order_key_chunk = Evaluator::new(&self.order_keys).eval_list(&chunk)?;
for row in order_key_chunk.row_concat(chunk).rows() {
heap.push(row.to_owned());
if heap.len() > heap_size {
heap.pop();
}
}
}
// build chunk
let order_keys_len = self.order_keys.as_ref().last().unwrap().as_list().len();
let mut builder = DataChunkBuilder::new(self.types.iter(), PROCESSING_WINDOW_SIZE);
for row in heap
.into_sorted_vec()
.into_iter()
.skip(self.offset)
.take(self.limit)
{
if let Some(chunk) = builder.push_row(row.into_iter().skip(order_keys_len)) {
yield chunk;
}
}
if let Some(chunk) = builder.take() {
yield chunk;
}
}
}
/// Compare two rows by orders.
///
/// The order is `false` for ascending and `true` for descending.
fn cmp(row1: &Row, row2: &Row, orders: &[bool]) -> Ordering {
for ((v1, v2), desc) in row1.iter().zip(row2.iter()).zip(orders) {
match v1.cmp(v2) {
Ordering::Equal => continue,
o if *desc => return o.reverse(),
o => return o,
}
}
Ordering::Equal
}