diff --git a/README.md b/README.md index e46715c..0383bb4 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ Let's build real-time (multi-threaded, no GC), self-contained (aka easy to deplo Add the following to your `Cargo.toml` ```toml -sea-streamer = { version = "0", features = ["kafka", "stdio", "socket", "runtime-tokio"] } +sea-streamer = { version = "0", features = ["kafka", "redis", "stdio", "socket", "runtime-tokio"] } ``` Here is a basic [stream consumer](https://github.com/SeaQL/sea-streamer/tree/main/examples/src/bin/consumer.rs): @@ -57,9 +57,8 @@ async fn main() -> Result<()> { let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?; let mut options = SeaConsumerOptions::new(ConsumerMode::RealTime); - options.set_kafka_consumer_options(|options| { - options.set_auto_offset_reset(AutoOffsetReset::Earliest); - }); + options.set_auto_stream_reset(SeaStreamReset::Earliest); + let consumer: SeaConsumer = streamer .create_consumer(stream.stream_keys(), options) .await?; @@ -131,15 +130,18 @@ async fn main() -> Result<()> { Now, let's put them into action. -With Kafka: +With Redis / Kafka: ```shell +STREAMER_URI="redis://localhost:6379" # or +STREAMER_URI="kafka://localhost:9092" + # Produce some input -cargo run --bin producer -- --stream kafka://localhost:9092/hello1 & +cargo run --bin producer -- --stream $STREAMER_URI/hello1 & # Start the processor, producing some output -cargo run --bin processor -- --input kafka://localhost:9092/hello1 --output kafka://localhost:9092/hello2 & +cargo run --bin processor -- --input $STREAMER_URI/hello1 --output $STREAMER_URI/hello2 & # Replay the output -cargo run --bin consumer -- --stream kafka://localhost:9092/hello2 +cargo run --bin consumer -- --stream $STREAMER_URI/hello2 # Remember to stop the processes kill %1 %2 ``` @@ -295,6 +297,8 @@ In Redis, shards and nodes is a M-N mapping - shards can be moved among nodes *a It makes testing much more difficult. Let us know if you'd like to help! +This crate is built on top of [`redis`](https://docs.rs/redis). + ### `sea-streamer-stdio`: Standard I/O Backend This is the `stdio` backend implementation for SeaStreamer. It is designed to be connected together with unix pipes, diff --git a/sea-streamer-redis/README.md b/sea-streamer-redis/README.md index 21dd79e..27123a3 100644 --- a/sea-streamer-redis/README.md +++ b/sea-streamer-redis/README.md @@ -50,3 +50,5 @@ It's quite a difficult task, because clients have to take responsibility when wo In Redis, shards and nodes is a M-N mapping - shards can be moved among nodes *at any time*. It makes testing much more difficult. Let us know if you'd like to help! + +This crate is built on top of [`redis`](https://docs.rs/redis). diff --git a/sea-streamer-redis/src/lib.rs b/sea-streamer-redis/src/lib.rs index 28be537..bfc47fb 100644 --- a/sea-streamer-redis/src/lib.rs +++ b/sea-streamer-redis/src/lib.rs @@ -50,6 +50,8 @@ //! In Redis, shards and nodes is a M-N mapping - shards can be moved among nodes *at any time*. //! It makes testing much more difficult. //! Let us know if you'd like to help! +//! +//! This crate is built on top of [`redis`](https://docs.rs/redis). #![cfg_attr(docsrs, feature(doc_cfg))] #![deny(missing_debug_implementations)] diff --git a/src/lib.rs b/src/lib.rs index 23358ab..4dccb60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,9 +57,8 @@ //! let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?; //! //! let mut options = SeaConsumerOptions::new(ConsumerMode::RealTime); -//! options.set_kafka_consumer_options(|options| { -//! options.set_auto_offset_reset(AutoOffsetReset::Earliest); -//! }); +//! options.set_auto_stream_reset(SeaStreamReset::Earliest); +//! //! let consumer: SeaConsumer = streamer //! .create_consumer(stream.stream_keys(), options) //! .await?; @@ -131,15 +130,18 @@ //! //! Now, let's put them into action. //! -//! With Kafka: +//! With Redis / Kafka: //! //! ```shell +//! STREAMER_URI="redis://localhost:6379" # or +//! STREAMER_URI="kafka://localhost:9092" +//! //! # Produce some input -//! cargo run --bin producer -- --stream kafka://localhost:9092/hello1 & +//! cargo run --bin producer -- --stream $STREAMER_URI/hello1 & //! # Start the processor, producing some output -//! cargo run --bin processor -- --input kafka://localhost:9092/hello1 --output kafka://localhost:9092/hello2 & +//! cargo run --bin processor -- --input $STREAMER_URI/hello1 --output $STREAMER_URI/hello2 & //! # Replay the output -//! cargo run --bin consumer -- --stream kafka://localhost:9092/hello2 +//! cargo run --bin consumer -- --stream $STREAMER_URI/hello2 //! # Remember to stop the processes //! kill %1 %2 //! ```