From 91c4092cdd40376da23556781acf4783e44ff7f7 Mon Sep 17 00:00:00 2001 From: OliverNChalk <11343499+OliverNChalk@users.noreply.github.com> Date: Sat, 21 Dec 2024 21:23:26 +0100 Subject: [PATCH] feat: add interval_stream helper --- Cargo.lock | 113 +++++++++++++++++++++++++- Cargo.toml | 8 +- src/lib.rs | 2 +- src/tokio/interval_stream.rs | 67 +++++++++++++++ src/tokio/mod.rs | 9 ++ src/{tokio.rs => tokio/named_task.rs} | 0 6 files changed, 194 insertions(+), 5 deletions(-) create mode 100644 src/tokio/interval_stream.rs create mode 100644 src/tokio/mod.rs rename src/{tokio.rs => tokio/named_task.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index c3a60a3..60c1ad0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + [[package]] name = "backtrace" version = "0.3.74" @@ -91,6 +97,95 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "gimli" version = "0.31.1" @@ -188,6 +283,12 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "powerfmt" version = "0.2.0" @@ -309,6 +410,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.13.2" @@ -399,9 +509,10 @@ dependencies = [ [[package]] name = "toolbox" -version = "0.1.0" +version = "0.2.0" dependencies = [ "const_format", + "futures", "tokio", "tracing", "tracing-appender", diff --git a/Cargo.toml b/Cargo.toml index 69d5079..fe7e894 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "toolbox" -version = "0.1.0" +version = "0.2.0" authors = ["Oliver Chalk"] edition = "2021" readme = "README.md" @@ -14,13 +14,15 @@ disallowed_methods = "warn" [features] default = [] -tokio = ["dep:tokio"] +named_task = ["tokio/rt"] +interval_stream = ["tokio/time", "dep:futures"] tracing = ["dep:const_format", "dep:tracing", "dep:tracing-appender", "dep:tracing-subscriber"] version = ["dep:const_format"] [dependencies] const_format = { version = "0.2.32", optional = true } -tokio = { version = "1.0", features = ["rt"], optional = true } +futures = { version = "0.3.31", optional = true } +tokio = { version = "1.0", optional = true } tracing = { version = "0.1.40", optional = true } tracing-appender = { version = "0.2.3", optional = true } tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"], optional = true } diff --git a/src/lib.rs b/src/lib.rs index 0e0aacd..53a225f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ pub mod fs; -#[cfg(feature = "tokio")] +#[cfg(any(feature = "named_task", feature = "interval_stream"))] pub mod tokio; #[cfg(feature = "tracing")] pub mod tracing; diff --git a/src/tokio/interval_stream.rs b/src/tokio/interval_stream.rs new file mode 100644 index 0000000..364a911 --- /dev/null +++ b/src/tokio/interval_stream.rs @@ -0,0 +1,67 @@ +use std::future::Future; +use std::pin::pin; +use std::task::Poll; + +use futures::{ready, Stream}; + +pub struct IntervalStream +where + Poll: Fn() -> Fut + Unpin, + Fut: Unpin, +{ + interval: tokio::time::Interval, + poll: Poll, + + in_progress: Option, +} + +impl IntervalStream +where + Poll: Fn() -> Fut + Unpin, + Fut: Future + Unpin, +{ + pub fn new(interval: tokio::time::Interval, poll: Poll) -> Self { + IntervalStream { interval, poll, in_progress: None } + } +} + +impl Stream for IntervalStream +where + PollFn: Fn() -> Fut + Unpin, + Fut: Future + Unpin, +{ + type Item = Output; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + + // Poll the current future if one already exists. + if let Some(fut) = &mut this.in_progress { + let output = ready!(pin!(fut).poll(cx)); + this.in_progress = None; + + return Poll::Ready(Some(output)); + } + + // Poll the interval to see if we should create a new future. + ready!(this.interval.poll_tick(cx)); + + // Create a new future. + let mut fut = (this.poll)(); + + // Poll the future. + let pinned = pin!(&mut fut); + let poll = pinned.poll(cx); + match poll { + Poll::Ready(output) => Poll::Ready(Some(output)), + Poll::Pending => { + this.in_progress = Some(fut); + + Poll::Pending + } + } + } +} diff --git a/src/tokio/mod.rs b/src/tokio/mod.rs new file mode 100644 index 0000000..99ebbca --- /dev/null +++ b/src/tokio/mod.rs @@ -0,0 +1,9 @@ +#[cfg(feature = "interval_stream")] +mod interval_stream; +#[cfg(feature = "named_task")] +mod named_task; + +#[cfg(feature = "interval_stream")] +pub use interval_stream::*; +#[cfg(feature = "named_task")] +pub use named_task::*; diff --git a/src/tokio.rs b/src/tokio/named_task.rs similarity index 100% rename from src/tokio.rs rename to src/tokio/named_task.rs