From 547af9298cb1b111f55503d7bccb7e1fae64104a Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Thu, 25 Jan 2024 18:55:36 -0700 Subject: [PATCH 1/4] save --- arbiter-engine/src/machine.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/arbiter-engine/src/machine.rs b/arbiter-engine/src/machine.rs index c16cd623..55897dfa 100644 --- a/arbiter-engine/src/machine.rs +++ b/arbiter-engine/src/machine.rs @@ -1,18 +1,25 @@ //! The [`StateMachine`] trait, [`Behavior`] trait, and the [`Engine`] that runs //! [`Behavior`]s. -use std::fmt::Debug; +// TODO: Notes +// I think we should have the `sync` stage of the behavior receive the client +// and messager and then the user can decide if it wants to use those in their +// behavior. +use std::{fmt::Debug, sync::Arc}; + +use arbiter_core::middleware::RevmMiddleware; use serde::de::DeserializeOwned; use tokio::sync::broadcast::Receiver; +use self::messager::Messager; use super::*; /// The instructions that can be sent to a [`StateMachine`]. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Debug)] pub enum MachineInstruction { /// Used to make a [`StateMachine`] sync with the world. - Sync, + Sync(Option, Option>), /// Used to make a [`StateMachine`] start up. Start, @@ -71,7 +78,7 @@ pub enum State { pub trait Behavior: Send + Sync + 'static { /// Used to bring the agent back up to date with the latest state of the /// world. This could be used if the world was stopped and later restarted. - async fn sync(&mut self) {} + async fn sync(&mut self, messager: Messager, client: Arc) {} /// Used to start the agent. /// This is where the agent can engage in its specific start up activities @@ -144,7 +151,7 @@ where self.state = State::Syncing; let mut behavior = self.behavior.take().unwrap(); let behavior_task = tokio::spawn(async move { - behavior.sync().await; + behavior.sync(messager, client).await; behavior }); self.behavior = Some(behavior_task.await.unwrap()); From 25a1e48bedcc3993d0a5b8280e4b3dc66dc65b50 Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Fri, 26 Jan 2024 09:49:12 -0700 Subject: [PATCH 2/4] enhance: behaviors receive messager and client --- arbiter-engine/src/agent.rs | 11 +- arbiter-engine/src/examples/timed_message.rs | 103 +-- arbiter-engine/src/examples/token_minter.rs | 770 ++++++++++--------- arbiter-engine/src/machine.rs | 6 +- arbiter-engine/src/messager.rs | 10 + arbiter-engine/src/world.rs | 13 +- 6 files changed, 452 insertions(+), 461 deletions(-) diff --git a/arbiter-engine/src/agent.rs b/arbiter-engine/src/agent.rs index 9ad86c7f..e862e9e0 100644 --- a/arbiter-engine/src/agent.rs +++ b/arbiter-engine/src/agent.rs @@ -144,8 +144,9 @@ impl Agent { pub(crate) async fn run(&mut self, instruction: MachineInstruction) { let behavior_engines = self.behavior_engines.take().unwrap(); let behavior_tasks = join_all(behavior_engines.into_iter().map(|mut engine| { + let instruction_clone = instruction.clone(); tokio::spawn(async move { - engine.execute(instruction).await; + engine.execute(instruction_clone).await; engine }) })); @@ -164,10 +165,14 @@ impl StateMachine for Agent { #[tracing::instrument(skip(self), fields(id = self.id))] async fn execute(&mut self, instruction: MachineInstruction) { match instruction { - MachineInstruction::Sync => { + MachineInstruction::Sync(_, _) => { debug!("Agent is syncing."); self.state = State::Syncing; - self.run(instruction).await; + self.run(MachineInstruction::Sync( + self.messager.clone(), + Some(self.client.clone()), + )) + .await; } MachineInstruction::Start => { debug!("Agent is starting up."); diff --git a/arbiter-engine/src/examples/timed_message.rs b/arbiter-engine/src/examples/timed_message.rs index 74d95748..71315e0a 100644 --- a/arbiter-engine/src/examples/timed_message.rs +++ b/arbiter-engine/src/examples/timed_message.rs @@ -19,23 +19,42 @@ struct TimedMessage { delay: u64, receive_data: String, send_data: String, - messager: Messager, + messager: Option, count: u64, max_count: Option, } +impl TimedMessage { + pub fn new( + delay: u64, + receive_data: String, + send_data: String, + max_count: Option, + ) -> Self { + Self { + delay, + receive_data, + send_data, + messager: None, + count: 0, + max_count, + } + } +} + #[async_trait::async_trait] impl Behavior for TimedMessage { async fn process(&mut self, event: Message) -> Option { trace!("Processing event."); + let messager = self.messager.as_ref().unwrap(); if event.data == self.receive_data { trace!("Event matches message. Sending a new message."); let message = Message { - from: self.messager.id.clone().unwrap(), + from: messager.id.clone().unwrap(), to: To::All, data: self.send_data.clone(), }; - self.messager.send(message).await; + messager.send(message).await; self.count += 1; } if self.count == self.max_count.unwrap_or(u64::MAX) { @@ -48,8 +67,9 @@ impl Behavior for TimedMessage { None } - async fn sync(&mut self) { + async fn sync(&mut self, messager: Messager, _client: Arc) { trace!("Syncing state for `TimedMessage`."); + self.messager = Some(messager); tokio::time::sleep(std::time::Duration::from_secs(self.delay)).await; trace!("Synced state for `TimedMessage`."); } @@ -63,24 +83,18 @@ impl Behavior for TimedMessage { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn echoer() { - // std::env::set_var("RUST_LOG", "trace"); - // tracing_subscriber::fmt::init(); + std::env::set_var("RUST_LOG", "trace"); + tracing_subscriber::fmt::init(); let mut world = World::new("world"); let agent = Agent::new(AGENT_ID, &world); - let behavior = TimedMessage { - delay: 1, - receive_data: "Hello, world!".to_owned(), - send_data: "Hello, world!".to_owned(), - messager: agent - .messager - .as_ref() - .unwrap() - .join_with_id(Some(AGENT_ID.to_owned())), - count: 0, - max_count: Some(2), - }; + let behavior = TimedMessage::new( + 1, + "Hello, world!".to_owned(), + "Hello, world!".to_owned(), + Some(2), + ); world.add_agent(agent.with_behavior(behavior)); let messager = world.messager.join_with_id(Some("god".to_owned())); @@ -121,31 +135,8 @@ async fn ping_pong() { let mut world = World::new("world"); let agent = Agent::new(AGENT_ID, &world); - let behavior_ping = TimedMessage { - delay: 1, - receive_data: "pong".to_owned(), - send_data: "ping".to_owned(), - messager: agent - .messager - .as_ref() - .unwrap() - .join_with_id(Some(AGENT_ID.to_owned())), - count: 0, - max_count: Some(2), - }; - let behavior_pong = TimedMessage { - delay: 1, - receive_data: "ping".to_owned(), - send_data: "pong".to_owned(), - messager: agent - .messager - .as_ref() - .unwrap() - .join_with_id(Some(AGENT_ID.to_owned())), - count: 0, - max_count: Some(2), - }; - + let behavior_ping = TimedMessage::new(1, "pong".to_owned(), "ping".to_owned(), Some(2)); + let behavior_pong = TimedMessage::new(1, "ping".to_owned(), "pong".to_owned(), Some(2)); world.add_agent( agent .with_behavior(behavior_ping) @@ -191,32 +182,10 @@ async fn ping_pong_two_agent() { let mut world = World::new("world"); let agent_ping = Agent::new("agent_ping", &world); - let behavior_ping = TimedMessage { - delay: 1, - receive_data: "pong".to_owned(), - send_data: "ping".to_owned(), - messager: agent_ping - .messager - .as_ref() - .unwrap() - .join_with_id(Some("agent_ping".to_owned())), - count: 0, - max_count: Some(2), - }; + let behavior_ping = TimedMessage::new(1, "pong".to_owned(), "ping".to_owned(), Some(2)); let agent_pong = Agent::new("agent_pong", &world); - let behavior_pong = TimedMessage { - delay: 1, - receive_data: "ping".to_owned(), - send_data: "pong".to_owned(), - messager: agent_pong - .messager - .as_ref() - .unwrap() - .join_with_id(Some("agent_pong".to_owned())), - count: 0, - max_count: Some(2), - }; + let behavior_pong = TimedMessage::new(1, "ping".to_owned(), "pong".to_owned(), Some(2)); world.add_agent(agent_ping.with_behavior(behavior_ping)); world.add_agent(agent_pong.with_behavior(behavior_pong)); diff --git a/arbiter-engine/src/examples/token_minter.rs b/arbiter-engine/src/examples/token_minter.rs index f75f7ef4..c0951878 100644 --- a/arbiter-engine/src/examples/token_minter.rs +++ b/arbiter-engine/src/examples/token_minter.rs @@ -1,384 +1,386 @@ -use std::{str::FromStr, time::Duration}; - -use anyhow::Context; -use arbiter_bindings::bindings::arbiter_token; -use arbiter_core::data_collection::EventLogger; -use ethers::{ - abi::token, - types::{transaction::request, Filter}, -}; -use tokio::time::timeout; -use tracing::error; - -use self::machine::MachineHalt; -use super::*; -use crate::{ - agent::Agent, - machine::{Behavior, MachineInstruction, StateMachine}, - messager::To, - world::World, -}; - -const TOKEN_ADMIN_ID: &str = "token_admin"; -const REQUESTER_ID: &str = "requester"; -const TOKEN_NAME: &str = "Arbiter Token"; -const TOKEN_SYMBOL: &str = "ARB"; -const TOKEN_DECIMALS: u8 = 18; - -/// The token admin is responsible for handling token minting requests. -#[derive(Debug)] -pub struct TokenAdmin { - /// The identifier of the token admin. - pub token_data: HashMap, - - pub tokens: Option>>, - - // TODO: We should not have to have a client or a messager put here - // explicitly, they should come from the Agent the behavior is given to. - pub client: Arc, - pub messager: Messager, - - count: u64, - - max_count: Option, -} - -impl TokenAdmin { - pub fn new( - client: Arc, - messager: Messager, - count: u64, - max_count: Option, - ) -> Self { - Self { - token_data: HashMap::new(), - tokens: None, - client, - messager, - count, - max_count, - } - } - - /// Adds a token to the token admin. - pub fn add_token(&mut self, token_data: TokenData) { - self.token_data.insert(token_data.name.clone(), token_data); - } -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct TokenData { - pub name: String, - pub symbol: String, - pub decimals: u8, - pub address: Option
, -} - -/// Used as an action to ask what tokens are available. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub enum TokenAdminQuery { - /// Get the address of the token. - AddressOf(String), - - /// Mint tokens. - MintRequest(MintRequest), -} - -/// Used as an action to mint tokens. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct MintRequest { - /// The token to mint. - pub token: String, - - /// The address to mint to. - pub mint_to: Address, - - /// The amount to mint. - pub mint_amount: u64, -} - -#[async_trait::async_trait] -impl Behavior for TokenAdmin { - #[tracing::instrument(skip(self), fields(id = -self.messager.id.as_deref()))] - async fn sync(&mut self) { - for token_data in self.token_data.values_mut() { - let token = ArbiterToken::deploy( - self.client.clone(), - ( - token_data.name.clone(), - token_data.symbol.clone(), - token_data.decimals, - ), - ) - .unwrap() - .send() - .await - .unwrap(); - token_data.address = Some(token.address()); - self.tokens - .get_or_insert_with(HashMap::new) - .insert(token_data.name.clone(), token.clone()); - debug!("Deployed token: {:?}", token); - } - } - - #[tracing::instrument(skip(self), fields(id = -self.messager.id.as_deref()))] - async fn process(&mut self, event: Message) -> Option { - if self.tokens.is_none() { - error!( - "There were no tokens to deploy! You must add tokens to -the token admin before running the simulation." - ); - } - - let query: TokenAdminQuery = serde_json::from_str(&event.data).unwrap(); - trace!("Got query: {:?}", query); - match query { - TokenAdminQuery::AddressOf(token_name) => { - trace!( - "Getting address of token with name: {:?}", - token_name.clone() - ); - let token_data = self.token_data.get(&token_name).unwrap(); - let message = Message { - from: self.messager.id.clone().unwrap(), - to: To::Agent(event.from.clone()), // Reply back to sender - data: serde_json::to_string(token_data).unwrap(), - }; - self.messager.send(message).await; - } - TokenAdminQuery::MintRequest(mint_request) => { - trace!("Minting tokens: {:?}", mint_request); - let token = self - .tokens - .as_ref() - .unwrap() - .get(&mint_request.token) - .unwrap(); - token - .mint(mint_request.mint_to, U256::from(mint_request.mint_amount)) - .send() - .await - .unwrap() - .await - .unwrap(); - self.count += 1; - if self.count == self.max_count.unwrap_or(u64::MAX) { - warn!("Reached max count. Halting behavior."); - return Some(MachineHalt); - } - } - } - None - } -} - -/// The token requester is responsible for requesting tokens from the token -/// admin. This agents is purely for testing purposes as far as I can tell. -#[derive(Debug)] -pub struct TokenRequester { - /// The tokens that the token requester has requested. - pub token_data: TokenData, - - /// The agent ID to request tokens to. - pub request_to: String, - - /// Client to have an address to receive token mint to and check balance - pub client: Arc, - - /// The messaging layer for the token requester. - pub messager: Messager, - - pub count: u64, - - pub max_count: Option, -} - -impl TokenRequester { - pub fn new( - client: Arc, - messager: Messager, - count: u64, - max_count: Option, - ) -> Self { - Self { - token_data: TokenData { - name: TOKEN_NAME.to_owned(), - symbol: TOKEN_SYMBOL.to_owned(), - decimals: TOKEN_DECIMALS, - address: None, - }, - request_to: TOKEN_ADMIN_ID.to_owned(), - client, - messager, - count, - max_count, - } - } -} - -#[async_trait::async_trait] -impl Behavior for TokenRequester { - #[tracing::instrument(skip(self), fields(id = -self.messager.id.as_deref()))] - async fn startup(&mut self) { - trace!("Requesting address of token: {:?}", self.token_data.name); - let message = Message { - from: self.messager.id.clone().unwrap(), - to: To::Agent(self.request_to.clone()), - data: serde_json::to_string(&TokenAdminQuery::AddressOf(self.token_data.name.clone())) - .unwrap(), - }; - self.messager.send(message).await; - } - - #[tracing::instrument(skip(self), fields(id = -self.messager.id.as_deref()))] - async fn process(&mut self, event: Message) -> Option { - if let Ok(token_data) = serde_json::from_str::(&event.data) { - trace!( - "Got -token data: {:?}", - token_data - ); - trace!( - "Requesting first mint of -token: {:?}", - self.token_data.name - ); - let message = Message { - from: self.messager.id.clone().unwrap(), - to: To::Agent(self.request_to.clone()), - data: serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { - token: self.token_data.name.clone(), - mint_to: self.client.address(), - mint_amount: 1, - })) - .unwrap(), - }; - self.messager.send(message).await; - } - Some(MachineHalt) - } -} - -#[async_trait::async_trait] -impl Behavior for TokenRequester { - #[tracing::instrument(skip(self), fields(id = -self.messager.id.as_deref()))] - async fn process(&mut self, event: arbiter_token::TransferFilter) -> Option { - trace!( - "Got event for -`TokenRequester` logger: {:?}", - event - ); - std::thread::sleep(std::time::Duration::from_secs(1)); - let message = Message { - from: self.messager.id.clone().unwrap(), - to: To::Agent(self.request_to.clone()), - data: serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { - token: self.token_data.name.clone(), - mint_to: self.client.address(), - mint_amount: 1, - })) - .unwrap(), - }; - self.messager.send(message).await; - self.count += 1; - if self.count == self.max_count.unwrap_or(u64::MAX) { - warn!("Reached max count. Halting behavior."); - return Some(MachineHalt); - } - None - } -} - -#[ignore] -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn token_minter_simulation() { - // std::env::set_var("RUST_LOG", "trace"); - // tracing_subscriber::fmt::init(); - - let mut world = World::new("test_world"); - - // Create the token admin agent - let token_admin = Agent::new(TOKEN_ADMIN_ID, &world); - let mut token_admin_behavior = TokenAdmin::new( - token_admin.client.clone(), - token_admin - .messager - .as_ref() - .unwrap() - .join_with_id(Some(TOKEN_ADMIN_ID.to_owned())), - 0, - Some(4), - ); - token_admin_behavior.add_token(TokenData { - name: TOKEN_NAME.to_owned(), - symbol: TOKEN_SYMBOL.to_owned(), - decimals: TOKEN_DECIMALS, - address: None, - }); - world.add_agent(token_admin.with_behavior(token_admin_behavior)); - - // Create the token requester agent - let token_requester = Agent::new(REQUESTER_ID, &world); - let token_requester_behavior = TokenRequester::new( - token_requester.client.clone(), - token_requester - .messager - .as_ref() - .unwrap() - .join_with_id(Some(REQUESTER_ID.to_owned())), - 0, - Some(4), - ); - let arb = ArbiterToken::new( - Address::from_str("0x240a76d4c8a7dafc6286db5fa6b589e8b21fc00f").unwrap(), - token_requester.client.clone(), - ); - let transfer_event = arb.transfer_filter(); - - let token_requester_behavior_again = TokenRequester::new( - token_requester.client.clone(), - token_requester - .messager - .as_ref() - .unwrap() - .join_with_id(Some(REQUESTER_ID.to_owned())), - 0, - Some(4), - ); - world.add_agent( - token_requester - .with_behavior::(token_requester_behavior) - .with_behavior::(token_requester_behavior_again) - .with_event(transfer_event), - ); - - let transfer_stream = EventLogger::builder() - .add_stream(arb.transfer_filter()) - .stream() - .unwrap(); - let mut stream = Box::pin(transfer_stream); - let mut idx = 0; - - world.run().await; - - loop { - match timeout(Duration::from_secs(1), stream.next()).await { - Ok(Some(event)) => { - println!("Event received in outside world: {:?}", event); - idx += 1; - if idx == 4 { - break; - } - } - _ => { - panic!("Timeout reached. Test failed."); - } - } - } -} +// use std::{str::FromStr, time::Duration}; + +// use anyhow::Context; +// use arbiter_bindings::bindings::arbiter_token; +// use arbiter_core::data_collection::EventLogger; +// use ethers::{ +// abi::token, +// types::{transaction::request, Filter}, +// }; +// use tokio::time::timeout; +// use tracing::error; + +// use self::machine::MachineHalt; +// use super::*; +// use crate::{ +// agent::Agent, +// machine::{Behavior, MachineInstruction, StateMachine}, +// messager::To, +// world::World, +// }; + +// const TOKEN_ADMIN_ID: &str = "token_admin"; +// const REQUESTER_ID: &str = "requester"; +// const TOKEN_NAME: &str = "Arbiter Token"; +// const TOKEN_SYMBOL: &str = "ARB"; +// const TOKEN_DECIMALS: u8 = 18; + +// /// The token admin is responsible for handling token minting requests. +// #[derive(Debug)] +// pub struct TokenAdmin { +// /// The identifier of the token admin. +// pub token_data: HashMap, + +// pub tokens: Option>>, + +// // TODO: We should not have to have a client or a messager put here +// // explicitly, they should come from the Agent the behavior is given to. +// pub client: Arc, +// pub messager: Messager, + +// count: u64, + +// max_count: Option, +// } + +// impl TokenAdmin { +// pub fn new( +// client: Arc, +// messager: Messager, +// count: u64, +// max_count: Option, +// ) -> Self { +// Self { +// token_data: HashMap::new(), +// tokens: None, +// client, +// messager, +// count, +// max_count, +// } +// } + +// /// Adds a token to the token admin. +// pub fn add_token(&mut self, token_data: TokenData) { +// self.token_data.insert(token_data.name.clone(), token_data); +// } +// } + +// #[derive(Clone, Debug, Deserialize, Serialize)] +// pub struct TokenData { +// pub name: String, +// pub symbol: String, +// pub decimals: u8, +// pub address: Option
, +// } + +// /// Used as an action to ask what tokens are available. +// #[derive(Clone, Debug, Deserialize, Serialize)] +// pub enum TokenAdminQuery { +// /// Get the address of the token. +// AddressOf(String), + +// /// Mint tokens. +// MintRequest(MintRequest), +// } + +// /// Used as an action to mint tokens. +// #[derive(Clone, Debug, Deserialize, Serialize)] +// pub struct MintRequest { +// /// The token to mint. +// pub token: String, + +// /// The address to mint to. +// pub mint_to: Address, + +// /// The amount to mint. +// pub mint_amount: u64, +// } + +// #[async_trait::async_trait] +// impl Behavior for TokenAdmin { +// #[tracing::instrument(skip(self), fields(id = +// self.messager.id.as_deref()))] +// async fn sync(&mut self) { +// for token_data in self.token_data.values_mut() { +// let token = ArbiterToken::deploy( +// self.client.clone(), +// ( +// token_data.name.clone(), +// token_data.symbol.clone(), +// token_data.decimals, +// ), +// ) +// .unwrap() +// .send() +// .await +// .unwrap(); +// token_data.address = Some(token.address()); +// self.tokens +// .get_or_insert_with(HashMap::new) +// .insert(token_data.name.clone(), token.clone()); +// debug!("Deployed token: {:?}", token); +// } +// } + +// #[tracing::instrument(skip(self), fields(id = +// self.messager.id.as_deref()))] +// async fn process(&mut self, event: Message) -> Option { +// if self.tokens.is_none() { +// error!( +// "There were no tokens to deploy! You must add tokens to +// the token admin before running the simulation." +// ); +// } + +// let query: TokenAdminQuery = +// serde_json::from_str(&event.data).unwrap(); trace!("Got query: {:?}", +// query); match query { +// TokenAdminQuery::AddressOf(token_name) => { +// trace!( +// "Getting address of token with name: {:?}", +// token_name.clone() +// ); +// let token_data = self.token_data.get(&token_name).unwrap(); +// let message = Message { +// from: self.messager.id.clone().unwrap(), +// to: To::Agent(event.from.clone()), // Reply back to +// sender data: serde_json::to_string(token_data).unwrap(), +// }; +// self.messager.send(message).await; +// } +// TokenAdminQuery::MintRequest(mint_request) => { +// trace!("Minting tokens: {:?}", mint_request); +// let token = self +// .tokens +// .as_ref() +// .unwrap() +// .get(&mint_request.token) +// .unwrap(); +// token +// .mint(mint_request.mint_to, +// U256::from(mint_request.mint_amount)) .send() +// .await +// .unwrap() +// .await +// .unwrap(); +// self.count += 1; +// if self.count == self.max_count.unwrap_or(u64::MAX) { +// warn!("Reached max count. Halting behavior."); +// return Some(MachineHalt); +// } +// } +// } +// None +// } +// } + +// /// The token requester is responsible for requesting tokens from the token +// /// admin. This agents is purely for testing purposes as far as I can tell. +// #[derive(Debug)] +// pub struct TokenRequester { +// /// The tokens that the token requester has requested. +// pub token_data: TokenData, + +// /// The agent ID to request tokens to. +// pub request_to: String, + +// /// Client to have an address to receive token mint to and check balance +// pub client: Arc, + +// /// The messaging layer for the token requester. +// pub messager: Messager, + +// pub count: u64, + +// pub max_count: Option, +// } + +// impl TokenRequester { +// pub fn new( +// client: Arc, +// messager: Messager, +// count: u64, +// max_count: Option, +// ) -> Self { +// Self { +// token_data: TokenData { +// name: TOKEN_NAME.to_owned(), +// symbol: TOKEN_SYMBOL.to_owned(), +// decimals: TOKEN_DECIMALS, +// address: None, +// }, +// request_to: TOKEN_ADMIN_ID.to_owned(), +// client, +// messager, +// count, +// max_count, +// } +// } +// } + +// #[async_trait::async_trait] +// impl Behavior for TokenRequester { +// #[tracing::instrument(skip(self), fields(id = +// self.messager.id.as_deref()))] +// async fn startup(&mut self) { +// trace!("Requesting address of token: {:?}", self.token_data.name); +// let message = Message { +// from: self.messager.id.clone().unwrap(), +// to: To::Agent(self.request_to.clone()), +// data: +// serde_json::to_string(&TokenAdminQuery::AddressOf(self.token_data.name. +// clone())) .unwrap(), +// }; +// self.messager.send(message).await; +// } + +// #[tracing::instrument(skip(self), fields(id = +// self.messager.id.as_deref()))] +// async fn process(&mut self, event: Message) -> Option { +// if let Ok(token_data) = +// serde_json::from_str::(&event.data) { trace!( +// "Got +// token data: {:?}", +// token_data +// ); +// trace!( +// "Requesting first mint of +// token: {:?}", +// self.token_data.name +// ); +// let message = Message { +// from: self.messager.id.clone().unwrap(), +// to: To::Agent(self.request_to.clone()), +// data: +// serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { +// token: self.token_data.name.clone(), mint_to: +// self.client.address(), mint_amount: 1, +// })) +// .unwrap(), +// }; +// self.messager.send(message).await; +// } +// Some(MachineHalt) +// } +// } + +// #[async_trait::async_trait] +// impl Behavior for TokenRequester { +// #[tracing::instrument(skip(self), fields(id = +// self.messager.id.as_deref()))] +// async fn process(&mut self, event: arbiter_token::TransferFilter) -> +// Option { trace!( +// "Got event for +// `TokenRequester` logger: {:?}", +// event +// ); +// std::thread::sleep(std::time::Duration::from_secs(1)); +// let message = Message { +// from: self.messager.id.clone().unwrap(), +// to: To::Agent(self.request_to.clone()), +// data: +// serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { +// token: self.token_data.name.clone(), mint_to: +// self.client.address(), mint_amount: 1, +// })) +// .unwrap(), +// }; +// self.messager.send(message).await; +// self.count += 1; +// if self.count == self.max_count.unwrap_or(u64::MAX) { +// warn!("Reached max count. Halting behavior."); +// return Some(MachineHalt); +// } +// None +// } +// } + +// #[ignore] +// #[tokio::test(flavor = "multi_thread", worker_threads = 4)] +// async fn token_minter_simulation() { +// // std::env::set_var("RUST_LOG", "trace"); +// // tracing_subscriber::fmt::init(); + +// let mut world = World::new("test_world"); + +// // Create the token admin agent +// let token_admin = Agent::new(TOKEN_ADMIN_ID, &world); +// let mut token_admin_behavior = TokenAdmin::new( +// token_admin.client.clone(), +// token_admin +// .messager +// .as_ref() +// .unwrap() +// .join_with_id(Some(TOKEN_ADMIN_ID.to_owned())), +// 0, +// Some(4), +// ); +// token_admin_behavior.add_token(TokenData { +// name: TOKEN_NAME.to_owned(), +// symbol: TOKEN_SYMBOL.to_owned(), +// decimals: TOKEN_DECIMALS, +// address: None, +// }); +// world.add_agent(token_admin.with_behavior(token_admin_behavior)); + +// // Create the token requester agent +// let token_requester = Agent::new(REQUESTER_ID, &world); +// let token_requester_behavior = TokenRequester::new( +// token_requester.client.clone(), +// token_requester +// .messager +// .as_ref() +// .unwrap() +// .join_with_id(Some(REQUESTER_ID.to_owned())), +// 0, +// Some(4), +// ); +// let arb = ArbiterToken::new( +// Address::from_str("0x240a76d4c8a7dafc6286db5fa6b589e8b21fc00f"). +// unwrap(), token_requester.client.clone(), +// ); +// let transfer_event = arb.transfer_filter(); + +// let token_requester_behavior_again = TokenRequester::new( +// token_requester.client.clone(), +// token_requester +// .messager +// .as_ref() +// .unwrap() +// .join_with_id(Some(REQUESTER_ID.to_owned())), +// 0, +// Some(4), +// ); +// world.add_agent( +// token_requester +// .with_behavior::(token_requester_behavior) +// +// .with_behavior::(token_requester_behavior_again) +// .with_event(transfer_event), +// ); + +// let transfer_stream = EventLogger::builder() +// .add_stream(arb.transfer_filter()) +// .stream() +// .unwrap(); +// let mut stream = Box::pin(transfer_stream); +// let mut idx = 0; + +// world.run().await; + +// loop { +// match timeout(Duration::from_secs(1), stream.next()).await { +// Ok(Some(event)) => { +// println!("Event received in outside world: {:?}", event); +// idx += 1; +// if idx == 4 { +// break; +// } +// } +// _ => { +// panic!("Timeout reached. Test failed."); +// } +// } +// } +// } diff --git a/arbiter-engine/src/machine.rs b/arbiter-engine/src/machine.rs index 55897dfa..2b9383c1 100644 --- a/arbiter-engine/src/machine.rs +++ b/arbiter-engine/src/machine.rs @@ -6,6 +6,8 @@ // and messager and then the user can decide if it wants to use those in their // behavior. +// Could typestate pattern help here at all? + use std::{fmt::Debug, sync::Arc}; use arbiter_core::middleware::RevmMiddleware; @@ -146,12 +148,12 @@ where { async fn execute(&mut self, instruction: MachineInstruction) { match instruction { - MachineInstruction::Sync => { + MachineInstruction::Sync(messager, client) => { trace!("Behavior is syncing."); self.state = State::Syncing; let mut behavior = self.behavior.take().unwrap(); let behavior_task = tokio::spawn(async move { - behavior.sync(messager, client).await; + behavior.sync(messager.unwrap(), client.unwrap()).await; behavior }); self.behavior = Some(behavior_task.await.unwrap()); diff --git a/arbiter-engine/src/messager.rs b/arbiter-engine/src/messager.rs index 290fd6f3..041b7ccd 100644 --- a/arbiter-engine/src/messager.rs +++ b/arbiter-engine/src/messager.rs @@ -40,6 +40,16 @@ pub struct Messager { broadcast_receiver: Option>, } +impl Clone for Messager { + fn clone(&self) -> Self { + Self { + broadcast_sender: self.broadcast_sender.clone(), + broadcast_receiver: Some(self.broadcast_sender.subscribe()), + id: self.id.clone(), + } + } +} + impl Messager { // TODO: Allow for modulating the capacity of the messager. // TODO: It might be nice to have some kind of messaging header so that we can diff --git a/arbiter-engine/src/world.rs b/arbiter-engine/src/world.rs index 13b65d3a..41ad218b 100644 --- a/arbiter-engine/src/world.rs +++ b/arbiter-engine/src/world.rs @@ -107,7 +107,7 @@ impl World { /// Runs the world through up to the [`State::Processing`] stage. pub async fn run(&mut self) { - self.execute(MachineInstruction::Sync).await; + self.execute(MachineInstruction::Sync(None, None)).await; self.execute(MachineInstruction::Start).await; self.execute(MachineInstruction::Process).await; } @@ -137,13 +137,14 @@ impl World { impl StateMachine for World { async fn execute(&mut self, instruction: MachineInstruction) { match instruction { - MachineInstruction::Sync => { + MachineInstruction::Sync(_, _) => { info!("World is syncing."); self.state = State::Syncing; let agents = self.agents.take().unwrap(); let agent_tasks = join_all(agents.into_values().map(|mut agent| { + let instruction_clone = instruction.clone(); tokio::spawn(async move { - agent.execute(instruction).await; + agent.execute(instruction_clone).await; agent }) })); @@ -163,8 +164,9 @@ impl StateMachine for World { self.state = State::Starting; let agents = self.agents.take().unwrap(); let agent_tasks = join_all(agents.into_values().map(|mut agent| { + let instruction_clone = instruction.clone(); tokio::spawn(async move { - agent.execute(instruction).await; + agent.execute(instruction_clone).await; agent }) })); @@ -186,8 +188,9 @@ impl StateMachine for World { let mut agent_distributors = vec![]; let agent_processors = join_all(agents.into_values().map(|mut agent| { agent_distributors.push(agent.distributor.0.clone()); + let instruction_clone = instruction.clone(); tokio::spawn(async move { - agent.execute(instruction).await; + agent.execute(instruction_clone).await; agent }) })); From 32da94428362235de942ac2c2d6290fc5e3c3f9d Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Fri, 26 Jan 2024 13:21:19 -0700 Subject: [PATCH 3/4] fix: tests --- arbiter-engine/src/examples/timed_message.rs | 9 - arbiter-engine/src/examples/token_minter.rs | 764 +++++++++---------- arbiter-engine/src/machine.rs | 5 +- 3 files changed, 382 insertions(+), 396 deletions(-) diff --git a/arbiter-engine/src/examples/timed_message.rs b/arbiter-engine/src/examples/timed_message.rs index 71315e0a..df76c0c8 100644 --- a/arbiter-engine/src/examples/timed_message.rs +++ b/arbiter-engine/src/examples/timed_message.rs @@ -83,9 +83,6 @@ impl Behavior for TimedMessage { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn echoer() { - std::env::set_var("RUST_LOG", "trace"); - tracing_subscriber::fmt::init(); - let mut world = World::new("world"); let agent = Agent::new(AGENT_ID, &world); @@ -129,9 +126,6 @@ async fn echoer() { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn ping_pong() { - // std::env::set_var("RUST_LOG", "trace"); - // tracing_subscriber::fmt::init(); - let mut world = World::new("world"); let agent = Agent::new(AGENT_ID, &world); @@ -176,9 +170,6 @@ async fn ping_pong() { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn ping_pong_two_agent() { - // std::env::set_var("RUST_LOG", "trace"); - // tracing_subscriber::fmt::init(); - let mut world = World::new("world"); let agent_ping = Agent::new("agent_ping", &world); diff --git a/arbiter-engine/src/examples/token_minter.rs b/arbiter-engine/src/examples/token_minter.rs index c0951878..230bacae 100644 --- a/arbiter-engine/src/examples/token_minter.rs +++ b/arbiter-engine/src/examples/token_minter.rs @@ -1,386 +1,378 @@ -// use std::{str::FromStr, time::Duration}; - -// use anyhow::Context; -// use arbiter_bindings::bindings::arbiter_token; -// use arbiter_core::data_collection::EventLogger; -// use ethers::{ -// abi::token, -// types::{transaction::request, Filter}, -// }; -// use tokio::time::timeout; -// use tracing::error; - -// use self::machine::MachineHalt; -// use super::*; -// use crate::{ -// agent::Agent, -// machine::{Behavior, MachineInstruction, StateMachine}, -// messager::To, -// world::World, -// }; - -// const TOKEN_ADMIN_ID: &str = "token_admin"; -// const REQUESTER_ID: &str = "requester"; -// const TOKEN_NAME: &str = "Arbiter Token"; -// const TOKEN_SYMBOL: &str = "ARB"; -// const TOKEN_DECIMALS: u8 = 18; - -// /// The token admin is responsible for handling token minting requests. -// #[derive(Debug)] -// pub struct TokenAdmin { -// /// The identifier of the token admin. -// pub token_data: HashMap, - -// pub tokens: Option>>, - -// // TODO: We should not have to have a client or a messager put here -// // explicitly, they should come from the Agent the behavior is given to. -// pub client: Arc, -// pub messager: Messager, - -// count: u64, - -// max_count: Option, -// } - -// impl TokenAdmin { -// pub fn new( -// client: Arc, -// messager: Messager, -// count: u64, -// max_count: Option, -// ) -> Self { -// Self { -// token_data: HashMap::new(), -// tokens: None, -// client, -// messager, -// count, -// max_count, -// } -// } - -// /// Adds a token to the token admin. -// pub fn add_token(&mut self, token_data: TokenData) { -// self.token_data.insert(token_data.name.clone(), token_data); -// } -// } - -// #[derive(Clone, Debug, Deserialize, Serialize)] -// pub struct TokenData { -// pub name: String, -// pub symbol: String, -// pub decimals: u8, -// pub address: Option
, -// } - -// /// Used as an action to ask what tokens are available. -// #[derive(Clone, Debug, Deserialize, Serialize)] -// pub enum TokenAdminQuery { -// /// Get the address of the token. -// AddressOf(String), - -// /// Mint tokens. -// MintRequest(MintRequest), -// } - -// /// Used as an action to mint tokens. -// #[derive(Clone, Debug, Deserialize, Serialize)] -// pub struct MintRequest { -// /// The token to mint. -// pub token: String, - -// /// The address to mint to. -// pub mint_to: Address, - -// /// The amount to mint. -// pub mint_amount: u64, -// } - -// #[async_trait::async_trait] -// impl Behavior for TokenAdmin { -// #[tracing::instrument(skip(self), fields(id = -// self.messager.id.as_deref()))] -// async fn sync(&mut self) { -// for token_data in self.token_data.values_mut() { -// let token = ArbiterToken::deploy( -// self.client.clone(), -// ( -// token_data.name.clone(), -// token_data.symbol.clone(), -// token_data.decimals, -// ), -// ) -// .unwrap() -// .send() -// .await -// .unwrap(); -// token_data.address = Some(token.address()); -// self.tokens -// .get_or_insert_with(HashMap::new) -// .insert(token_data.name.clone(), token.clone()); -// debug!("Deployed token: {:?}", token); -// } -// } - -// #[tracing::instrument(skip(self), fields(id = -// self.messager.id.as_deref()))] -// async fn process(&mut self, event: Message) -> Option { -// if self.tokens.is_none() { -// error!( -// "There were no tokens to deploy! You must add tokens to -// the token admin before running the simulation." -// ); -// } - -// let query: TokenAdminQuery = -// serde_json::from_str(&event.data).unwrap(); trace!("Got query: {:?}", -// query); match query { -// TokenAdminQuery::AddressOf(token_name) => { -// trace!( -// "Getting address of token with name: {:?}", -// token_name.clone() -// ); -// let token_data = self.token_data.get(&token_name).unwrap(); -// let message = Message { -// from: self.messager.id.clone().unwrap(), -// to: To::Agent(event.from.clone()), // Reply back to -// sender data: serde_json::to_string(token_data).unwrap(), -// }; -// self.messager.send(message).await; -// } -// TokenAdminQuery::MintRequest(mint_request) => { -// trace!("Minting tokens: {:?}", mint_request); -// let token = self -// .tokens -// .as_ref() -// .unwrap() -// .get(&mint_request.token) -// .unwrap(); -// token -// .mint(mint_request.mint_to, -// U256::from(mint_request.mint_amount)) .send() -// .await -// .unwrap() -// .await -// .unwrap(); -// self.count += 1; -// if self.count == self.max_count.unwrap_or(u64::MAX) { -// warn!("Reached max count. Halting behavior."); -// return Some(MachineHalt); -// } -// } -// } -// None -// } -// } - -// /// The token requester is responsible for requesting tokens from the token -// /// admin. This agents is purely for testing purposes as far as I can tell. -// #[derive(Debug)] -// pub struct TokenRequester { -// /// The tokens that the token requester has requested. -// pub token_data: TokenData, - -// /// The agent ID to request tokens to. -// pub request_to: String, - -// /// Client to have an address to receive token mint to and check balance -// pub client: Arc, - -// /// The messaging layer for the token requester. -// pub messager: Messager, - -// pub count: u64, - -// pub max_count: Option, -// } - -// impl TokenRequester { -// pub fn new( -// client: Arc, -// messager: Messager, -// count: u64, -// max_count: Option, -// ) -> Self { -// Self { -// token_data: TokenData { -// name: TOKEN_NAME.to_owned(), -// symbol: TOKEN_SYMBOL.to_owned(), -// decimals: TOKEN_DECIMALS, -// address: None, -// }, -// request_to: TOKEN_ADMIN_ID.to_owned(), -// client, -// messager, -// count, -// max_count, -// } -// } -// } - -// #[async_trait::async_trait] -// impl Behavior for TokenRequester { -// #[tracing::instrument(skip(self), fields(id = -// self.messager.id.as_deref()))] -// async fn startup(&mut self) { -// trace!("Requesting address of token: {:?}", self.token_data.name); -// let message = Message { -// from: self.messager.id.clone().unwrap(), -// to: To::Agent(self.request_to.clone()), -// data: -// serde_json::to_string(&TokenAdminQuery::AddressOf(self.token_data.name. -// clone())) .unwrap(), -// }; -// self.messager.send(message).await; -// } - -// #[tracing::instrument(skip(self), fields(id = -// self.messager.id.as_deref()))] -// async fn process(&mut self, event: Message) -> Option { -// if let Ok(token_data) = -// serde_json::from_str::(&event.data) { trace!( -// "Got -// token data: {:?}", -// token_data -// ); -// trace!( -// "Requesting first mint of -// token: {:?}", -// self.token_data.name -// ); -// let message = Message { -// from: self.messager.id.clone().unwrap(), -// to: To::Agent(self.request_to.clone()), -// data: -// serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { -// token: self.token_data.name.clone(), mint_to: -// self.client.address(), mint_amount: 1, -// })) -// .unwrap(), -// }; -// self.messager.send(message).await; -// } -// Some(MachineHalt) -// } -// } - -// #[async_trait::async_trait] -// impl Behavior for TokenRequester { -// #[tracing::instrument(skip(self), fields(id = -// self.messager.id.as_deref()))] -// async fn process(&mut self, event: arbiter_token::TransferFilter) -> -// Option { trace!( -// "Got event for -// `TokenRequester` logger: {:?}", -// event -// ); -// std::thread::sleep(std::time::Duration::from_secs(1)); -// let message = Message { -// from: self.messager.id.clone().unwrap(), -// to: To::Agent(self.request_to.clone()), -// data: -// serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { -// token: self.token_data.name.clone(), mint_to: -// self.client.address(), mint_amount: 1, -// })) -// .unwrap(), -// }; -// self.messager.send(message).await; -// self.count += 1; -// if self.count == self.max_count.unwrap_or(u64::MAX) { -// warn!("Reached max count. Halting behavior."); -// return Some(MachineHalt); -// } -// None -// } -// } - -// #[ignore] -// #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -// async fn token_minter_simulation() { -// // std::env::set_var("RUST_LOG", "trace"); -// // tracing_subscriber::fmt::init(); - -// let mut world = World::new("test_world"); - -// // Create the token admin agent -// let token_admin = Agent::new(TOKEN_ADMIN_ID, &world); -// let mut token_admin_behavior = TokenAdmin::new( -// token_admin.client.clone(), -// token_admin -// .messager -// .as_ref() -// .unwrap() -// .join_with_id(Some(TOKEN_ADMIN_ID.to_owned())), -// 0, -// Some(4), -// ); -// token_admin_behavior.add_token(TokenData { -// name: TOKEN_NAME.to_owned(), -// symbol: TOKEN_SYMBOL.to_owned(), -// decimals: TOKEN_DECIMALS, -// address: None, -// }); -// world.add_agent(token_admin.with_behavior(token_admin_behavior)); - -// // Create the token requester agent -// let token_requester = Agent::new(REQUESTER_ID, &world); -// let token_requester_behavior = TokenRequester::new( -// token_requester.client.clone(), -// token_requester -// .messager -// .as_ref() -// .unwrap() -// .join_with_id(Some(REQUESTER_ID.to_owned())), -// 0, -// Some(4), -// ); -// let arb = ArbiterToken::new( -// Address::from_str("0x240a76d4c8a7dafc6286db5fa6b589e8b21fc00f"). -// unwrap(), token_requester.client.clone(), -// ); -// let transfer_event = arb.transfer_filter(); - -// let token_requester_behavior_again = TokenRequester::new( -// token_requester.client.clone(), -// token_requester -// .messager -// .as_ref() -// .unwrap() -// .join_with_id(Some(REQUESTER_ID.to_owned())), -// 0, -// Some(4), -// ); -// world.add_agent( -// token_requester -// .with_behavior::(token_requester_behavior) -// -// .with_behavior::(token_requester_behavior_again) -// .with_event(transfer_event), -// ); - -// let transfer_stream = EventLogger::builder() -// .add_stream(arb.transfer_filter()) -// .stream() -// .unwrap(); -// let mut stream = Box::pin(transfer_stream); -// let mut idx = 0; - -// world.run().await; - -// loop { -// match timeout(Duration::from_secs(1), stream.next()).await { -// Ok(Some(event)) => { -// println!("Event received in outside world: {:?}", event); -// idx += 1; -// if idx == 4 { -// break; -// } -// } -// _ => { -// panic!("Timeout reached. Test failed."); -// } -// } -// } -// } +use std::{str::FromStr, time::Duration}; + +use anyhow::Context; +use arbiter_bindings::bindings::arbiter_token; +use arbiter_core::data_collection::EventLogger; +use ethers::{ + abi::token, + types::{transaction::request, Filter}, +}; +use tokio::time::timeout; +use tracing::error; + +use self::machine::MachineHalt; +use super::*; +use crate::{ + agent::Agent, + machine::{Behavior, MachineInstruction, StateMachine}, + messager::To, + world::World, +}; + +const TOKEN_ADMIN_ID: &str = "token_admin"; +const REQUESTER_ID: &str = "requester"; +const TOKEN_NAME: &str = "Arbiter Token"; +const TOKEN_SYMBOL: &str = "ARB"; +const TOKEN_DECIMALS: u8 = 18; + +/// The token admin is responsible for handling token minting requests. +#[derive(Debug)] +pub struct TokenAdmin { + /// The identifier of the token admin. + pub token_data: HashMap, + + pub tokens: Option>>, + + pub client: Option>, + + pub messager: Option, + + count: u64, + + max_count: Option, +} + +impl TokenAdmin { + pub fn new(count: u64, max_count: Option) -> Self { + Self { + token_data: HashMap::new(), + tokens: None, + client: None, + messager: None, + count, + max_count, + } + } + + /// Adds a token to the token admin. + pub fn add_token(&mut self, token_data: TokenData) { + self.token_data.insert(token_data.name.clone(), token_data); + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct TokenData { + pub name: String, + pub symbol: String, + pub decimals: u8, + pub address: Option
, +} + +/// Used as an action to ask what tokens are available. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum TokenAdminQuery { + /// Get the address of the token. + AddressOf(String), + + /// Mint tokens. + MintRequest(MintRequest), +} + +/// Used as an action to mint tokens. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct MintRequest { + /// The token to mint. + pub token: String, + + /// The address to mint to. + pub mint_to: Address, + + /// The amount to mint. + pub mint_amount: u64, +} + +#[async_trait::async_trait] +impl Behavior for TokenAdmin { + #[tracing::instrument(skip(self), fields(id = messager.id.as_deref()))] + async fn sync(&mut self, messager: Messager, client: Arc) { + for token_data in self.token_data.values_mut() { + let token = ArbiterToken::deploy( + client.clone(), + ( + token_data.name.clone(), + token_data.symbol.clone(), + token_data.decimals, + ), + ) + .unwrap() + .send() + .await + .unwrap(); + token_data.address = Some(token.address()); + self.tokens + .get_or_insert_with(HashMap::new) + .insert(token_data.name.clone(), token.clone()); + debug!("Deployed token: {:?}", token); + } + self.messager = Some(messager); + self.client = Some(client); + } + + #[tracing::instrument(skip(self), fields(id = self.messager.as_ref().unwrap().id.as_deref()))] + async fn process(&mut self, event: Message) -> Option { + if self.tokens.is_none() { + error!( + "There were no tokens to deploy! You must add tokens to +the token admin before running the simulation." + ); + } + + let query: TokenAdminQuery = serde_json::from_str(&event.data).unwrap(); + trace!("Got query: {:?}", query); + let messager = self.messager.as_ref().unwrap(); + match query { + TokenAdminQuery::AddressOf(token_name) => { + trace!( + "Getting address of token with name: {:?}", + token_name.clone() + ); + let token_data = self.token_data.get(&token_name).unwrap(); + let message = Message { + from: messager.id.clone().unwrap(), + to: To::Agent(event.from.clone()), // Reply back to sender + data: serde_json::to_string(token_data).unwrap(), + }; + messager.send(message).await; + } + TokenAdminQuery::MintRequest(mint_request) => { + trace!("Minting tokens: {:?}", mint_request); + let token = self + .tokens + .as_ref() + .unwrap() + .get(&mint_request.token) + .unwrap(); + token + .mint(mint_request.mint_to, U256::from(mint_request.mint_amount)) + .send() + .await + .unwrap() + .await + .unwrap(); + self.count += 1; + if self.count == self.max_count.unwrap_or(u64::MAX) { + warn!("Reached max count. Halting behavior."); + return Some(MachineHalt); + } + } + } + None + } +} + +/// The token requester is responsible for requesting tokens from the token +/// admin. This agents is purely for testing purposes as far as I can tell. +#[derive(Debug)] +pub struct TokenRequester { + /// The tokens that the token requester has requested. + pub token_data: TokenData, + + /// The agent ID to request tokens to. + pub request_to: String, + + /// Client to have an address to receive token mint to and check balance + pub client: Option>, + + /// The messaging layer for the token requester. + pub messager: Option, + + pub count: u64, + + pub max_count: Option, +} + +impl TokenRequester { + pub fn new( + client: Arc, + messager: Messager, + count: u64, + max_count: Option, + ) -> Self { + Self { + token_data: TokenData { + name: TOKEN_NAME.to_owned(), + symbol: TOKEN_SYMBOL.to_owned(), + decimals: TOKEN_DECIMALS, + address: None, + }, + request_to: TOKEN_ADMIN_ID.to_owned(), + client: None, + messager: None, + count, + max_count, + } + } +} + +#[async_trait::async_trait] +impl Behavior for TokenRequester { + #[tracing::instrument(skip(self), fields(id = messager.id.as_deref()))] + async fn sync(&mut self, messager: Messager, client: Arc) { + self.messager = Some(messager); + self.client = Some(client); + } + + #[tracing::instrument(skip(self), fields(id = self.messager.as_ref().unwrap().id.as_deref()))] + async fn startup(&mut self) { + let messager = self.messager.as_ref().unwrap(); + trace!("Requesting address of token: {:?}", self.token_data.name); + let message = Message { + from: messager.id.clone().unwrap(), + to: To::Agent(self.request_to.clone()), + data: serde_json::to_string(&TokenAdminQuery::AddressOf(self.token_data.name.clone())) + .unwrap(), + }; + messager.send(message).await; + } + + #[tracing::instrument(skip(self), fields(id = self.messager.as_ref().unwrap().id.as_deref()))] + async fn process(&mut self, event: Message) -> Option { + if let Ok(token_data) = serde_json::from_str::(&event.data) { + let messager = self.messager.as_ref().unwrap(); + trace!( + "Got +token data: {:?}", + token_data + ); + trace!( + "Requesting first mint of +token: {:?}", + self.token_data.name + ); + let message = Message { + from: messager.id.clone().unwrap(), + to: To::Agent(self.request_to.clone()), + data: serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { + token: self.token_data.name.clone(), + mint_to: self.client.as_ref().unwrap().address(), + mint_amount: 1, + })) + .unwrap(), + }; + messager.send(message).await; + } + Some(MachineHalt) + } +} + +#[async_trait::async_trait] +impl Behavior for TokenRequester { + async fn sync(&mut self, messager: Messager, client: Arc) { + self.client = Some(client); + self.messager = Some(messager); + } + + #[tracing::instrument(skip(self), fields(id = self.messager.as_ref().unwrap().id.as_deref()))] + async fn process(&mut self, event: arbiter_token::TransferFilter) -> Option { + let messager = self.messager.as_ref().unwrap(); + trace!( + "Got event for +`TokenRequester` logger: {:?}", + event + ); + std::thread::sleep(std::time::Duration::from_secs(1)); + let message = Message { + from: messager.id.clone().unwrap(), + to: To::Agent(self.request_to.clone()), + data: serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { + token: self.token_data.name.clone(), + mint_to: self.client.as_ref().unwrap().address(), + mint_amount: 1, + })) + .unwrap(), + }; + messager.send(message).await; + self.count += 1; + if self.count == self.max_count.unwrap_or(u64::MAX) { + warn!("Reached max count. Halting behavior."); + return Some(MachineHalt); + } + None + } +} + +#[ignore] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn token_minter_simulation() { + let mut world = World::new("test_world"); + + // Create the token admin agent + let token_admin = Agent::new(TOKEN_ADMIN_ID, &world); + let mut token_admin_behavior = TokenAdmin::new(0, Some(4)); + token_admin_behavior.add_token(TokenData { + name: TOKEN_NAME.to_owned(), + symbol: TOKEN_SYMBOL.to_owned(), + decimals: TOKEN_DECIMALS, + address: None, + }); + world.add_agent(token_admin.with_behavior(token_admin_behavior)); + + // Create the token requester agent + let token_requester = Agent::new(REQUESTER_ID, &world); + let token_requester_behavior = TokenRequester::new( + token_requester.client.clone(), + token_requester + .messager + .as_ref() + .unwrap() + .join_with_id(Some(REQUESTER_ID.to_owned())), + 0, + Some(4), + ); + let arb = ArbiterToken::new( + Address::from_str("0x240a76d4c8a7dafc6286db5fa6b589e8b21fc00f").unwrap(), + token_requester.client.clone(), + ); + let transfer_event = arb.transfer_filter(); + + let token_requester_behavior_again = TokenRequester::new( + token_requester.client.clone(), + token_requester + .messager + .as_ref() + .unwrap() + .join_with_id(Some(REQUESTER_ID.to_owned())), + 0, + Some(4), + ); + world.add_agent( + token_requester + .with_behavior::(token_requester_behavior) + .with_behavior::(token_requester_behavior_again) + .with_event(transfer_event), + ); + + let transfer_stream = EventLogger::builder() + .add_stream(arb.transfer_filter()) + .stream() + .unwrap(); + let mut stream = Box::pin(transfer_stream); + let mut idx = 0; + + world.run().await; + + loop { + match timeout(Duration::from_secs(1), stream.next()).await { + Ok(Some(event)) => { + println!("Event received in outside world: {:?}", event); + idx += 1; + if idx == 4 { + break; + } + } + _ => { + panic!("Timeout reached. Test failed."); + } + } + } +} diff --git a/arbiter-engine/src/machine.rs b/arbiter-engine/src/machine.rs index 2b9383c1..18ae5cc5 100644 --- a/arbiter-engine/src/machine.rs +++ b/arbiter-engine/src/machine.rs @@ -6,7 +6,10 @@ // and messager and then the user can decide if it wants to use those in their // behavior. -// Could typestate pattern help here at all? +// Could typestate pattern help here at all? Sync could produce a `Synced` state +// behavior that can then not have options for client and messager. Then the +// user can decide if they want to use those in their behavior and get a bit +// simpler UX. use std::{fmt::Debug, sync::Arc}; From d31821a8807862415b8f45b319b759ef2313a9a7 Mon Sep 17 00:00:00 2001 From: Colin Roberts Date: Fri, 26 Jan 2024 15:41:40 -0700 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=A7=BD=20remove=20lint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- arbiter-engine/src/machine.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arbiter-engine/src/machine.rs b/arbiter-engine/src/machine.rs index 18ae5cc5..856f42c9 100644 --- a/arbiter-engine/src/machine.rs +++ b/arbiter-engine/src/machine.rs @@ -83,7 +83,7 @@ pub enum State { pub trait Behavior: Send + Sync + 'static { /// Used to bring the agent back up to date with the latest state of the /// world. This could be used if the world was stopped and later restarted. - async fn sync(&mut self, messager: Messager, client: Arc) {} + async fn sync(&mut self, _messager: Messager, _client: Arc) {} /// Used to start the agent. /// This is where the agent can engage in its specific start up activities