From d1a467c53d2fea177120cf90bc0bd84e705baa7b Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Fri, 19 Jan 2024 15:52:48 -0700 Subject: [PATCH] push up: test adjustment --- arbiter-engine/src/examples/timed_message.rs | 13 ++++++++----- arbiter-engine/src/examples/token_minter.rs | 10 +++++++--- arbiter-engine/src/machine.rs | 9 +++++++-- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/arbiter-engine/src/examples/timed_message.rs b/arbiter-engine/src/examples/timed_message.rs index 881d3e8f..90c69a96 100644 --- a/arbiter-engine/src/examples/timed_message.rs +++ b/arbiter-engine/src/examples/timed_message.rs @@ -2,6 +2,7 @@ const AGENT_ID: &str = "agent"; +use self::machine::MachineHalt; use super::*; use crate::{ agent::Agent, @@ -21,12 +22,9 @@ struct TimedMessage { #[async_trait::async_trait] impl Behavior for TimedMessage { - async fn process(&mut self, event: Message) { + async fn process(&mut self, event: Message) -> Option { trace!("Processing event."); - if self.count == self.max_count.unwrap_or(u64::MAX) {} - if event.data != self.receive_data { - return; - } else { + if event.data == self.receive_data { trace!("Event matches message. Sending a new message."); let message = Message { from: self.messager.id.clone().unwrap(), @@ -34,10 +32,15 @@ impl Behavior for TimedMessage { data: self.send_data.clone(), }; self.messager.send(message).await; + self.count += 1; + } + if self.count == self.max_count.unwrap_or(u64::MAX) { + return Some(MachineHalt); } tokio::time::sleep(std::time::Duration::from_secs(self.delay)).await; trace!("Processed event."); + None } async fn sync(&mut self) { diff --git a/arbiter-engine/src/examples/token_minter.rs b/arbiter-engine/src/examples/token_minter.rs index 35036fa2..f6fdb019 100644 --- a/arbiter-engine/src/examples/token_minter.rs +++ b/arbiter-engine/src/examples/token_minter.rs @@ -8,6 +8,7 @@ use ethers::{ }; use tracing::error; +use self::machine::MachineHalt; use super::*; use crate::{ agent::Agent, @@ -111,7 +112,7 @@ self.messager.id.as_deref()))] #[tracing::instrument(skip(self), fields(id = self.messager.id.as_deref()))] - async fn process(&mut self, event: Message) { + async fn process(&mut self, event: Message) -> Option { if self.tokens.is_none() { error!( "There were no tokens to deploy! You must add tokens to @@ -152,6 +153,7 @@ the token admin before running the simulation." .unwrap(); } } + None } } @@ -205,7 +207,7 @@ self.messager.id.as_deref()))] #[tracing::instrument(skip(self), fields(id = self.messager.id.as_deref()))] - async fn process(&mut self, event: Message) { + async fn process(&mut self, event: Message) -> Option { if let Ok(token_data) = serde_json::from_str::(&event.data) { trace!( "Got @@ -229,6 +231,7 @@ token: {:?}", }; self.messager.send(message).await; } + None } } @@ -236,7 +239,7 @@ token: {:?}", impl Behavior for TokenRequester { #[tracing::instrument(skip(self), fields(id = self.messager.id.as_deref()))] - async fn process(&mut self, event: arbiter_token::TransferFilter) { + async fn process(&mut self, event: arbiter_token::TransferFilter) -> Option { trace!( "Got event for `TokenRequester` logger: {:?}", @@ -254,6 +257,7 @@ self.messager.id.as_deref()))] .unwrap(), }; self.messager.send(message).await; + None } } diff --git a/arbiter-engine/src/machine.rs b/arbiter-engine/src/machine.rs index 50f9ec70..0a1e5c33 100644 --- a/arbiter-engine/src/machine.rs +++ b/arbiter-engine/src/machine.rs @@ -79,7 +79,7 @@ pub trait Behavior: Send + Sync + 'static { /// Used to process events. /// This is where the agent can engage in its specific processing /// of events that can lead to actions being taken. - async fn process(&mut self, event: E); + async fn process(&mut self, event: E) -> Option; } #[async_trait::async_trait] @@ -166,7 +166,12 @@ where println!("Event received: {:?}", event); let decoding_result = serde_json::from_str::(&event); match decoding_result { - Ok(event) => behavior.process(event).await, + Ok(event) => { + let halt_option = behavior.process(event).await; + if halt_option.is_some() { + break; + } + } Err(_) => match serde_json::from_str::(&event) { Ok(_) => { warn!("Behavior received `MachineHalt` message. Breaking!");