From d8b8b0622950caee0c40af46de3285f9cbc20cc1 Mon Sep 17 00:00:00 2001
From: slvrtrn
Date: Thu, 21 Nov 2024 17:00:38 +0100
Subject: [PATCH 01/12] Pin bstr version

---
 Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index c3ad148..cc95b1a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -102,7 +102,7 @@ lz4_flex = { version = "0.11.3", default-features = false, features = [
 cityhash-rs = { version = "=1.0.1", optional = true } # exact version for safety
 uuid = { version = "1", optional = true }
 time = { version = "0.3", optional = true }
-bstr = { version = "1.2", default-features = false }
+bstr = { version = "1.11.0", default-features = false }
 quanta = { version = "0.12", optional = true }
 replace_with = { version = "0.1.7" }

From daacd6ca0f135eb5a21df8162e2110195a128ac8 Mon Sep 17 00:00:00 2001
From: slvrtrn
Date: Thu, 21 Nov 2024 17:09:39 +0100
Subject: [PATCH 02/12] Pin bstr version [2]

---
 Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index cc95b1a..ab151ab 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -102,7 +102,7 @@ lz4_flex = { version = "0.11.3", default-features = false, features = [
 cityhash-rs = { version = "=1.0.1", optional = true } # exact version for safety
 uuid = { version = "1", optional = true }
 time = { version = "0.3", optional = true }
-bstr = { version = "1.11.0", default-features = false }
+bstr = { version = "=1.10.0", default-features = false }
 quanta = { version = "0.12", optional = true }
 replace_with = { version = "0.1.7" }

From adef1020fe3d85ab392cc473637cff63475a61d4 Mon Sep 17 00:00:00 2001
From: slvrtrn
Date: Thu, 21 Nov 2024 17:13:35 +0100
Subject: [PATCH 03/12] Pin hyper-rustls version

---
 Cargo.toml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index ab151ab..d70161f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -85,10 +85,10 @@ hyper = "1.4"
 hyper-util = { version = "0.1.6", features = ["client-legacy", "http1"] }
 hyper-tls = { version = "0.6.0", optional = true }
 rustls = { version = "0.23", default-features = false, optional = true }
-hyper-rustls = { version = "0.27.2", default-features = false, features = [
+hyper-rustls = { version = "=0.27.2", default-features = false, features = [
     "http1",
     "tls12",
-], optional = true }
+], optional = true } # 0.27.3+ is Rust 1.71+ only
 url = "2.1.1"
 futures = "0.3.5"
 futures-channel = "0.3.30"
@@ -102,7 +102,7 @@ lz4_flex = { version = "0.11.3", default-features = false, features = [
 cityhash-rs = { version = "=1.0.1", optional = true } # exact version for safety
 uuid = { version = "1", optional = true }
 time = { version = "0.3", optional = true }
-bstr = { version = "=1.10.0", default-features = false }
+bstr = { version = "=1.10.0", default-features = false } # 1.11.0+ is Rust 1.73+ only
 quanta = { version = "0.12", optional = true }
 replace_with = { version = "0.1.7" }

From fb60db0fdd34544fe3806426ecb29aff9847293a Mon Sep 17 00:00:00 2001
From: slvrtrn
Date: Tue, 3 Dec 2024 23:54:04 +0100
Subject: [PATCH 04/12] Bump MSRV to 1.73

---
 .github/workflows/ci.yml | 2 +-
 Cargo.toml               | 8 ++++----
 derive/Cargo.toml        | 2 +-
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e327198..a43d5c7 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -11,7 +11,7 @@ env:
   RUSTFLAGS: -Dwarnings
   RUSTDOCFLAGS: -Dwarnings
   RUST_BACKTRACE: 1
-  MSRV: 1.70.0
+  MSRV: 1.73.0

 jobs:
   build:
diff --git a/Cargo.toml b/Cargo.toml
index d70161f..f089e2e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,7 +10,7 @@ license = "MIT OR Apache-2.0"
 readme = "README.md"
 edition = "2021"
 # update `derive/Cargo.toml` and CI if changed
-rust-version = "1.70.0"
+rust-version = "1.73.0"

 [lints.rust]
 rust_2018_idioms = { level = "warn", priority = -1 }
@@ -85,10 +85,10 @@ hyper = "1.4"
 hyper-util = { version = "0.1.6", features = ["client-legacy", "http1"] }
 hyper-tls = { version = "0.6.0", optional = true }
 rustls = { version = "0.23", default-features = false, optional = true }
-hyper-rustls = { version = "=0.27.2", default-features = false, features = [
+hyper-rustls = { version = "0.27.3", default-features = false, features = [
     "http1",
     "tls12",
-], optional = true } # 0.27.3+ is Rust 1.71+ only
+], optional = true }
 url = "2.1.1"
 futures = "0.3.5"
 futures-channel = "0.3.30"
@@ -102,7 +102,7 @@ lz4_flex = { version = "0.11.3", default-features = false, features = [
 cityhash-rs = { version = "=1.0.1", optional = true } # exact version for safety
 uuid = { version = "1", optional = true }
 time = { version = "0.3", optional = true }
-bstr = { version = "=1.10.0", default-features = false } # 1.11.0+ is Rust 1.73+ only
+bstr = { version = "1.11.0", default-features = false } # 1.11.0+ is Rust 1.73+ only
 quanta = { version = "0.12", optional = true }
 replace_with = { version = "0.1.7" }
diff --git a/derive/Cargo.toml b/derive/Cargo.toml
index 2d2e4e3..56cb322 100644
--- a/derive/Cargo.toml
+++ b/derive/Cargo.toml
@@ -8,7 +8,7 @@ homepage = "https://clickhouse.com"
 edition = "2021"
 license = "MIT OR Apache-2.0"
 # update `Cargo.toml` and CI if changed
-rust-version = "1.70.0"
+rust-version = "1.73.0"

 [lib]
 proc-macro = true

From d762b75b334a614f472c784c7c6b9de22805d61d Mon Sep 17 00:00:00 2001
From: slvrtrn
Date: Wed, 4 Dec 2024 00:03:14 +0100
Subject: [PATCH 05/12] Fix clippy issues

---
 src/rowbinary/de.rs  | 6 +++---
 src/rowbinary/ser.rs | 8 ++++----
 src/sql/bind.rs      | 2 +-
 src/sql/ser.rs       | 4 ++--
 4 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/src/rowbinary/de.rs b/src/rowbinary/de.rs
index b0d56c8..30d97b3 100644
--- a/src/rowbinary/de.rs
+++ b/src/rowbinary/de.rs
@@ -25,7 +25,7 @@ struct RowBinaryDeserializer<'cursor, 'data> {
     input: &'cursor mut &'data [u8],
 }

-impl<'cursor, 'data> RowBinaryDeserializer<'cursor, 'data> {
+impl<'data> RowBinaryDeserializer<'_, 'data> {
     fn read_vec(&mut self, size: usize) -> Result<Vec<u8>> {
         Ok(self.read_slice(size)?.to_vec())
     }
@@ -64,7 +64,7 @@ macro_rules! impl_num {
     };
 }

-impl<'cursor, 'data> Deserializer<'data> for &mut RowBinaryDeserializer<'cursor, 'data> {
+impl<'data> Deserializer<'data> for &mut RowBinaryDeserializer<'_, 'data> {
     type Error = Error;

     impl_num!(i8, deserialize_i8, visit_i8, get_i8);
@@ -163,7 +163,7 @@ impl<'cursor, 'data> Deserializer<'data> for &mut RowBinaryDeserializer<'cursor,
             len: usize,
         }

-        impl<'de, 'cursor, 'data> SeqAccess<'data> for Access<'de, 'cursor, 'data> {
+        impl<'data> SeqAccess<'data> for Access<'_, '_, 'data> {
             type Error = Error;

             fn next_element_seed<T>(&mut self, seed: T) -> Result<Option<T::Value>>
diff --git a/src/rowbinary/ser.rs b/src/rowbinary/ser.rs
index 1191140..838b845 100644
--- a/src/rowbinary/ser.rs
+++ b/src/rowbinary/ser.rs
@@ -30,7 +30,7 @@ macro_rules! impl_num {
     };
 }

-impl<'a, B: BufMut> Serializer for &'a mut RowBinarySerializer<B> {
+impl<B: BufMut> Serializer for &'_ mut RowBinarySerializer<B> {
     type Error = Error;
     type Ok = ();
     type SerializeMap = Impossible<(), Error>;
@@ -201,7 +201,7 @@ impl<'a, B: BufMut> Serializer for &'a mut RowBinarySerializer<B> {
     }
 }

-impl<'a, B: BufMut> SerializeStruct for &'a mut RowBinarySerializer<B> {
+impl<B: BufMut> SerializeStruct for &mut RowBinarySerializer<B> {
     type Error = Error;
     type Ok = ();
@@ -216,7 +216,7 @@ impl<'a, B: BufMut> SerializeStruct for &'a mut RowBinarySerializer<B> {
     }
 }

-impl<'a, B: BufMut> SerializeSeq for &'a mut RowBinarySerializer<B> {
+impl<B: BufMut> SerializeSeq for &'_ mut RowBinarySerializer<B> {
     type Error = Error;
     type Ok = ();
@@ -229,7 +229,7 @@ impl<'a, B: BufMut> SerializeSeq for &'a mut RowBinarySerializer<B> {
     }
 }

-impl<'a, B: BufMut> SerializeTuple for &'a mut RowBinarySerializer<B> {
+impl<B: BufMut> SerializeTuple for &'_ mut RowBinarySerializer<B> {
     type Error = Error;
     type Ok = ();
diff --git a/src/sql/bind.rs b/src/sql/bind.rs
index c4885b0..1b98b35 100644
--- a/src/sql/bind.rs
+++ b/src/sql/bind.rs
@@ -24,7 +24,7 @@ impl<S: Serialize> Bind for S {
 pub struct Identifier<'a>(pub &'a str);

 #[sealed]
-impl<'a> Bind for Identifier<'a> {
+impl Bind for Identifier<'_> {
     #[inline]
     fn write(&self, dst: &mut impl fmt::Write) -> Result<(), String> {
         escape::identifier(self.0, dst).map_err(|err| err.to_string())
diff --git a/src/sql/ser.rs b/src/sql/ser.rs
index 6cbdb22..f9ca310 100644
--- a/src/sql/ser.rs
+++ b/src/sql/ser.rs
@@ -226,7 +226,7 @@ struct SqlListSerializer<'a, W> {
     closing_char: char,
 }

-impl<'a, W: Write> SerializeSeq for SqlListSerializer<'a, W> {
+impl<W: Write> SerializeSeq for SqlListSerializer<'_, W> {
     type Error = SerializerError;
     type Ok = ();
@@ -253,7 +253,7 @@ impl<'a, W: Write> SerializeSeq for SqlListSerializer<'a, W> {
     }
 }

-impl<'a, W: Write> SerializeTuple for SqlListSerializer<'a, W> {
+impl<W: Write> SerializeTuple for SqlListSerializer<'_, W> {
     type Error = SerializerError;
     type Ok = ();
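Note on PATCH 05: the same mechanical edit is applied in all four files, impl-level lifetime parameters that are only mentioned once are replaced with the anonymous lifetime '_ (presumably what clippy's needless_lifetimes lint reports on newer toolchains; the lint name is an assumption, the commit message only says "clippy issues"). A minimal, self-contained sketch of the before/after pattern, using made-up types rather than the crate's own:

    // Hypothetical example, not part of the patch.
    trait Render {
        fn render(&self) -> String;
    }

    struct Wrapper<'a>(&'a str);

    // Before: impl<'a> Render for Wrapper<'a> { ... }
    // The named lifetime 'a is declared only to be repeated once in the type.
    // After: the anonymous lifetime expresses the same thing without a declaration.
    impl Render for Wrapper<'_> {
        fn render(&self) -> String {
            self.0.to_uppercase()
        }
    }

    fn main() {
        println!("{}", Wrapper("hi").render()); // prints "HI"
    }
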
From f4921faa914c3202ca55a0ade419718d3ce8bcb6 Mon Sep 17 00:00:00 2001
From: slvrtrn
Date: Wed, 4 Dec 2024 00:18:29 +0100
Subject: [PATCH 06/12] Reduce max_execution_time in the cursor_error test

---
 tests/it/cursor_error.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/it/cursor_error.rs b/tests/it/cursor_error.rs
index 21d1bad..db2285e 100644
--- a/tests/it/cursor_error.rs
+++ b/tests/it/cursor_error.rs
@@ -20,7 +20,7 @@ async fn max_execution_time(mut client: Client, wait_end_of_query: bool) {
     // TODO: check different `timeout_overflow_mode`
     let mut cursor = client
         .with_compression(Compression::None)
-        .with_option("max_execution_time", "0.1")
+        .with_option("max_execution_time", "0.01")
         .query("SELECT toUInt8(65 + number % 5) FROM system.numbers LIMIT 100000000")
         .fetch::<u8>()
         .unwrap();

From f2e77af219e382b2124b7439e6d482bd2eb2a28b Mon Sep 17 00:00:00 2001
From: slvrtrn
Date: Wed, 4 Dec 2024 00:20:40 +0100
Subject: [PATCH 07/12] Increase limit in the cursor_error test (maybe it's too fast?)

---
 tests/it/cursor_error.rs | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/it/cursor_error.rs b/tests/it/cursor_error.rs
index db2285e..81c382c 100644
--- a/tests/it/cursor_error.rs
+++ b/tests/it/cursor_error.rs
@@ -20,8 +20,8 @@ async fn max_execution_time(mut client: Client, wait_end_of_query: bool) {
     // TODO: check different `timeout_overflow_mode`
     let mut cursor = client
         .with_compression(Compression::None)
-        .with_option("max_execution_time", "0.01")
-        .query("SELECT toUInt8(65 + number % 5) FROM system.numbers LIMIT 100000000")
+        .with_option("max_execution_time", "0.05")
+        .query("SELECT toUInt8(65 + number % 5) FROM system.numbers LIMIT 1000000000")
         .fetch::<u8>()
         .unwrap();

@@ -39,6 +39,8 @@ async fn max_execution_time(mut client: Client, wait_end_of_query: bool) {
         }
     };

+    println!("i: {}", i);
+
     assert!(wait_end_of_query ^ (i != 0));
     assert!(err.to_string().contains("TIMEOUT_EXCEEDED"));
 }

From 7572484fa964f72950f12d676c346eb0a5aea39c Mon Sep 17 00:00:00 2001
From: slvrtrn
Date: Wed, 4 Dec 2024 00:24:01 +0100
Subject: [PATCH 08/12] Potential overflow (?)

---
 tests/it/cursor_error.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/it/cursor_error.rs b/tests/it/cursor_error.rs
index 81c382c..a0e0e06 100644
--- a/tests/it/cursor_error.rs
+++ b/tests/it/cursor_error.rs
@@ -21,7 +21,7 @@ async fn max_execution_time(mut client: Client, wait_end_of_query: bool) {
     let mut cursor = client
         .with_compression(Compression::None)
         .with_option("max_execution_time", "0.05")
-        .query("SELECT toUInt8(65 + number % 5) FROM system.numbers LIMIT 1000000000")
+        .query("SELECT toUInt8(65 + number % 100) FROM system.numbers LIMIT 1000000000")
         .fetch::<u8>()
         .unwrap();

@@ -31,7 +31,7 @@ async fn max_execution_time(mut client: Client, wait_end_of_query: bool) {
         match cursor.next().await {
             Ok(Some(no)) => {
                 // Check that we haven't parsed something extra.
-                assert_eq!(no, (65 + i % 5) as u8);
+                assert_eq!(no, (65 + i % 100) as u8);
                 i += 1;
             }
             Ok(None) => panic!("DB exception hasn't been found"),

From 0b03bc6822c38079c67edfe4c2974f5c24fc6913 Mon Sep 17 00:00:00 2001
From: slvrtrn
Date: Wed, 4 Dec 2024 00:44:56 +0100
Subject: [PATCH 09/12] Rework cursor_error.rs

---
 tests/it/cursor_error.rs | 39 ++++++++++++++++++++++-----------------
 1 file changed, 22 insertions(+), 17 deletions(-)

diff --git a/tests/it/cursor_error.rs b/tests/it/cursor_error.rs
index a0e0e06..a90b200 100644
--- a/tests/it/cursor_error.rs
+++ b/tests/it/cursor_error.rs
@@ -3,46 +3,51 @@ use clickhouse::{Client, Compression};
 #[tokio::test]
 async fn deferred() {
     let client = prepare_database!();
-    max_execution_time(client, false).await;
+    let results = max_execution_time(client).await;
+    assert_eq!(results, vec![0u64, 1u64]);
 }

 #[tokio::test]
 async fn wait_end_of_query() {
     let client = prepare_database!();
-    max_execution_time(client, true).await;
+    let results = max_execution_time(client.with_option("wait_end_of_query", "1")).await;
+    // the entire response is buffered before sending to the client (and it fails)
+    // so we don't get any results in this case
+    assert_eq!(results, Vec::<u64>::new());
 }

-async fn max_execution_time(mut client: Client, wait_end_of_query: bool) {
-    if wait_end_of_query {
-        client = client.with_option("wait_end_of_query", "1")
+async fn max_execution_time(client: Client) -> Vec<u64> {
+    #[derive(Debug, clickhouse::Row, serde::Deserialize)]
+    struct Res {
+        number: u64,
+        _sleep: u8,
     }

     // TODO: check different `timeout_overflow_mode`
     let mut cursor = client
         .with_compression(Compression::None)
-        .with_option("max_execution_time", "0.05")
-        .query("SELECT toUInt8(65 + number % 100) FROM system.numbers LIMIT 1000000000")
-        .fetch::<u8>()
+        // reducing max_block_size to force the server to stream one row at a time
+        .with_option("max_block_size", "1")
+        // with sleepEachRow(0.01) this ensures that the query will fail after the second row
+        .with_option("max_execution_time", "0.025")
+        .query("SELECT number, sleepEachRow(0.01) AS sleep FROM system.numbers LIMIT 3")
+        .fetch::<Res>()
         .unwrap();

-    let mut i = 0u64;
-
+    let mut results: Vec<u64> = Vec::new();
     let err = loop {
         match cursor.next().await {
-            Ok(Some(no)) => {
-                // Check that we haven't parsed something extra.
-                assert_eq!(no, (65 + i % 100) as u8);
-                i += 1;
+            Ok(Some(res)) => {
+                println!("no: {:?}", res);
+                results.push(res.number);
             }
             Ok(None) => panic!("DB exception hasn't been found"),
             Err(err) => break err,
         }
     };

-    println!("i: {}", i);
-
-    assert!(wait_end_of_query ^ (i != 0));
     assert!(err.to_string().contains("TIMEOUT_EXCEEDED"));
+    results
 }

 #[cfg(feature = "lz4")]
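Note on PATCH 09: the reworked test leans on the arithmetic spelled out in its comments: max_block_size=1 makes the server stream one row per block, sleepEachRow(0.01) costs roughly 10 ms per emitted row, and the 0.025 s max_execution_time budget therefore runs out while the third row is being produced, so only numbers 0 and 1 should reach the client before the TIMEOUT_EXCEEDED error. A rough back-of-the-envelope sketch of that expectation (an illustration only, not part of the patch):

    fn main() {
        let sleep_per_row_ms = 10; // sleepEachRow(0.01) -> ~10 ms per emitted row
        let budget_ms = 25;        // max_execution_time = 0.025 s
        // Rows that complete before the execution budget is exhausted.
        let completed_rows = budget_ms / sleep_per_row_ms;
        assert_eq!(completed_rows, 2); // matches assert_eq!(results, vec![0u64, 1u64]) in the test
    }
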
From ac46b6bc8ea8a338f3fcc2004ffe70c918ebe1d5 Mon Sep 17 00:00:00 2001
From: slvrtrn
Date: Wed, 4 Dec 2024 00:47:58 +0100
Subject: [PATCH 10/12] Increase max_execution_time a bit

---
 tests/it/cursor_error.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/it/cursor_error.rs b/tests/it/cursor_error.rs
index a90b200..8b9e90e 100644
--- a/tests/it/cursor_error.rs
+++ b/tests/it/cursor_error.rs
@@ -28,9 +28,9 @@ async fn max_execution_time(client: Client) -> Vec<u64> {
         .with_compression(Compression::None)
         // reducing max_block_size to force the server to stream one row at a time
         .with_option("max_block_size", "1")
-        // with sleepEachRow(0.01) this ensures that the query will fail after the second row
-        .with_option("max_execution_time", "0.025")
-        .query("SELECT number, sleepEachRow(0.01) AS sleep FROM system.numbers LIMIT 3")
+        // with sleepEachRow(0.1) this ensures that the query will fail after the second row
+        .with_option("max_execution_time", "0.25")
+        .query("SELECT number, sleepEachRow(0.1) AS sleep FROM system.numbers LIMIT 3")
         .fetch::<Res>()
         .unwrap();

From c4dc5cab2d3ea072e8ab5bbead4d56e87aed2358 Mon Sep 17 00:00:00 2001
From: slvrtrn
Date: Wed, 4 Dec 2024 00:57:59 +0100
Subject: [PATCH 11/12] Pin ClickHouse version on the CI, revert the cursor_error test

---
 .github/workflows/ci.yml |  2 +-
 tests/it/cursor_error.rs | 37 +++++++++++++++----------------------
 2 files changed, 16 insertions(+), 23 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a43d5c7..fe6ab78 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -77,7 +77,7 @@ jobs:
     services:
       clickhouse:
-        image: clickhouse/clickhouse-server
+        image: clickhouse/clickhouse-server:24.10-alpine
         ports:
           - 8123:8123

diff --git a/tests/it/cursor_error.rs b/tests/it/cursor_error.rs
index 8b9e90e..21d1bad 100644
--- a/tests/it/cursor_error.rs
+++ b/tests/it/cursor_error.rs
@@ -3,51 +3,44 @@ use clickhouse::{Client, Compression};
 #[tokio::test]
 async fn deferred() {
     let client = prepare_database!();
-    let results = max_execution_time(client).await;
-    assert_eq!(results, vec![0u64, 1u64]);
+    max_execution_time(client, false).await;
 }

 #[tokio::test]
 async fn wait_end_of_query() {
     let client = prepare_database!();
-    let results = max_execution_time(client.with_option("wait_end_of_query", "1")).await;
-    // the entire response is buffered before sending to the client (and it fails)
-    // so we don't get any results in this case
-    assert_eq!(results, Vec::<u64>::new());
+    max_execution_time(client, true).await;
 }

-async fn max_execution_time(client: Client) -> Vec<u64> {
-    #[derive(Debug, clickhouse::Row, serde::Deserialize)]
-    struct Res {
-        number: u64,
-        _sleep: u8,
+async fn max_execution_time(mut client: Client, wait_end_of_query: bool) {
+    if wait_end_of_query {
+        client = client.with_option("wait_end_of_query", "1")
     }

     // TODO: check different `timeout_overflow_mode`
     let mut cursor = client
         .with_compression(Compression::None)
-        // reducing max_block_size to force the server to stream one row at a time
-        .with_option("max_block_size", "1")
-        // with sleepEachRow(0.1) this ensures that the query will fail after the second row
-        .with_option("max_execution_time", "0.25")
-        .query("SELECT number, sleepEachRow(0.1) AS sleep FROM system.numbers LIMIT 3")
-        .fetch::<Res>()
+        .with_option("max_execution_time", "0.1")
+        .query("SELECT toUInt8(65 + number % 5) FROM system.numbers LIMIT 100000000")
+        .fetch::<u8>()
         .unwrap();

-    let mut results: Vec<u64> = Vec::new();
+    let mut i = 0u64;
+
     let err = loop {
         match cursor.next().await {
-            Ok(Some(res)) => {
-                println!("no: {:?}", res);
-                results.push(res.number);
+            Ok(Some(no)) => {
+                // Check that we haven't parsed something extra.
+                assert_eq!(no, (65 + i % 5) as u8);
+                i += 1;
             }
             Ok(None) => panic!("DB exception hasn't been found"),
             Err(err) => break err,
         }
     };

+    assert!(wait_end_of_query ^ (i != 0));
     assert!(err.to_string().contains("TIMEOUT_EXCEEDED"));
-    results
 }

 #[cfg(feature = "lz4")]

From 9dfbfca1725e1c7f34e8bf56df30b6715a1f3651 Mon Sep 17 00:00:00 2001
From: slvrtrn
Date: Wed, 4 Dec 2024 00:59:44 +0100
Subject: [PATCH 12/12] Remove Cargo.toml comment

---
 Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index f089e2e..7f98746 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -102,7 +102,7 @@ lz4_flex = { version = "0.11.3", default-features = false, features = [
 cityhash-rs = { version = "=1.0.1", optional = true } # exact version for safety
 uuid = { version = "1", optional = true }
 time = { version = "0.3", optional = true }
-bstr = { version = "1.11.0", default-features = false } # 1.11.0+ is Rust 1.73+ only
+bstr = { version = "1.11.0", default-features = false }
 quanta = { version = "0.12", optional = true }
 replace_with = { version = "0.1.7" }