diff --git a/Cargo.lock b/Cargo.lock index 19c6b6c8..0964d1f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -205,6 +205,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "anyhow" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" + [[package]] name = "arbiter" version = "0.4.13" @@ -273,6 +279,7 @@ dependencies = [ name = "arbiter-engine" version = "0.1.0" dependencies = [ + "anyhow", "arbiter-bindings", "arbiter-core", "arbiter-macros", diff --git a/arbiter-engine/Cargo.toml b/arbiter-engine/Cargo.toml index 88091e33..b998593c 100644 --- a/arbiter-engine/Cargo.toml +++ b/arbiter-engine/Cargo.toml @@ -34,6 +34,7 @@ toml.workspace = true thiserror.workspace = true tracing.workspace = true +anyhow = "1.0.79" crossbeam-channel.workspace = true diff --git a/arbiter-engine/src/examples/minter/behaviors/mod.rs b/arbiter-engine/src/examples/minter/behaviors/mod.rs index 2998593e..5e4f33f2 100644 --- a/arbiter-engine/src/examples/minter/behaviors/mod.rs +++ b/arbiter-engine/src/examples/minter/behaviors/mod.rs @@ -1,3 +1,5 @@ use super::*; pub(crate) mod token_admin; pub(crate) mod token_requester; + +use anyhow::Result; diff --git a/arbiter-engine/src/examples/minter/behaviors/token_admin.rs b/arbiter-engine/src/examples/minter/behaviors/token_admin.rs index 0eea2a0b..5147dd2b 100644 --- a/arbiter-engine/src/examples/minter/behaviors/token_admin.rs +++ b/arbiter-engine/src/examples/minter/behaviors/token_admin.rs @@ -31,7 +31,7 @@ impl Behavior for TokenAdmin { &mut self, client: Arc, messager: Messager, - ) -> Result, ArbiterEngineError> { + ) -> Result> { self.messager = Some(messager.clone()); self.client = Some(client.clone()); for token_data in self.token_data.values_mut() { @@ -53,12 +53,12 @@ impl Behavior for TokenAdmin { .get_or_insert_with(HashMap::new) .insert(token_data.name.clone(), token.clone()); } - messager.stream() + Ok(messager.stream()?) } #[tracing::instrument(skip(self), fields(id = self.messager.as_ref().unwrap().id.as_deref()))] - async fn process(&mut self, event: Message) -> Result { + async fn process(&mut self, event: Message) -> Result { if self.tokens.is_none() { error!( "There were no tokens to deploy! You must add tokens to diff --git a/arbiter-engine/src/examples/minter/behaviors/token_requester.rs b/arbiter-engine/src/examples/minter/behaviors/token_requester.rs index 58274ed3..7bf46a78 100644 --- a/arbiter-engine/src/examples/minter/behaviors/token_requester.rs +++ b/arbiter-engine/src/examples/minter/behaviors/token_requester.rs @@ -15,7 +15,7 @@ impl Behavior for TokenRequester { &mut self, client: Arc, mut messager: Messager, - ) -> Result, ArbiterEngineError> { + ) -> Result> { messager .send( To::Agent(self.request_to.clone()), @@ -49,7 +49,7 @@ impl Behavior for TokenRequester { #[tracing::instrument(skip(self), fields(id = self.messager.as_ref().unwrap().id.as_deref()))] - async fn process(&mut self, event: TransferFilter) -> Result { + async fn process(&mut self, event: TransferFilter) -> Result { let messager = self.messager.as_ref().unwrap(); while (self.count < self.max_count.unwrap()) { debug!("sending message from requester"); diff --git a/arbiter-engine/src/examples/timed_message/mod.rs b/arbiter-engine/src/examples/timed_message/mod.rs index 6413b324..8ce2b7d9 100644 --- a/arbiter-engine/src/examples/timed_message/mod.rs +++ b/arbiter-engine/src/examples/timed_message/mod.rs @@ -4,6 +4,7 @@ const AGENT_ID: &str = "agent"; use std::{pin::Pin, time::Duration}; +use anyhow::Result; use arbiter_macros::Behaviors; use ethers::types::BigEndianHash; use futures_util::Stream; @@ -56,15 +57,15 @@ impl Behavior for TimedMessage { &mut self, _client: Arc, messager: Messager, - ) -> Result, ArbiterEngineError> { + ) -> Result> { if let Some(startup_message) = &self.startup_message { messager.send(To::All, startup_message).await; } self.messager = Some(messager.clone()); - messager.stream() + Ok(messager.stream()?) } - async fn process(&mut self, event: Message) -> Result { + async fn process(&mut self, event: Message) -> Result { if event.data == serde_json::to_string(&self.receive_data).unwrap() { let messager = self.messager.clone().unwrap(); messager.send(To::All, self.send_data.clone()).await; diff --git a/arbiter-engine/src/machine.rs b/arbiter-engine/src/machine.rs index 334fb45c..0358e661 100644 --- a/arbiter-engine/src/machine.rs +++ b/arbiter-engine/src/machine.rs @@ -3,6 +3,7 @@ use std::pin::Pin; +use anyhow::Result; use arbiter_core::middleware::ArbiterMiddleware; use futures_util::{Stream, StreamExt}; use tokio::task::JoinHandle; @@ -77,12 +78,12 @@ pub trait Behavior: Serialize + DeserializeOwned + Send + Sync + Debug + 'sta &mut self, client: Arc, messager: Messager, - ) -> Result, ArbiterEngineError>; + ) -> Result>; /// Used to process events. /// This is where the agent can engage in its specific processing /// of events that can lead to actions being taken. - async fn process(&mut self, event: E) -> Result; + async fn process(&mut self, event: E) -> Result; } /// A trait for creating a state machine. /// @@ -139,7 +140,7 @@ pub trait StateMachine: Send + Sync + Debug + 'static { /// This method does not return a value, but it may result in state changes /// within the implementing type or the generation of further instructions /// or events. - async fn execute(&mut self, instruction: MachineInstruction) -> Result<(), ArbiterEngineError>; + async fn execute(&mut self, instruction: MachineInstruction) -> Result<()>; } /// The `Engine` struct represents the core logic unit of a state machine-based @@ -210,14 +211,14 @@ where B: Behavior + Debug + Serialize + DeserializeOwned, E: DeserializeOwned + Serialize + Send + Sync + Debug + 'static, { - async fn execute(&mut self, instruction: MachineInstruction) -> Result<(), ArbiterEngineError> { + async fn execute(&mut self, instruction: MachineInstruction) -> Result<()> { // NOTE: The unwraps here are safe because the `Behavior` in an engine is only // accessed here and it is private. match instruction { MachineInstruction::Start(client, messager) => { self.state = State::Starting; let mut behavior = self.behavior.take().unwrap(); - let behavior_task: JoinHandle, B), ArbiterEngineError>> = + let behavior_task: JoinHandle, B)>> = tokio::spawn(async move { let id = messager.id.clone(); let stream = behavior.startup(client, messager).await?; @@ -234,18 +235,17 @@ where trace!("Behavior is starting up."); let mut behavior = self.behavior.take().unwrap(); let mut stream = self.event_stream.take().unwrap(); - let behavior_task: JoinHandle> = - tokio::spawn(async move { - while let Some(event) = stream.next().await { - match behavior.process(event).await? { - ControlFlow::Halt => { - break; - } - ControlFlow::Continue => {} + let behavior_task: JoinHandle> = tokio::spawn(async move { + while let Some(event) = stream.next().await { + match behavior.process(event).await? { + ControlFlow::Halt => { + break; } + ControlFlow::Continue => {} } - Ok(behavior) - }); + } + Ok(behavior) + }); // TODO: We don't have to store the behavior again here, we could just discard // it. self.behavior = Some(behavior_task.await??);