Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(arbiter-engine): run multiple worlds in parallel #826

Merged
merged 7 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions arbiter-core/src/data_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@
metadata: Option<Value>,
}

impl Debug for EventLogger {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventLogger")
.field("receiver", &self.receiver)
.field("output_file_type", &self.output_file_type)
.field("directory", &self.directory)
.field("file_name", &self.file_name)
.field("metadata", &self.metadata)
.finish()
}

Check warning on line 84 in arbiter-core/src/data_collection.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-core/src/data_collection.rs#L76-L84

Added lines #L76 - L84 were not covered by tests
}

/// `OutputFileType` is an enumeration that represents the different types of
/// file formats that the `EventLogger` can output to.
#[derive(Debug, Clone, Copy, Serialize)]
Expand Down
13 changes: 13 additions & 0 deletions arbiter-engine/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,19 @@
broadcast_task: Option<JoinHandle<Pin<Box<dyn Stream<Item = String> + Send>>>>,
}

impl Debug for Agent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Agent")
.field("id", &self.id)
.field("state", &self.state)
.field("messager", &self.messager)
.field("client", &self.client)
.field("event_streamer", &self.event_streamer)
.field("behavior_engines", &self.behavior_engines)
.finish()
}
}

Check warning on line 112 in arbiter-engine/src/agent.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/agent.rs#L102-L112

Added lines #L102 - L112 were not covered by tests

