diff --git a/Cargo.lock b/Cargo.lock index c3a60a3..deaa03f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,34 @@ dependencies = [ "memchr", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + [[package]] name = "backtrace" version = "0.3.74" @@ -41,6 +69,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "bytes" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" + [[package]] name = "cfg-if" version = "1.0.0" @@ -91,6 +125,95 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "gimli" version = "0.31.1" @@ -188,6 +311,12 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "powerfmt" version = "0.2.0" @@ -309,6 +438,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.13.2" @@ -397,12 +535,38 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "toolbox" -version = "0.1.0" +version = "0.2.0" dependencies = [ "const_format", + "futures", "tokio", + "tokio-test", "tracing", "tracing-appender", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 69d5079..4445fe9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "toolbox" -version = "0.1.0" +version = "0.2.0" authors = ["Oliver Chalk"] edition = "2021" readme = "README.md" @@ -14,13 +14,18 @@ disallowed_methods = "warn" [features] default = [] -tokio = ["dep:tokio"] +named_task = ["tokio/rt"] +interval_stream = ["tokio/time", "dep:futures"] tracing = ["dep:const_format", "dep:tracing", "dep:tracing-appender", "dep:tracing-subscriber"] version = ["dep:const_format"] [dependencies] const_format = { version = "0.2.32", optional = true } -tokio = { version = "1.0", features = ["rt"], optional = true } +futures = { version = "0.3.31", optional = true } +tokio = { version = "1.0", optional = true } tracing = { version = "0.1.40", optional = true } tracing-appender = { version = "0.2.3", optional = true } tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"], optional = true } + +[dev-dependencies] +tokio-test = "0.4.4" diff --git a/src/lib.rs b/src/lib.rs index 0e0aacd..53a225f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ pub mod fs; -#[cfg(feature = "tokio")] +#[cfg(any(feature = "named_task", feature = "interval_stream"))] pub mod tokio; #[cfg(feature = "tracing")] pub mod tracing; diff --git a/src/tokio/interval_stream.rs b/src/tokio/interval_stream.rs new file mode 100644 index 0000000..36f71da --- /dev/null +++ b/src/tokio/interval_stream.rs @@ -0,0 +1,84 @@ +use std::future::Future; +use std::pin::pin; +use std::task::Poll; + +use futures::{ready, Stream}; + +/// Creates and polls a future on an interval, producing a stream. +/// +/// # Example +/// +/// ```rust +/// # tokio_test::block_on(async { +/// use futures::FutureExt; +/// use futures::StreamExt; +/// use toolbox::tokio::IntervalStream; +/// +/// let slot_interval = tokio::time::interval(std::time::Duration::from_millis(1)); +/// let mut slot_stream = IntervalStream::new( +/// slot_interval, +/// Box::new(|| futures::future::ready("hello world").boxed()), +/// ); +/// +/// assert_eq!(slot_stream.next().await.unwrap(), "hello world"); +/// assert_eq!(slot_stream.next().await.unwrap(), "hello world"); +/// # }) +/// ``` +pub struct IntervalStream +where + Fut: Unpin, +{ + interval: tokio::time::Interval, + poll: Box Fut>, + + in_progress: Option, +} + +impl IntervalStream +where + Fut: Future + Unpin, +{ + pub fn new(interval: tokio::time::Interval, poll: Box Fut>) -> Self { + IntervalStream { interval, poll, in_progress: None } + } +} + +impl Stream for IntervalStream +where + Fut: Future + Unpin, +{ + type Item = Output; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + + // Poll the current future if one already exists. + if let Some(fut) = &mut this.in_progress { + let output = ready!(pin!(fut).poll(cx)); + this.in_progress = None; + + return Poll::Ready(Some(output)); + } + + // Poll the interval to see if we should create a new future. + ready!(this.interval.poll_tick(cx)); + + // Create a new future. + let mut fut = (this.poll)(); + + // Poll the future. + let pinned = pin!(&mut fut); + let poll = pinned.poll(cx); + match poll { + Poll::Ready(output) => Poll::Ready(Some(output)), + Poll::Pending => { + this.in_progress = Some(fut); + + Poll::Pending + } + } + } +} diff --git a/src/tokio/mod.rs b/src/tokio/mod.rs new file mode 100644 index 0000000..1017807 --- /dev/null +++ b/src/tokio/mod.rs @@ -0,0 +1,12 @@ +/// Creates and polls a future on an interval, producing a stream. +#[cfg(feature = "interval_stream")] +mod interval_stream; +/// Allows attaching names to [`tokio::task`] to enable tracking which tasks are +/// exiting. +#[cfg(feature = "named_task")] +mod named_task; + +#[cfg(feature = "interval_stream")] +pub use interval_stream::*; +#[cfg(feature = "named_task")] +pub use named_task::*; diff --git a/src/tokio.rs b/src/tokio/named_task.rs similarity index 100% rename from src/tokio.rs rename to src/tokio/named_task.rs