From 64f22da5d9c7f016c3a6a1034ffd6948cec84670 Mon Sep 17 00:00:00 2001 From: Fred Clausen <43556888+fredclausen@users.noreply.github.com> Date: Sat, 30 Dec 2023 12:06:55 -0700 Subject: [PATCH 1/6] Ensure disconnected sockets do not block the caller --- src/tokio/io.rs | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/src/tokio/io.rs b/src/tokio/io.rs index 803d7e7..4667ba3 100644 --- a/src/tokio/io.rs +++ b/src/tokio/io.rs @@ -379,6 +379,10 @@ where T: UnderlyingIo + AsyncWrite, C: Clone + Send + Unpin + 'static, { + /// Method for writing to the underlying IO item. + /// If the write results in a disconnect, the write is skipped and the + /// underlying IO item will attempt to be reconnected. + /// No error is returned to the caller. fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -389,15 +393,23 @@ where let poll = AsyncWrite::poll_write(Pin::new(&mut self.underlying_io), cx, buf); if self.is_write_disconnect_detected(&poll) { + error!( + "{}Write disconnect detected. Skipping message", + &self.get_connection_name() + ); self.on_disconnect(cx); - Poll::Pending + Poll::Ready(Ok(buf.len())) } else { poll } } Status::Disconnected(_) => { + error!( + "{}Write disconnect detected. Skipping Message", + &self.get_connection_name() + ); self.poll_disconnect(cx); - Poll::Pending + Poll::Ready(Ok(buf.len())) } Status::FailedAndExhausted => exhausted_err(), } @@ -439,6 +451,10 @@ where } } + /// Method for writing to the underlying IO item. + /// If the write results in a disconnect, the write is skipped and the + /// underlying IO item will attempt to be reconnected. + /// No error is returned to the caller. fn poll_write_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -450,15 +466,23 @@ where AsyncWrite::poll_write_vectored(Pin::new(&mut self.underlying_io), cx, bufs); if self.is_write_disconnect_detected(&poll) { + error!( + "{}Write disconnect detected. Skipping message", + &self.get_connection_name() + ); self.on_disconnect(cx); - Poll::Pending + Poll::Ready(Ok(bufs.iter().map(|buf| buf.len()).sum())) } else { poll } } Status::Disconnected(_) => { + error!( + "{}Write disconnect detected. Skipping Message", + &self.get_connection_name() + ); self.poll_disconnect(cx); - Poll::Pending + Poll::Ready(Ok(bufs.iter().map(|buf| buf.len()).sum())) } Status::FailedAndExhausted => exhausted_err(), } From df6416ce5e5cfcd1eaa0450a9cdcf128037328c2 Mon Sep 17 00:00:00 2001 From: "Caleb Leinz (he/him)" <103841857+cmleinz@users.noreply.github.com> Date: Wed, 13 Dec 2023 08:17:17 -0800 Subject: [PATCH 2/6] Dependency updates Doc changes and logging changes Option to toggle write failure behavior Ensure multiple shutdown attempts behave correctly (#25) * Ensure multiple shutdown attempts behave correctly --------- Authored-by: Caleb Leinz --- Cargo.lock | 42 ++++++------- Cargo.toml | 8 +-- README.md | 2 +- src/config.rs | 10 ++++ src/tokio/io.rs | 118 +++++++++++++++++++++++++++---------- tests/integration_tests.rs | 28 +++++++++ 6 files changed, 151 insertions(+), 57 deletions(-) create mode 100644 tests/integration_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 34e71b5..0cfb3ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,9 +61,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "futures" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", @@ -76,9 +76,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -86,15 +86,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" dependencies = [ "futures-core", "futures-task", @@ -103,15 +103,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-macro" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", @@ -120,21 +120,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-channel", "futures-core", @@ -292,7 +292,7 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "sdre-stubborn-io" -version = "0.4.4" +version = "0.5.0" dependencies = [ "futures", "log", @@ -333,9 +333,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.34.0" +version = "1.35.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" dependencies = [ "backtrace", "bytes", diff --git a/Cargo.toml b/Cargo.toml index b04d733..de40cb4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sdre-stubborn-io" -version = "0.4.4" +version = "0.5.0" authors = ["David Raifaizen ", "Fred Clausen"] edition = "2021" description = "Forked from https://github.com/craftytrickster/stubborn-io. io traits/structs that automatically recover from potential disconnections/interruptions." @@ -11,11 +11,11 @@ documentation = "https://docs.rs/sdre-stubborn-io" readme = "README.md" [dependencies] -tokio = { version = "1.34.0", features = ["time", "net"] } +tokio = { version = "1.35.1", features = ["time", "net"] } log = "0.4.20" rand = "0.8.5" [dev-dependencies] -tokio = { version = "1.34.0", features = ["macros", "rt", "fs", "io-util"] } +tokio = { version = "1.35.1", features = ["macros", "rt", "fs", "io-util"] } tokio-util = { version = "0.7.10", features = ["codec"] } -futures = "0.3.29" +futures = "0.3.30" diff --git a/README.md b/README.md index 50de2e9..82595e6 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ This project has been forked from [stubborn-io](https://github.com/craftytrickst ## Documentation API Documentation, examples and motivations can be found here - -(Rust Docs) . +(Rust Docs) . Only change to the documentation in this fork will be the addition of the `ReconnectionOptions` struct, which adds `with_connection_name(name: &str)` as a method to the `StubbornTcpStream` struct. This allows for the naming of the connection, which is useful for logging purposes. diff --git a/src/config.rs b/src/config.rs index 69c2b20..68cc080 100644 --- a/src/config.rs +++ b/src/config.rs @@ -26,6 +26,10 @@ pub struct ReconnectOptions { pub on_connect_fail_callback: Box, pub connection_name: String, + + /// If this is set to false (default), then the StubbornIo will NOT block + /// On write failures. + pub block_on_write_failures: bool, } impl ReconnectOptions { @@ -41,6 +45,7 @@ impl ReconnectOptions { on_disconnect_callback: Box::new(|| {}), on_connect_fail_callback: Box::new(|| {}), connection_name: String::new(), + block_on_write_failures: false, } } @@ -97,4 +102,9 @@ impl ReconnectOptions { self.connection_name = name.into(); self } + + pub fn with_block_on_write_failures(mut self, value: bool) -> Self { + self.block_on_write_failures = value; + self + } } diff --git a/src/tokio/io.rs b/src/tokio/io.rs index 4667ba3..d8f5eb0 100644 --- a/src/tokio/io.rs +++ b/src/tokio/io.rs @@ -1,5 +1,5 @@ use crate::config::ReconnectOptions; -use log::{error, info}; +use log::{error, info, warn}; use std::future::Future; use std::io::{self, ErrorKind, IoSlice}; use std::marker::PhantomData; @@ -95,12 +95,24 @@ enum Status { FailedAndExhausted, // the way one feels after programming in dynamically typed languages } +#[inline] +fn poll_err( + kind: ErrorKind, + reason: impl Into>, +) -> Poll> { + let io_err = io::Error::new(kind, reason); + Poll::Ready(Err(io_err)) +} + fn exhausted_err() -> Poll> { - let io_err = io::Error::new( + poll_err( ErrorKind::NotConnected, "Disconnected. Connection attempts have been exhausted.", - ); - Poll::Ready(Err(io_err)) + ) +} + +fn disconnected_err() -> Poll> { + poll_err(ErrorKind::NotConnected, "Underlying I/O is disconnected.") } impl Deref for StubbornIo { @@ -147,6 +159,10 @@ where self.options.connection_name.format_name() } + pub fn get_block_on_write_failures(&self) -> bool { + self.options.block_on_write_failures.clone() + } + pub async fn connect_with_options(ctor_arg: C, options: ReconnectOptions) -> io::Result { let tcp = match T::establish(ctor_arg.clone()).await { Ok(tcp) => { @@ -380,8 +396,8 @@ where C: Clone + Send + Unpin + 'static, { /// Method for writing to the underlying IO item. - /// If the write results in a disconnect, the write is skipped and the - /// underlying IO item will attempt to be reconnected. + /// If the write results in a disconnect. If ReconectOptions::block_on_write_failures is true, + /// Poll::Pending is returned to the caller and the buffer is held. Otherwise, the write is skipped /// No error is returned to the caller. fn poll_write( mut self: Pin<&mut Self>, @@ -393,24 +409,44 @@ where let poll = AsyncWrite::poll_write(Pin::new(&mut self.underlying_io), cx, buf); if self.is_write_disconnect_detected(&poll) { + if !self.get_block_on_write_failures() { + error!( + "{}Write disconnect detected. Skipping message", + &self.get_connection_name() + ); + + self.on_disconnect(cx); + Poll::Ready(Ok(buf.len())) + } else { + warn!( + "{}Write disconnect detected. Blocking on write", + &self.get_connection_name() + ); + self.on_disconnect(cx); + Poll::Pending + } + } else { + poll + } + } + Status::Disconnected(_) => { + if !self.get_block_on_write_failures() { error!( - "{}Write disconnect detected. Skipping message", + "{}Write disconnect detected. Skipping Message", &self.get_connection_name() ); - self.on_disconnect(cx); + + self.poll_disconnect(cx); Poll::Ready(Ok(buf.len())) } else { - poll + warn!( + "{}Write disconnect detected. Blocking on write", + &self.get_connection_name() + ); + self.poll_disconnect(cx); + Poll::Pending } } - Status::Disconnected(_) => { - error!( - "{}Write disconnect detected. Skipping Message", - &self.get_connection_name() - ); - self.poll_disconnect(cx); - Poll::Ready(Ok(buf.len())) - } Status::FailedAndExhausted => exhausted_err(), } } @@ -446,14 +482,14 @@ where poll } - Status::Disconnected(_) => Poll::Pending, + Status::Disconnected(_) => disconnected_err(), Status::FailedAndExhausted => exhausted_err(), } } /// Method for writing to the underlying IO item. - /// If the write results in a disconnect, the write is skipped and the - /// underlying IO item will attempt to be reconnected. + /// If the write results in a disconnect. If ReconectOptions::block_on_write_failures is true, + /// Poll::Pending is returned to the caller and the buffer is held. Otherwise, the write is skipped /// No error is returned to the caller. fn poll_write_vectored( mut self: Pin<&mut Self>, @@ -466,24 +502,44 @@ where AsyncWrite::poll_write_vectored(Pin::new(&mut self.underlying_io), cx, bufs); if self.is_write_disconnect_detected(&poll) { + if !self.get_block_on_write_failures() { + error!( + "{}Write disconnect detected. Skipping message", + &self.get_connection_name() + ); + + self.on_disconnect(cx); + Poll::Ready(Ok(bufs.iter().map(|buf| buf.len()).sum())) + } else { + warn!( + "{}Write disconnect detected. Blocking on write", + &self.get_connection_name() + ); + self.on_disconnect(cx); + Poll::Pending + } + } else { + poll + } + } + Status::Disconnected(_) => { + if !self.get_block_on_write_failures() { error!( - "{}Write disconnect detected. Skipping message", + "{}Write disconnect detected. Skipping Message", &self.get_connection_name() ); - self.on_disconnect(cx); + + self.poll_disconnect(cx); Poll::Ready(Ok(bufs.iter().map(|buf| buf.len()).sum())) } else { - poll + warn!( + "{}Write disconnect detected. Blocking on write", + &self.get_connection_name() + ); + self.poll_disconnect(cx); + Poll::Pending } } - Status::Disconnected(_) => { - error!( - "{}Write disconnect detected. Skipping Message", - &self.get_connection_name() - ); - self.poll_disconnect(cx); - Poll::Ready(Ok(bufs.iter().map(|buf| buf.len()).sum())) - } Status::FailedAndExhausted => exhausted_err(), } } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs new file mode 100644 index 0000000..7c31127 --- /dev/null +++ b/tests/integration_tests.rs @@ -0,0 +1,28 @@ +use std::time::Duration; + +use stubborn_io::StubbornTcpStream; +use tokio::{io::AsyncWriteExt, sync::oneshot}; + +#[tokio::test] +async fn back_to_back_shutdown_attempts() { + let (port_tx, port_rx) = oneshot::channel(); + tokio::spawn(async move { + let mut streams = Vec::new(); + let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + port_tx.send(addr).unwrap(); + loop { + let (stream, _addr) = listener.accept().await.unwrap(); + streams.push(stream); + } + }); + let addr = port_rx.await.unwrap(); + let mut connection = StubbornTcpStream::connect(addr).await.unwrap(); + + connection.shutdown().await.unwrap(); + let elapsed = tokio::time::timeout(Duration::from_secs(5), connection.shutdown()).await; + + let result = elapsed.unwrap(); + let error = result.unwrap_err(); + assert_eq!(error.kind(), std::io::ErrorKind::NotConnected); +} From a3af82f077b04727d3e8eb980598d97da44d52e7 Mon Sep 17 00:00:00 2001 From: Fred Clausen <43556888+fredclausen@users.noreply.github.com> Date: Sat, 30 Dec 2023 13:39:20 -0700 Subject: [PATCH 3/6] Fix namespace issue with upstream merge --- tests/integration_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 7c31127..519cca5 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use stubborn_io::StubbornTcpStream; +use sdre_stubborn_io::StubbornTcpStream; use tokio::{io::AsyncWriteExt, sync::oneshot}; #[tokio::test] From b2a4ae51ee819185583125301479a255b5ff2950 Mon Sep 17 00:00:00 2001 From: Fred Clausen <43556888+fredclausen@users.noreply.github.com> Date: Sat, 30 Dec 2023 13:39:31 -0700 Subject: [PATCH 4/6] Fix github action --- .github/workflows/pr-validator.yml | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/.github/workflows/pr-validator.yml b/.github/workflows/pr-validator.yml index 9a75f17..4b2e0fe 100644 --- a/.github/workflows/pr-validator.yml +++ b/.github/workflows/pr-validator.yml @@ -10,11 +10,8 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v1 - - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - + - uses: actions/checkout@v4.1.1 + - uses: hecrj/setup-rust-action@v2.0.0 - name: Format check run: cargo fmt -- --check - name: Build and Lint From 3e073c46277f79688abdbdd4925a5b7688af90f7 Mon Sep 17 00:00:00 2001 From: Fred Clausen <43556888+fredclausen@users.noreply.github.com> Date: Sat, 30 Dec 2023 13:40:33 -0700 Subject: [PATCH 5/6] fix clippy error --- src/tokio/io.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tokio/io.rs b/src/tokio/io.rs index d8f5eb0..2df0aef 100644 --- a/src/tokio/io.rs +++ b/src/tokio/io.rs @@ -160,7 +160,7 @@ where } pub fn get_block_on_write_failures(&self) -> bool { - self.options.block_on_write_failures.clone() + self.options.block_on_write_failures } pub async fn connect_with_options(ctor_arg: C, options: ReconnectOptions) -> io::Result { From 97047ad67f8295cf3951b1f373dcb55ec68f622f Mon Sep 17 00:00:00 2001 From: Fred Clausen <43556888+fredclausen@users.noreply.github.com> Date: Sat, 30 Dec 2023 13:45:00 -0700 Subject: [PATCH 6/6] fix doc tests --- src/config.rs | 2 +- src/lib.rs | 2 +- src/strategies.rs | 2 +- src/tokio/tcp.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/config.rs b/src/config.rs index 68cc080..74027ca 100644 --- a/src/config.rs +++ b/src/config.rs @@ -55,7 +55,7 @@ impl ReconnectOptions { /// /// ``` /// use std::time::Duration; - /// use stubborn_io::ReconnectOptions; + /// use sdre_stubborn_io::ReconnectOptions; /// /// // With the below vector, the stubborn-io item will try to reconnect three times, /// // waiting 2 seconds between each attempt. Once all three tries are exhausted, diff --git a/src/lib.rs b/src/lib.rs index 8a6ac94..7389040 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,7 +24,7 @@ //! use std::future::Future; //! use std::path::PathBuf; //! use std::pin::Pin; -//! use stubborn_io::tokio::{StubbornIo, UnderlyingIo}; +//! use sdre_stubborn_io::tokio::{StubbornIo, UnderlyingIo}; //! use tokio::fs::File; //! //! struct MyFile(File); // Struct must implement AsyncRead + AsyncWrite diff --git a/src/strategies.rs b/src/strategies.rs index f23be0a..87466ce 100644 --- a/src/strategies.rs +++ b/src/strategies.rs @@ -7,7 +7,7 @@ use std::time::Duration; /// /// ``` /// use std::time::Duration; -/// use stubborn_io::{ReconnectOptions, strategies::ExpBackoffStrategy}; +/// use sdre_stubborn_io::{ReconnectOptions, strategies::ExpBackoffStrategy}; /// /// // With the below strategy, the stubborn-io item will try to reconnect infinitely, /// // waiting an exponentially increasing (by 2) value with 5% random jitter. Once the diff --git a/src/tokio/tcp.rs b/src/tokio/tcp.rs index e4286e9..6fe910d 100644 --- a/src/tokio/tcp.rs +++ b/src/tokio/tcp.rs @@ -17,7 +17,7 @@ where /// distinction that it will automatically attempt to reconnect in the face of connectivity failures. /// /// ``` -/// use stubborn_io::StubbornTcpStream; +/// use sdre_stubborn_io::StubbornTcpStream; /// use tokio::io::AsyncWriteExt; /// /// let addr = "localhost:8080";