impl Agent {
/// Produces a new agent with the given identifier.
pub fn new(id: &str, world: &World) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion arbiter-engine/src/examples/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ use futures_util::{stream, StreamExt};

use super::*;
use crate::messager::{Message, Messager};
mod timed_message;
pub(crate) mod timed_message;
mod token_minter;
23 changes: 18 additions & 5 deletions arbiter-engine/src/examples/timed_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ use crate::{
world::World,
};

struct TimedMessage {
#[derive(Debug)]
pub(crate) struct TimedMessage {
delay: u64,
receive_data: String,
send_data: String,
messager: Option<Messager>,
count: u64,
max_count: Option<u64>,
start_message: Option<String>,
}

impl TimedMessage {
Expand All @@ -30,6 +32,7 @@ impl TimedMessage {
receive_data: String,
send_data: String,
max_count: Option<u64>,
start_message: Option<String>,
) -> Self {
Self {
delay,
Expand All @@ -38,6 +41,7 @@ impl TimedMessage {
messager: None,
count: 0,
max_count,
start_message,
}
}
}
Expand Down Expand Up @@ -76,6 +80,14 @@ impl Behavior<Message> for TimedMessage {

async fn startup(&mut self) {
trace!("Starting up `TimedMessage`.");
if let Some(start_message) = &self.start_message {
let message = Message {
from: self.messager.as_ref().unwrap().id.clone().unwrap(),
to: To::All,
data: start_message.clone(),
};
self.messager.as_ref().unwrap().send(message).await;
}
tokio::time::sleep(std::time::Duration::from_secs(self.delay)).await;
trace!("Started up `TimedMessage`.");
}
Expand All @@ -91,6 +103,7 @@ async fn echoer() {
"Hello, world!".to_owned(),
"Hello, world!".to_owned(),
Some(2),
None,
);
world.add_agent(agent.with_behavior(behavior));

Expand Down Expand Up @@ -129,8 +142,8 @@ async fn ping_pong() {
let mut world = World::new("world");

let agent = Agent::new(AGENT_ID, &world);
let behavior_ping = TimedMessage::new(1, "pong".to_owned(), "ping".to_owned(), Some(2));
let behavior_pong = TimedMessage::new(1, "ping".to_owned(), "pong".to_owned(), Some(2));
let behavior_ping = TimedMessage::new(1, "pong".to_owned(), "ping".to_owned(), Some(2), None);
let behavior_pong = TimedMessage::new(1, "ping".to_owned(), "pong".to_owned(), Some(2), None);
world.add_agent(
agent
.with_behavior(behavior_ping)
Expand Down Expand Up @@ -173,10 +186,10 @@ async fn ping_pong_two_agent() {
let mut world = World::new("world");

let agent_ping = Agent::new("agent_ping", &world);
let behavior_ping = TimedMessage::new(1, "pong".to_owned(), "ping".to_owned(), Some(2));
let behavior_ping = TimedMessage::new(1, "pong".to_owned(), "ping".to_owned(), Some(2), None);

let agent_pong = Agent::new("agent_pong", &world);
let behavior_pong = TimedMessage::new(1, "ping".to_owned(), "pong".to_owned(), Some(2));
let behavior_pong = TimedMessage::new(1, "ping".to_owned(), "pong".to_owned(), Some(2), None);

world.add_agent(agent_ping.with_behavior(behavior_ping));
world.add_agent(agent_pong.with_behavior(behavior_pong));
Expand Down
1 change: 1 addition & 0 deletions arbiter-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ pub mod agent;
pub mod examples;
pub mod machine;
pub mod messager;
pub mod universe;
pub mod world;
7 changes: 4 additions & 3 deletions arbiter-engine/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub enum MachineInstruction {
pub struct MachineHalt;

/// The state used by any entity implementing [`StateMachine`].
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
/// The entity is not yet running any process.
/// This is the state adopted by the entity when it is first created.
Expand Down Expand Up @@ -80,7 +80,7 @@ pub enum State {
/// The [`Behavior`] trait is the lowest level functionality that will be used
/// by a [`StateMachine`]. This constitutes what each state transition will do.
#[async_trait::async_trait]
pub trait Behavior<E>: Send + Sync + 'static {
pub trait Behavior<E>: Send + Sync + Debug + 'static {
/// Used to bring the agent back up to date with the latest state of the
/// world. This could be used if the world was stopped and later restarted.
async fn sync(&mut self, _messager: Messager, _client: Arc<RevmMiddleware>) {}
Expand All @@ -97,7 +97,7 @@ pub trait Behavior<E>: Send + Sync + 'static {
}

#[async_trait::async_trait]
pub(crate) trait StateMachine: Send + Sync + 'static {
pub(crate) trait StateMachine: Send + Sync + Debug + 'static {
async fn execute(&mut self, instruction: MachineInstruction);
}

Expand All @@ -109,6 +109,7 @@ pub(crate) trait StateMachine: Send + Sync + 'static {
/// generics can be collapsed into a `dyn` trait object so that, for example,
/// [`agent::Agent`]s can own multiple [`Behavior`]s with different event `<E>`
/// types.
#[derive(Debug)]
pub struct Engine<B, E>
where
B: Behavior<E>,
Expand Down
158 changes: 158 additions & 0 deletions arbiter-engine/src/universe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
//! The [`universe`] module contains the [`Universe`] struct which is the
//! primary interface for creating and running many `World`s in parallel.

use std::collections::HashMap;

use anyhow::Result;
use futures_util::future::join_all;
use tokio::task::{spawn, JoinError};

use crate::world::World;

/// The [`Universe`] struct is the primary interface for creating and running
/// many `World`s in parallel. At the moment, is a wrapper around a
/// [`HashMap`] of [`World`]s, but can be extended to handle generics inside of
/// [`World`]s and crosstalk between [`World`]s.
#[derive(Debug, Default)]

Check warning on line 16 in arbiter-engine/src/universe.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/universe.rs#L16

Added line #L16 was not covered by tests
pub struct Universe {
worlds: Option<HashMap<String, World>>,
world_tasks: Option<Vec<Result<World, JoinError>>>,
}

impl Universe {
/// Creates a new [`Universe`].
pub fn new() -> Self {
Self {
worlds: Some(HashMap::new()),
world_tasks: None,
}
}

/// Adds a [`World`] to the [`Universe`].
pub fn add_world(&mut self, world: World) {
if let Some(worlds) = self.worlds.as_mut() {
worlds.insert(world.id.clone(), world);
}
}

/// Runs all of the [`World`]s in the [`Universe`] in parallel.
pub async fn run_worlds(&mut self) -> Result<()> {
if self.is_online() {
return Err(anyhow::anyhow!("Universe is already running."));
}
let mut tasks = Vec::new();
for (_, world) in self.worlds.take().unwrap().drain() {
tasks.push(spawn(async move { world.run().await }));
}
self.world_tasks = Some(join_all(tasks.into_iter()).await);
Ok(())
}

/// Returns `true` if the [`Universe`] is running.
pub fn is_online(&self) -> bool {
self.world_tasks.is_some()
}
}

#[cfg(test)]
mod tests {
use std::fs::{read_to_string, remove_file, File};

use tracing_subscriber::{fmt, EnvFilter};

use super::*;
use crate::{agent::Agent, examples::timed_message::*, machine::State};

#[tokio::test]
async fn run_universe() {
let mut universe = Universe::new();
let world = World::new("test");
universe.add_world(world);
universe.run_worlds().await.unwrap();
let world = universe.world_tasks.unwrap().remove(0).unwrap();
assert_eq!(world.state, State::Processing);
}

#[tokio::test]
#[should_panic(expected = "Universe is already running.")]
async fn cant_run_twice() {
let mut universe = Universe::new();
let world1 = World::new("test");
universe.add_world(world1);
universe.run_worlds().await.unwrap();
universe.run_worlds().await.unwrap();
}

#[tokio::test]
async fn run_parallel() {
std::env::set_var("RUST_LOG", "trace");
let file = File::create("test_logs_engine.log").expect("Unable to create log file");

let subscriber = fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_writer(file)
.finish();

tracing::subscriber::set_global_default(subscriber)
.expect("setting default subscriber failed");

let mut world1 = World::new("test1");
let agent1 = Agent::new("agent1", &world1);
let behavior1 = TimedMessage::new(
1,
"echo".to_owned(),
"echo".to_owned(),
Some(5),
Some("echo".to_owned()),
);
world1.add_agent(agent1.with_behavior(behavior1));

let mut world2 = World::new("test2");
let agent2 = Agent::new("agent2", &world2);
let behavior2 = TimedMessage::new(
1,
"echo".to_owned(),
"echo".to_owned(),
Some(5),
Some("echo".to_owned()),
);
world2.add_agent(agent2.with_behavior(behavior2));

let mut universe = Universe::new();
universe.add_world(world1);
universe.add_world(world2);

universe.run_worlds().await.unwrap();

let parsed_file = read_to_string("test_logs_engine.log").expect("Unable to read log file");

// Define the line to check (excluding the timestamp)
let line_to_check = "World is syncing.";

// Assert that the lines appear consecutively
assert!(
lines_appear_consecutively(&parsed_file, line_to_check),
"The lines do not appear consecutively"

Check warning on line 135 in arbiter-engine/src/universe.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/universe.rs#L135

Added line #L135 was not covered by tests
);
remove_file("test_logs_engine.log").expect("Unable to remove log file");
}

fn lines_appear_consecutively(file_contents: &str, line_to_check: &str) -> bool {
let mut lines = file_contents.lines();

while let Some(line) = lines.next() {
if line.contains(line_to_check) {
println!("Found line: {}", line);
// Check if the next line also contains the line_to_check
if let Some(next_line) = lines.next() {
if next_line.contains(line_to_check) {
println!("Found next line: {}", next_line);
return true;
}
}

Check warning on line 152 in arbiter-engine/src/universe.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/universe.rs#L151-L152

Added lines #L151 - L152 were not covered by tests
}
}

false

Check warning on line 156 in arbiter-engine/src/universe.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/universe.rs#L156

Added line #L156 was not covered by tests
}
}
4 changes: 3 additions & 1 deletion arbiter-engine/src/world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
/// 5. [`State::Stopped`]: The [`World`] is stopped. This is where the [`World`]
/// can be stopped and state of the [`World`] and its [`Agent`]s can be
/// offloaded and saved.
#[derive(Debug)]

Check warning on line 57 in arbiter-engine/src/world.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/world.rs#L57

Added line #L57 was not covered by tests
pub struct World {
/// The identifier of the world.
pub id: String,
Expand Down Expand Up @@ -106,10 +107,11 @@
}

/// Runs the world through up to the [`State::Processing`] stage.
pub async fn run(&mut self) {
pub async fn run(mut self) -> Self {
self.execute(MachineInstruction::Sync(None, None)).await;
self.execute(MachineInstruction::Start).await;
self.execute(MachineInstruction::Process).await;
self
}

/// Stops the world by stopping all the behaviors that each of the agents is
Expand Down
Loading