Skip to content

Commit

Permalink
feat: add interval_stream helper
Browse files Browse the repository at this point in the history
  • Loading branch information
OliverNChalk committed Dec 22, 2024
1 parent 0f2d8ef commit 5bfa986
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 5 deletions.
166 changes: 165 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "toolbox"
version = "0.1.0"
version = "0.2.0"
authors = ["Oliver Chalk"]
edition = "2021"
readme = "README.md"
Expand All @@ -14,13 +14,18 @@ disallowed_methods = "warn"

[features]
default = []
tokio = ["dep:tokio"]
named_task = ["tokio/rt"]
interval_stream = ["tokio/time", "dep:futures"]
tracing = ["dep:const_format", "dep:tracing", "dep:tracing-appender", "dep:tracing-subscriber"]
version = ["dep:const_format"]

[dependencies]
const_format = { version = "0.2.32", optional = true }
tokio = { version = "1.0", features = ["rt"], optional = true }
futures = { version = "0.3.31", optional = true }
tokio = { version = "1.0", optional = true }
tracing = { version = "0.1.40", optional = true }
tracing-appender = { version = "0.2.3", optional = true }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"], optional = true }

[dev-dependencies]
tokio-test = "0.4.4"
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod fs;
#[cfg(feature = "tokio")]
#[cfg(any(feature = "named_task", feature = "interval_stream"))]
pub mod tokio;
#[cfg(feature = "tracing")]
pub mod tracing;
Expand Down
84 changes: 84 additions & 0 deletions src/tokio/interval_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::future::Future;
use std::pin::pin;
use std::task::Poll;

use futures::{ready, Stream};

/// Creates and polls a future on an interval, producing a stream.
///
/// # Example
///
/// ```rust
/// # tokio_test::block_on(async {
/// use futures::FutureExt;
/// use futures::StreamExt;
/// use toolbox::tokio::IntervalStream;
///
/// let slot_interval = tokio::time::interval(std::time::Duration::from_millis(1));
/// let mut slot_stream = IntervalStream::new(
/// slot_interval,
/// Box::new(|| futures::future::ready("hello world").boxed()),
/// );
///
/// assert_eq!(slot_stream.next().await.unwrap(), "hello world");
/// assert_eq!(slot_stream.next().await.unwrap(), "hello world");
/// # })
/// ```
pub struct IntervalStream<Fut>
where
Fut: Unpin,
{
interval: tokio::time::Interval,
poll: Box<dyn Fn() -> Fut>,

in_progress: Option<Fut>,
}

impl<Fut, Output> IntervalStream<Fut>
where
Fut: Future<Output = Output> + Unpin,
{
pub fn new(interval: tokio::time::Interval, poll: Box<dyn Fn() -> Fut>) -> Self {
IntervalStream { interval, poll, in_progress: None }
}
}

impl<Fut, Output> Stream for IntervalStream<Fut>
where
Fut: Future<Output = Output> + Unpin,
{
type Item = Output;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();

// Poll the current future if one already exists.
if let Some(fut) = &mut this.in_progress {
let output = ready!(pin!(fut).poll(cx));
this.in_progress = None;

return Poll::Ready(Some(output));
}

// Poll the interval to see if we should create a new future.
ready!(this.interval.poll_tick(cx));

// Create a new future.
let mut fut = (this.poll)();

// Poll the future.
let pinned = pin!(&mut fut);
let poll = pinned.poll(cx);
match poll {
Poll::Ready(output) => Poll::Ready(Some(output)),
Poll::Pending => {
this.in_progress = Some(fut);

Poll::Pending
}
}
}
}
12 changes: 12 additions & 0 deletions src/tokio/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/// Creates and polls a future on an interval, producing a stream.
#[cfg(feature = "interval_stream")]
mod interval_stream;
/// Allows attaching names to [`tokio::task`] to enable tracking which tasks are
/// exiting.
#[cfg(feature = "named_task")]
mod named_task;

#[cfg(feature = "interval_stream")]
pub use interval_stream::*;
#[cfg(feature = "named_task")]
pub use named_task::*;
File renamed without changes.

0 comments on commit 5bfa986

Please sign in to comment.