-
Notifications
You must be signed in to change notification settings - Fork 221
/
Copy pathinsert.rs
102 lines (91 loc) · 3.05 KB
/
insert.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0.
use std::sync::Arc;
use super::*;
use crate::array::DataChunk;
use crate::catalog::{ColumnId, TableRefId};
use crate::storage::{Storage, Table, Transaction};
use crate::types::ColumnIndex;
/// The executor of the `insert` statement.
///
/// `execute` evaluates every chunk produced by the child executor into the
/// column layout of the target table, appends the results through a
/// transaction obtained from `table.write()`, commits it, and finally yields
/// a single chunk carrying the accumulated cardinality of the appended chunks.
pub struct InsertExecutor<S: Storage> {
/// Reference to the table that receives the data; resolved through
/// `storage.get_table`.
pub table_id: TableRefId,
/// Ids of the table columns that the child executor supplies.
///
/// The position of an id in this list is used as the column index into the
/// child's chunks. Table columns whose id does not appear here are filled
/// with a null expression.
pub column_ids: Vec<ColumnId>,
/// Storage in which the target table is looked up.
pub storage: Arc<S>,
}
impl<S: Storage> InsertExecutor<S> {
    /// Consumes `child` and appends everything it produces to the target table.
    ///
    /// Each incoming chunk is first evaluated through a list expression that
    /// holds one `Cast` per table column, so the appended chunk follows the
    /// table's column order and data types. All chunks are appended through a
    /// single transaction which is committed once the child is exhausted.
    ///
    /// The stream yields exactly one chunk on success: the sum of the
    /// cardinalities of the appended chunks, converted with `as i32`.
    ///
    /// Any error from the storage, the child executor or expression
    /// evaluation is propagated with `?` and ends the stream before the
    /// commit is reached.
    #[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
    pub async fn execute(self, child: BoxedExecutor) {
        let target = self.storage.get_table(self.table_id)?;
        let schema = target.columns()?;

        // Build the projection: for every table column, cast either the
        // matching input column or a null value to the column's data type.
        let mut projection = RecExpr::default();
        let casts = schema
            .iter()
            .map(|column| {
                // The position inside `column_ids` doubles as the index of the
                // column within the child's chunks.
                let source = self
                    .column_ids
                    .iter()
                    .position(|&id| id == column.id())
                    .map(|index| Expr::ColumnIndex(ColumnIndex(index as _)))
                    .unwrap_or_else(Expr::null);
                let value = projection.add(source);
                let target_type = projection.add(Expr::Type(column.data_type()));
                projection.add(Expr::Cast([target_type, value]))
            })
            .collect();
        // The list node is added last so that it becomes the root of the expression.
        projection.add(Expr::List(casts));

        let mut txn = target.write().await?;
        let mut appended = 0;
        #[for_await]
        for batch in child {
            let rows = Evaluator::new(&projection).eval_list(&batch?)?;
            appended += rows.cardinality();
            txn.append(rows).await?;
        }
        txn.commit().await?;

        yield DataChunk::single(appended as i32);
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::array::ArrayImpl;
    use crate::catalog::{ColumnCatalog, ColumnDesc, TableRefId};
    use crate::storage::{InMemoryStorage, StorageImpl};
    use crate::types::DataType;

    /// Creates an in-memory storage holding a table named `t` with two
    /// `Int32` columns, `v1` (id 0) and `v2` (id 1).
    async fn create_table() -> StorageImpl {
        let store = StorageImpl::InMemoryStorage(Arc::new(InMemoryStorage::new()));
        let columns = [
            ColumnCatalog::new(0, ColumnDesc::new("v1", DataType::Int32, false)),
            ColumnCatalog::new(1, ColumnDesc::new("v2", DataType::Int32, false)),
        ];
        store
            .as_in_memory_storage()
            .create_table(1, "t", &columns, &[])
            .await
            .unwrap();
        store
    }

    /// Feeds one chunk of two four-element `Int32` arrays into the executor
    /// and requires that the first item of the output stream is `Ok`.
    #[tokio::test]
    async fn simple() {
        let store = create_table().await;
        let insert = InsertExecutor {
            table_id: TableRefId::new(1, 0),
            column_ids: vec![0, 1],
            storage: store.as_in_memory_storage(),
        };
        // A one-shot child executor producing a single chunk.
        let input = async_stream::try_stream! {
            yield [
                ArrayImpl::new_int32((0..4).collect()),
                ArrayImpl::new_int32((100..104).collect()),
            ]
            .into_iter()
            .collect();
        }
        .boxed();

        let mut output = insert.execute(input);
        output.next().await.unwrap().unwrap();
    }
}