Skip to content

Commit

Permalink
feat: add interval_stream helper
Browse files Browse the repository at this point in the history
  • Loading branch information
OliverNChalk committed Dec 21, 2024
1 parent 0f2d8ef commit 91c4092
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 5 deletions.
113 changes: 112 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "toolbox"
version = "0.1.0"
version = "0.2.0"
authors = ["Oliver Chalk"]
edition = "2021"
readme = "README.md"
Expand All @@ -14,13 +14,15 @@ disallowed_methods = "warn"

[features]
default = []
tokio = ["dep:tokio"]
named_task = ["tokio/rt"]
interval_stream = ["tokio/time", "dep:futures"]
tracing = ["dep:const_format", "dep:tracing", "dep:tracing-appender", "dep:tracing-subscriber"]
version = ["dep:const_format"]

[dependencies]
const_format = { version = "0.2.32", optional = true }
tokio = { version = "1.0", features = ["rt"], optional = true }
futures = { version = "0.3.31", optional = true }
tokio = { version = "1.0", optional = true }
tracing = { version = "0.1.40", optional = true }
tracing-appender = { version = "0.2.3", optional = true }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"], optional = true }
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod fs;
#[cfg(feature = "tokio")]
#[cfg(any(feature = "named_task", feature = "interval_stream"))]
pub mod tokio;
#[cfg(feature = "tracing")]
pub mod tracing;
Expand Down
67 changes: 67 additions & 0 deletions src/tokio/interval_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::future::Future;
use std::pin::pin;
use std::task::Poll;

use futures::{ready, Stream};

pub struct IntervalStream<Poll, Fut>
where
Poll: Fn() -> Fut + Unpin,
Fut: Unpin,
{
interval: tokio::time::Interval,
poll: Poll,

in_progress: Option<Fut>,
}

impl<Poll, Fut, Output> IntervalStream<Poll, Fut>
where
Poll: Fn() -> Fut + Unpin,
Fut: Future<Output = Output> + Unpin,
{
pub fn new(interval: tokio::time::Interval, poll: Poll) -> Self {
IntervalStream { interval, poll, in_progress: None }
}
}

impl<PollFn, Fut, Output> Stream for IntervalStream<PollFn, Fut>
where
PollFn: Fn() -> Fut + Unpin,
Fut: Future<Output = Output> + Unpin,
{
type Item = Output;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();

// Poll the current future if one already exists.
if let Some(fut) = &mut this.in_progress {
let output = ready!(pin!(fut).poll(cx));
this.in_progress = None;

return Poll::Ready(Some(output));
}

// Poll the interval to see if we should create a new future.
ready!(this.interval.poll_tick(cx));

// Create a new future.
let mut fut = (this.poll)();

// Poll the future.
let pinned = pin!(&mut fut);
let poll = pinned.poll(cx);
match poll {
Poll::Ready(output) => Poll::Ready(Some(output)),
Poll::Pending => {
this.in_progress = Some(fut);

Poll::Pending
}
}
}
}
9 changes: 9 additions & 0 deletions src/tokio/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#[cfg(feature = "interval_stream")]
mod interval_stream;
#[cfg(feature = "named_task")]
mod named_task;

#[cfg(feature = "interval_stream")]
pub use interval_stream::*;
#[cfg(feature = "named_task")]
pub use named_task::*;
File renamed without changes.

0 comments on commit 91c4092

Please sign in to comment.