From 95569f18fb2aaaff56fe68fee7d49592679d63e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Sat, 4 Apr 2020 16:26:46 +0100 Subject: [PATCH 1/5] add pub sub client implementation with examples --- examples/pub.rs | 31 +++++- examples/sub.rs | 38 +++++++- src/client.rs | 226 +++++++++++++++++++++++++++++++++++++++++-- src/cmd/publish.rs | 13 ++- src/cmd/subscribe.rs | 26 ++++- src/server.rs | 2 +- 6 files changed, 316 insertions(+), 20 deletions(-) diff --git a/examples/pub.rs b/examples/pub.rs index 607aa27..0a5d05d 100644 --- a/examples/pub.rs +++ b/examples/pub.rs @@ -1,4 +1,31 @@ +//! Publish to a redis channel example. +//! +//! A simple client that connects to a mini-redis server, and +//! publishes a message on `foo` channel +//! +//! You can test this out by running: +//! +//! cargo run --bin server +//! +//! Then in another terminal run: +//! +//! cargo run --example sub +//! +//! And then in another terminal run: +//! +//! cargo run --example pub + +#![warn(rust_2018_idioms)] + +use mini_redis::{client, Result}; + #[tokio::main] -async fn main() { - unimplemented!(); +async fn main() -> Result<()> { + // Open a connection to the mini-redis address. + let mut client = client::connect("127.0.0.1:6379").await?; + + // publish message `bar` on channel foo + client.publish("foo", "bar".into()).await?; + + Ok(()) } diff --git a/examples/sub.rs b/examples/sub.rs index eda175c..97823e0 100644 --- a/examples/sub.rs +++ b/examples/sub.rs @@ -1,6 +1,38 @@ -/// Subscribe to a redis channel +//! Subscribe to a redis channel example. +//! +//! A simple client that connects to a mini-redis server, subscribes to "foo" and "bar" channels +//! and awaits messages published on those channels +//! +//! You can test this out by running: +//! +//! cargo run --bin server +//! +//! Then in another terminal run: +//! +//! cargo run --example sub +//! +//! And then in another terminal run: +//! +//! cargo run --example pub + +#![warn(rust_2018_idioms)] + +use mini_redis::{client, Result}; +use tokio::stream::StreamExt; #[tokio::main] -async fn main() { - unimplemented!(); +pub async fn main() -> Result<()> { + // Open a connection to the mini-redis address. + let client = client::connect("127.0.0.1:6379").await?; + + + // subscribe to channel foo + let mut result = client.subscribe(vec!["foo".into()]).await?; + + // await messages on channel foo + while let Some(Ok(msg)) = result.next().await { + println!("got message from the channel: {}; message = {:?}", msg.channel, msg.content); + } + + Ok(()) } diff --git a/src/client.rs b/src/client.rs index 0568976..beb3963 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,10 +1,14 @@ +use crate::cmd::{Get, Publish, Set, Subscribe, Unsubscribe}; use crate::{Connection, Frame}; -use crate::cmd::{Get, Set}; use bytes::Bytes; +use std::future::Future; use std::io::{Error, ErrorKind}; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; use tokio::net::{TcpStream, ToSocketAddrs}; +use tokio::stream::Stream; use tracing::{debug, instrument}; /// Mini asynchronous Redis client @@ -47,7 +51,31 @@ impl Client { key: key.to_string(), value: value, expire: None, - }).await + }) + .await + } + + /// publish `message` on the `channel` + #[instrument(skip(self))] + pub async fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result { + self.publish_cmd(Publish { + channel: channel.to_string(), + message: message, + }) + .await + } + + /// subscribe to the list of channels + /// when client sends `SUBSCRIBE`, server's handle for client start's accepting only + /// `SUBSCRIBE` and `UNSUBSCRIBE` commands so we consume client and return Subscribe + #[instrument(skip(self))] + pub async fn subscribe(mut self, channels: Vec) -> crate::Result { + let subscribed_channels = self.subscribe_cmd(Subscribe { channels: channels }).await?; + + Ok(Subscriber { + conn: self.conn, + subscribed_channels, + }) } /// Set the value of a key to `value`. The value expires after `expiration`. @@ -62,7 +90,8 @@ impl Client { key: key.to_string(), value: value.into(), expire: Some(expiration), - }).await + }) + .await } async fn set_cmd(&mut self, cmd: Set) -> crate::Result<()> { @@ -81,6 +110,52 @@ impl Client { } } + async fn publish_cmd(&mut self, cmd: Publish) -> crate::Result { + // Convert the `Publish` command into a frame + let frame = cmd.into_frame(); + + debug!(request = ?frame); + + // Write the frame to the socket + self.conn.write_frame(&frame).await?; + + // Read the response + match self.read_response().await? { + Frame::Integer(response) => Ok(response), + frame => Err(frame.to_error()), + } + } + + async fn subscribe_cmd(&mut self, cmd: Subscribe) -> crate::Result> { + // Convert the `Subscribe` command into a frame + let channels = cmd.channels.clone(); + let frame = cmd.into_frame(); + + debug!(request = ?frame); + + // Write the frame to the socket + self.conn.write_frame(&frame).await?; + + // Read the response + for channel in &channels { + let response = self.read_response().await?; + match response { + Frame::Array(ref frame) => match frame.as_slice() { + [subscribe, schannel] + if subscribe.to_string() == "subscribe" + && &schannel.to_string() == channel => + { + () + } + _ => return Err(response.to_error()), + }, + frame => return Err(frame.to_error()), + }; + } + + Ok(channels) + } + /// Reads a response frame from the socket. If an `Error` frame is read, it /// is converted to `Err`. async fn read_response(&mut self) -> crate::Result { @@ -89,20 +164,155 @@ impl Client { debug!(?response); match response { - Some(Frame::Error(msg)) => { - Err(msg.into()) + Some(Frame::Error(msg)) => Err(msg.into()), + Some(frame) => Ok(frame), + None => { + // Receiving `None` here indicates the server has closed the + // connection without sending a frame. This is unexpected and is + // represented as a "connection reset by peer" error. + let err = Error::new(ErrorKind::ConnectionReset, "connection reset by server"); + + Err(err.into()) } + } + } +} + +pub struct Subscriber { + conn: Connection, + subscribed_channels: Vec, +} + +impl Subscriber { + /// Subscribe to a list of new channels + #[instrument(skip(self))] + pub async fn subscribe(&mut self, channels: Vec) -> crate::Result<()> { + let cmd = Subscribe { channels: channels }; + + let channels = cmd.channels.clone(); + let frame = cmd.into_frame(); + + debug!(request = ?frame); + + // Write the frame to the socket + self.conn.write_frame(&frame).await?; + + // Read the response + for channel in &channels { + let response = self.read_response().await?; + match response { + Frame::Array(ref frame) => match frame.as_slice() { + [subscribe, schannel] + if &subscribe.to_string() == "subscribe" + && &schannel.to_string() == channel => + { + () + } + _ => return Err(response.to_error()), + }, + frame => return Err(frame.to_error()), + }; + } + + self.subscribed_channels.extend(channels); + + Ok(()) + } + + /// Unsubscribe to a list of new channels + #[instrument(skip(self))] + pub async fn unsubscribe(&mut self, channels: Vec) -> crate::Result<()> { + let cmd = Unsubscribe { channels: channels }; + + let mut channels = cmd.channels.clone(); + let frame = cmd.into_frame(); + + debug!(request = ?frame); + + // Write the frame to the socket + self.conn.write_frame(&frame).await?; + + // if the input channel list is empty, server acknowledges as unsubscribing + // from all subscribed channels, so we assert that the unsubscribe list received + // matches the client subscribed one + if channels.is_empty() { + channels = self.subscribed_channels.clone(); + } + + // Read the response + for channel in &channels { + let response = self.read_response().await?; + match response { + Frame::Array(ref frame) => match frame.as_slice() { + [unsubscribe, uchannel] + if &unsubscribe.to_string() == "unsubscribe" + && &uchannel.to_string() == channel => + { + self.subscribed_channels + .retain(|channel| channel != &uchannel.to_string()); + } + _ => return Err(response.to_error()), + }, + frame => return Err(frame.to_error()), + }; + } + + Ok(()) + } + + /// Reads a response frame from the socket. If an `Error` frame is read, it + /// is converted to `Err`. + async fn read_response(&mut self) -> crate::Result { + let response = self.conn.read_frame().await?; + + debug!(?response); + + match response { + Some(Frame::Error(msg)) => Err(msg.into()), Some(frame) => Ok(frame), None => { // Receiving `None` here indicates the server has closed the // connection without sending a frame. This is unexpected and is // represented as a "connection reset by peer" error. - let err = Error::new( - ErrorKind::ConnectionReset, - "connection reset by server"); + let err = Error::new(ErrorKind::ConnectionReset, "connection reset by server"); Err(err.into()) } } } } + +impl Stream for Subscriber { + type Item = crate::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut read_frame = Box::pin(self.conn.read_frame()); + match Pin::new(&mut read_frame).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(None)) => Poll::Ready(None), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err.into()))), + Poll::Ready(Ok(Some(mframe))) => { + debug!(?mframe); + match mframe { + Frame::Array(ref frame) => match frame.as_slice() { + [message, channel, content] if &message.to_string() == "message" => { + Poll::Ready(Some(Ok(Message { + channel: channel.to_string(), + content: Bytes::from(content.to_string()), + }))) + } + _ => Poll::Ready(Some(Err(mframe.to_error()))), + }, + frame => Poll::Ready(Some(Err(frame.to_error()))), + } + } + } + } +} + +/// A message received on a subscribed channel +#[derive(Debug, Clone)] +pub struct Message { + pub channel: String, + pub content: Bytes, +} diff --git a/src/cmd/publish.rs b/src/cmd/publish.rs index 7e937f2..dc13a7e 100644 --- a/src/cmd/publish.rs +++ b/src/cmd/publish.rs @@ -4,8 +4,8 @@ use bytes::Bytes; #[derive(Debug)] pub struct Publish { - channel: String, - message: Bytes, + pub(crate) channel: String, + pub(crate) message: Bytes, } impl Publish { @@ -24,4 +24,13 @@ impl Publish { dst.write_frame(&response).await?; Ok(()) } + + pub(crate) fn into_frame(self) -> Frame { + let mut frame = Frame::array(); + frame.push_bulk(Bytes::from("publish".as_bytes())); + frame.push_bulk(Bytes::from(self.channel.into_bytes())); + frame.push_bulk(self.message); + + frame + } } diff --git a/src/cmd/subscribe.rs b/src/cmd/subscribe.rs index d97202e..ed6bce5 100644 --- a/src/cmd/subscribe.rs +++ b/src/cmd/subscribe.rs @@ -1,5 +1,5 @@ -use crate::{Command, Connection, Db, Frame, Shutdown}; use crate::cmd::{Parse, ParseError}; +use crate::{Command, Connection, Db, Frame, Shutdown}; use bytes::Bytes; use tokio::select; @@ -7,12 +7,12 @@ use tokio::stream::{StreamExt, StreamMap}; #[derive(Debug)] pub struct Subscribe { - channels: Vec, + pub(crate) channels: Vec, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Unsubscribe { - channels: Vec, + pub(crate) channels: Vec, } impl Subscribe { @@ -147,6 +147,15 @@ impl Subscribe { }; } } + + pub(crate) fn into_frame(self) -> Frame { + let mut frame = Frame::array(); + frame.push_bulk(Bytes::from("subscribe".as_bytes())); + for channel in self.channels { + frame.push_bulk(Bytes::from(channel.into_bytes())); + } + frame + } } impl Unsubscribe { @@ -166,4 +175,13 @@ impl Unsubscribe { Ok(Unsubscribe { channels }) } + + pub(crate) fn into_frame(self) -> Frame { + let mut frame = Frame::array(); + frame.push_bulk(Bytes::from("unsubscribe".as_bytes())); + for channel in self.channels { + frame.push_bulk(Bytes::from(channel.into_bytes())); + } + frame + } } diff --git a/src/server.rs b/src/server.rs index 4710563..2f76f65 100644 --- a/src/server.rs +++ b/src/server.rs @@ -57,8 +57,8 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result< tokio::select! { res = server.run() => { if let Err(err) = res { - // TODO: gracefully handle this error error!(cause = %err, "failed to accept"); + return Err(err.into()); } } _ = shutdown => { From 96c1a1ab83f272fa5da98ab177f585dbd3e17be7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Sat, 4 Apr 2020 19:10:08 +0100 Subject: [PATCH 2/5] replace subscribed_channels list Vec with HashSet to avoid duplicates --- src/client.rs | 17 ++++++++++------- src/cmd/subscribe.rs | 4 ++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/client.rs b/src/client.rs index beb3963..30297c1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,6 +4,8 @@ use crate::{Connection, Frame}; use bytes::Bytes; use std::future::Future; use std::io::{Error, ErrorKind}; +use std::iter::FromIterator; +use std::collections::HashSet; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; @@ -66,11 +68,13 @@ impl Client { } /// subscribe to the list of channels - /// when client sends `SUBSCRIBE`, server's handle for client start's accepting only - /// `SUBSCRIBE` and `UNSUBSCRIBE` commands so we consume client and return Subscribe + /// when client sends a `SUBSCRIBE` command, server's handle for client enters a mode where only + /// `SUBSCRIBE` and `UNSUBSCRIBE` commands are allowed, so we consume client and return Subscribe type + /// which only allows `SUBSCRIBE` and `UNSUBSCRIBE` commands #[instrument(skip(self))] pub async fn subscribe(mut self, channels: Vec) -> crate::Result { - let subscribed_channels = self.subscribe_cmd(Subscribe { channels: channels }).await?; + let channels = self.subscribe_cmd(Subscribe { channels: channels }).await?; + let subscribed_channels = HashSet::from_iter(channels); Ok(Subscriber { conn: self.conn, @@ -180,7 +184,7 @@ impl Client { pub struct Subscriber { conn: Connection, - subscribed_channels: Vec, + subscribed_channels: HashSet, } impl Subscriber { @@ -236,7 +240,7 @@ impl Subscriber { // from all subscribed channels, so we assert that the unsubscribe list received // matches the client subscribed one if channels.is_empty() { - channels = self.subscribed_channels.clone(); + channels = Vec::from_iter(self.subscribed_channels.clone()); } // Read the response @@ -248,8 +252,7 @@ impl Subscriber { if &unsubscribe.to_string() == "unsubscribe" && &uchannel.to_string() == channel => { - self.subscribed_channels - .retain(|channel| channel != &uchannel.to_string()); + self.subscribed_channels.remove(&uchannel.to_string()); } _ => return Err(response.to_error()), }, diff --git a/src/cmd/subscribe.rs b/src/cmd/subscribe.rs index ed6bce5..7e6cdef 100644 --- a/src/cmd/subscribe.rs +++ b/src/cmd/subscribe.rs @@ -98,8 +98,8 @@ impl Subscribe { res = dst.read_frame() => { let frame = match res? { Some(frame) => frame, - // How to handle remote client closing write half - None => unimplemented!(), + // How to handle remote client closing write half? + None => return Ok(()) }; // A command has been received from the client. From 7700bac2f4ed4d0b7e86f2b310a23a7692f1207c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Sun, 5 Apr 2020 01:06:17 +0100 Subject: [PATCH 3/5] update Subscriber to use async-stream instead of manual Stream impl --- Cargo.lock | 22 +++++++++++++ Cargo.toml | 1 + examples/sub.rs | 9 +++--- src/client.rs | 84 ++++++++++++++++++++++++++++++------------------- src/lib.rs | 2 ++ 5 files changed, 81 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 23dec1a..3d77455 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -29,6 +29,25 @@ name = "arc-swap" version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "async-stream" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "async-stream-impl 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "async-stream-impl" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "atoi" version = "0.3.2" @@ -302,6 +321,7 @@ dependencies = [ name = "mini-redis" version = "0.1.0" dependencies = [ + "async-stream 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "clap 3.0.0-beta.1 (git+https://github.com/clap-rs/clap/)", @@ -1011,6 +1031,8 @@ dependencies = [ "checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" "checksum ansi_term 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" "checksum arc-swap 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d663a8e9a99154b5fb793032533f6328da35e23aac63d5c152279aa8ba356825" +"checksum async-stream 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "22068c0c19514942eefcfd4daf8976ef1aad84e61539f95cd200c35202f80af5" +"checksum async-stream-impl 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "25f9db3b38af870bf7e5cc649167533b493928e50744e2c30ae350230b414670" "checksum atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e0afb7287b68575f5ca0e5c7e40191cbd4be59d325781f46faa603e176eaef47" "checksum atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" "checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" diff --git a/Cargo.toml b/Cargo.toml index a8d8009..202689c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ tokio = { git = "https://github.com/tokio-rs/tokio", features = ["full"] } tracing = "0.1.13" tracing-futures = { version = "0.2.3", features = ["tokio"] } tracing-subscriber = "0.2.2" +async-stream = "0.2.1" [dev-dependencies] # Enable test-utilities in dev mode only. This is mostly for tests. diff --git a/examples/sub.rs b/examples/sub.rs index 97823e0..ccd3cf9 100644 --- a/examples/sub.rs +++ b/examples/sub.rs @@ -18,7 +18,6 @@ #![warn(rust_2018_idioms)] use mini_redis::{client, Result}; -use tokio::stream::StreamExt; #[tokio::main] pub async fn main() -> Result<()> { @@ -27,12 +26,12 @@ pub async fn main() -> Result<()> { // subscribe to channel foo - let mut result = client.subscribe(vec!["foo".into()]).await?; + let mut subscriber = client.subscribe(vec!["foo".into()]).await?; // await messages on channel foo - while let Some(Ok(msg)) = result.next().await { - println!("got message from the channel: {}; message = {:?}", msg.channel, msg.content); - } + let msg = subscriber.next_message().await? ; + println!("got message from the channel: {}; message = {:?}", msg.channel, msg.content); + Ok(()) } diff --git a/src/client.rs b/src/client.rs index 30297c1..812e896 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,16 +2,14 @@ use crate::cmd::{Get, Publish, Set, Subscribe, Unsubscribe}; use crate::{Connection, Frame}; use bytes::Bytes; -use std::future::Future; use std::io::{Error, ErrorKind}; use std::iter::FromIterator; use std::collections::HashSet; -use std::pin::Pin; -use std::task::{Context, Poll}; use std::time::Duration; use tokio::net::{TcpStream, ToSocketAddrs}; use tokio::stream::Stream; use tracing::{debug, instrument}; +use async_stream::stream; /// Mini asynchronous Redis client pub struct Client { @@ -188,6 +186,32 @@ pub struct Subscriber { } impl Subscriber { + + /// await for next message published on the subscribed channels + pub async fn next_message(&mut self) -> crate::Result { + match self.receive_message().await { + Some(message) => message, + None => { + // Receiving `None` here indicates the server has closed the + // connection without sending a frame. This is unexpected and is + // represented as a "connection reset by peer" error. + let err = Error::new(ErrorKind::ConnectionReset, "connection reset by server"); + + Err(err.into()) + } + } + } + + /// Convert the subscriber into a Stream + /// yielding new messages published on subscribed channels + pub fn into_stream(mut self) -> impl Stream> { + stream! { + while let Some(message) = self.receive_message().await { + yield message; + } + } + } + /// Subscribe to a list of new channels #[instrument(skip(self))] pub async fn subscribe(&mut self, channels: Vec) -> crate::Result<()> { @@ -263,7 +287,31 @@ impl Subscriber { Ok(()) } - /// Reads a response frame from the socket. If an `Error` frame is read, it + /// Receives a frame published from server on socket and convert it to a `Message` + /// if frame is not `Frame::Array` with proper message structure return Err + async fn receive_message(&mut self) -> Option> { + match self.conn.read_frame().await { + Ok(None) => None, + Err(err) => Some(Err(err.into())), + Ok(Some(mframe)) => { + debug!(?mframe); + match mframe { + Frame::Array(ref frame) => match frame.as_slice() { + [message, channel, content] if &message.to_string() == "message" => { + Some(Ok(Message { + channel: channel.to_string(), + content: Bytes::from(content.to_string()), + })) + } + _ => Some(Err(mframe.to_error())), + }, + frame => Some(Err(frame.to_error())), + } + } + } + } + + /// Reads a response frame to a command from the socket. If an `Error` frame is read, it /// is converted to `Err`. async fn read_response(&mut self) -> crate::Result { let response = self.conn.read_frame().await?; @@ -285,34 +333,6 @@ impl Subscriber { } } -impl Stream for Subscriber { - type Item = crate::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let mut read_frame = Box::pin(self.conn.read_frame()); - match Pin::new(&mut read_frame).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(None)) => Poll::Ready(None), - Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err.into()))), - Poll::Ready(Ok(Some(mframe))) => { - debug!(?mframe); - match mframe { - Frame::Array(ref frame) => match frame.as_slice() { - [message, channel, content] if &message.to_string() == "message" => { - Poll::Ready(Some(Ok(Message { - channel: channel.to_string(), - content: Bytes::from(content.to_string()), - }))) - } - _ => Poll::Ready(Some(Err(mframe.to_error()))), - }, - frame => Poll::Ready(Some(Err(frame.to_error()))), - } - } - } - } -} - /// A message received on a subscribed channel #[derive(Debug, Clone)] pub struct Message { diff --git a/src/lib.rs b/src/lib.rs index a8a28da..e988d27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +// required because async_trait +#![recursion_limit="256"] pub const DEFAULT_PORT: &str = "6379"; pub mod client; From 0de6a413f9cdc423049a875d9bb5b9b86f5960a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Sun, 5 Apr 2020 01:10:25 +0100 Subject: [PATCH 4/5] revert update to error handling server.rs, as #21 handles it --- src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.rs b/src/server.rs index 2f76f65..4710563 100644 --- a/src/server.rs +++ b/src/server.rs @@ -57,8 +57,8 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result< tokio::select! { res = server.run() => { if let Err(err) = res { + // TODO: gracefully handle this error error!(cause = %err, "failed to accept"); - return Err(err.into()); } } _ = shutdown => { From fa09e46e0601055846345573105d6c592a6021e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Sun, 5 Apr 2020 18:29:13 +0100 Subject: [PATCH 5/5] remove uneeded recursion limit extension --- src/lib.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ce19586..83de51c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,11 +25,6 @@ //! intermediate representation between a "command" and the byte //! representation. -// required because async_trait -#![recursion_limit="256"] -pub const DEFAULT_PORT: &str = "6379"; - - pub mod client; pub mod cmd;