Skip to content

Commit 24b39c7

Browse files
committed
feat(inserter): add force_commit()
1 parent e445311 commit 24b39c7

File tree

3 files changed

+61
-22
lines changed

3 files changed

+61
-22
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
- insert: apply options set on the client ([#90]).
1313
- inserter: can be limited by size, see `Inserter::with_max_bytes()`.
1414
- inserter: `Inserter::pending()` to get stats about still being inserted data.
15+
- inserter: `Inserter::force_commit()` to commit and insert immediately.
1516

1617
### Changed
1718
- **BREAKING** inserter: move under the `inserter` feature.

src/inserter.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ use crate::{error::Result, insert::Insert, row::Row, ticks::Ticks, Client};
88
/// Performs multiple consecutive `INSERT`s.
99
///
1010
/// By default, it doesn't end the current active `INSERT` automatically.
11-
/// Use `with_max_bytes`,`with_max_rows` and `with_period` to set limits.
11+
/// Use `with_max_bytes`, `with_max_rows` and `with_period` to set limits.
12+
/// Alternatively, use `force_commit` to end an active `INSERT` whenever you want.
1213
///
1314
/// Rows are being sent progressively to spread network load.
1415
///
@@ -213,17 +214,23 @@ where
213214

214215
/// Checks limits and ends the current `INSERT` if they are reached.
215216
pub async fn commit(&mut self) -> Result<Quantities> {
217+
if !self.limits_reached() {
218+
self.in_transaction = false;
219+
return Ok(Quantities::ZERO);
220+
}
221+
222+
self.force_commit().await
223+
}
224+
225+
/// Ends the current `INSERT` unconditionally.
226+
pub async fn force_commit(&mut self) -> Result<Quantities> {
216227
self.in_transaction = false;
217228

218-
Ok(if self.limits_reached() {
219-
let quantities = mem::replace(&mut self.pending, Quantities::ZERO);
220-
let result = self.insert().await;
221-
self.ticks.reschedule();
222-
result?;
223-
quantities
224-
} else {
225-
Quantities::ZERO
226-
})
229+
let quantities = mem::replace(&mut self.pending, Quantities::ZERO);
230+
let result = self.insert().await;
231+
self.ticks.reschedule();
232+
result?;
233+
Ok(quantities)
227234
}
228235

229236
/// Ends the current `INSERT` and whole `Inserter` unconditionally.

tests/it/inserter.rs

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#![cfg(feature = "inserter")]
22

3+
use std::string::ToString;
4+
35
use serde::Serialize;
46

57
use clickhouse::{inserter::Quantities, Client, Row};
@@ -9,6 +11,14 @@ struct MyRow {
911
data: String,
1012
}
1113

14+
impl MyRow {
15+
fn new(data: impl ToString) -> Self {
16+
Self {
17+
data: data.to_string(),
18+
}
19+
}
20+
}
21+
1222
async fn create_table(client: &Client) {
1323
client
1424
.query("CREATE TABLE test(data String) ENGINE = MergeTree ORDER BY data")
@@ -17,6 +27,35 @@ async fn create_table(client: &Client) {
1727
.unwrap();
1828
}
1929

30+
#[tokio::test]
31+
async fn force_commit() {
32+
let client = prepare_database!();
33+
create_table(&client).await;
34+
35+
let mut inserter = client.inserter("test").unwrap();
36+
let rows = 100;
37+
38+
for i in 1..=rows {
39+
inserter.write(&MyRow::new(i)).unwrap();
40+
assert_eq!(inserter.commit().await.unwrap(), Quantities::ZERO);
41+
42+
if i % 10 == 0 {
43+
assert_eq!(inserter.force_commit().await.unwrap().rows, 10);
44+
}
45+
}
46+
47+
assert_eq!(inserter.end().await.unwrap(), Quantities::ZERO);
48+
49+
let (count, sum) = client
50+
.query("SELECT count(), sum(toUInt64(data)) FROM test")
51+
.fetch_one::<(u64, u64)>()
52+
.await
53+
.unwrap();
54+
55+
assert_eq!(count, rows);
56+
assert_eq!(sum, (1..=rows).sum::<u64>());
57+
}
58+
2059
#[tokio::test]
2160
async fn limited_by_rows() {
2261
let client = prepare_database!();
@@ -26,13 +65,9 @@ async fn limited_by_rows() {
2665
let rows = 100;
2766

2867
for i in (2..=rows).step_by(2) {
29-
let row = MyRow {
30-
data: (i - 1).to_string(),
31-
};
68+
let row = MyRow::new(i - 1);
3269
inserter.write(&row).unwrap();
33-
let row = MyRow {
34-
data: i.to_string(),
35-
};
70+
let row = MyRow::new(i);
3671
inserter.write(&row).unwrap();
3772

3873
let inserted = inserter.commit().await.unwrap();
@@ -71,9 +106,7 @@ async fn limited_by_bytes() {
71106
let mut inserter = client.inserter("test").unwrap().with_max_bytes(100);
72107
let rows = 100;
73108

74-
let row = MyRow {
75-
data: "x".repeat(9), // +1 for length
76-
};
109+
let row = MyRow::new("x".repeat(9));
77110

78111
for i in 1..=rows {
79112
inserter.write(&row).unwrap();
@@ -118,9 +151,7 @@ async fn limited_by_time() {
118151
let rows = 100;
119152

120153
for i in 1..=rows {
121-
let row = MyRow {
122-
data: i.to_string(),
123-
};
154+
let row = MyRow::new(i);
124155
inserter.write(&row).unwrap();
125156

126157
tokio::time::sleep(period / 10).await;

0 commit comments

Comments
 (0)