Skip to content

Commit

Permalink
feat: optional stream and default impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Autoparallel committed Feb 16, 2024
1 parent 267c776 commit a764c0a
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions engine/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,24 @@ pub enum State {
/// The [`Behavior`] trait is the lowest level functionality that will be used
/// by a [`StateMachine`]. This constitutes what each state transition will do.
#[async_trait::async_trait]
pub trait Behavior<E>: Serialize + DeserializeOwned + Send + Sync + Debug + 'static {
pub trait Behavior<E: Send + 'static>:
Serialize + DeserializeOwned + Send + Sync + Debug + 'static
{
/// Used to start the agent.
/// This is where the agent can engage in its specific start up activities
/// that it can do given the current state of the world.
async fn startup(
&mut self,
client: Arc<ArbiterMiddleware>,
messager: Messager,
) -> Result<EventStream<E>>;
) -> Result<Option<EventStream<E>>>;

/// Used to process events.
/// This is where the agent can engage in its specific processing
/// of events that can lead to actions being taken.
async fn process(&mut self, event: E) -> Result<ControlFlow>;
async fn process(&mut self, _event: E) -> Result<ControlFlow> {
Ok(ControlFlow::Halt)
}
}
/// A trait for creating a state machine.
///
Expand Down Expand Up @@ -140,7 +144,7 @@ pub trait StateMachine: Send + Sync + Debug + 'static {
/// This method does not return a value, but it may result in state changes
/// within the implementing type or the generation of further instructions
/// or events.
async fn execute(&mut self, instruction: MachineInstruction) -> Result<()>;
async fn execute(&mut self, _instruction: MachineInstruction) -> Result<()>;
}

/// The `Engine` struct represents the core logic unit of a state machine-based
Expand All @@ -161,6 +165,7 @@ pub trait StateMachine: Send + Sync + Debug + 'static {
pub struct Engine<B, E>
where
B: Behavior<E>,
E: Send + 'static,
{
/// The behavior the `Engine` runs.
behavior: Option<B>,
Expand Down Expand Up @@ -215,18 +220,26 @@ where
MachineInstruction::Start(client, messager) => {
self.state = State::Starting;
let mut behavior = self.behavior.take().unwrap();
let behavior_task: JoinHandle<Result<(EventStream<E>, B)>> =
let behavior_task: JoinHandle<Result<(Option<EventStream<E>>, B)>> =
tokio::spawn(async move {
let id = messager.id.clone();
let stream = behavior.startup(client, messager).await?;
debug!("startup complete for behavior {:?}", id);
Ok((stream, behavior))
});
let (stream, behavior) = behavior_task.await??;
self.event_stream = Some(stream);
self.behavior = Some(behavior);
self.execute(MachineInstruction::Process).await?;
Ok(())
match stream {
Some(stream) => {
self.event_stream = Some(stream);
self.behavior = Some(behavior);
self.execute(MachineInstruction::Process).await?;
Ok(())
}
None => {
self.behavior = Some(behavior);
Ok(())
}
}
}
MachineInstruction::Process => {
trace!("Behavior is starting up.");
Expand Down

0 comments on commit a764c0a

Please sign in to comment.