diff --git a/Cargo.lock b/Cargo.lock index f4a0d37ff..3499b5cd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -183,9 +183,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" +checksum = "2faccea4cc4ab4a667ce676a30e8ec13922a692c99bb8f5b11f1502c72e04220" [[package]] name = "anstyle-parse" @@ -314,6 +314,7 @@ dependencies = [ "futures-util", "serde", "serde_json", + "thiserror", "tokio", "tokio-stream", "tracing", @@ -586,14 +587,13 @@ dependencies = [ [[package]] name = "auto_impl" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fee3da8ef1276b0bee5dd1c7258010d8fffd31801447323115a25560e1327b89" +checksum = "823b8bb275161044e2ac7a25879cb3e2480cb403e3943022c7c769c599b756aa" dependencies = [ - "proc-macro-error", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.43", ] [[package]] @@ -772,7 +772,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c48f0051a4b4c5e0b6d365cd04af53aeaa209e3cc15ec2cdb69e73cc87fbd0dc" dependencies = [ "memchr", - "regex-automata 0.4.3", + "regex-automata 0.4.5", "serde", ] @@ -802,9 +802,9 @@ checksum = "e1e5f035d16fc623ae5f74981db80a439803888314e3a555fd6f04acd51a3205" [[package]] name = "bytemuck" -version = "1.14.0" +version = "1.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "374d28ec25809ee0e23827c2ab573d729e293f281dfe393500e7ad618baa61c6" +checksum = "ed2490600f404f2b94c167e31d3ed1d5f3c225a0f3b80230053b3e0b7b962bd9" dependencies = [ "bytemuck_derive", ] @@ -1892,9 +1892,9 @@ checksum = "b90ca2580b73ab6a1f724b76ca11ab632df820fd6040c336200d2c1df7b3c82c" [[package]] name = "eyre" -version = "0.6.11" +version = "0.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6267a1fa6f59179ea4afc8e50fd8612a3cc60bc858f786ff877a4a8cb042799" +checksum = "7cd915d99f24784cdc19fd37ef22b97e3ff0ae756c7e492e9fbfe897d61e2aec" dependencies = [ "indenter", "once_cell", @@ -2223,7 +2223,7 @@ dependencies = [ "aho-corasick", "bstr", "log", - "regex-automata 0.4.3", + "regex-automata 0.4.5", "regex-syntax 0.8.2", ] @@ -2514,9 +2514,9 @@ checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" [[package]] name = "indexmap" -version = "2.1.0" +version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" +checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -2772,9 +2772,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.152" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libloading" @@ -3056,6 +3056,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.45" @@ -3470,18 +3476,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" dependencies = [ "proc-macro2", "quote", @@ -3976,30 +3982,6 @@ dependencies = [ "toml_edit 0.21.1", ] -[[package]] -name = "proc-macro-error" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn 1.0.109", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" -dependencies = [ - "proc-macro2", - "quote", - "version_check", -] - [[package]] name = "proc-macro2" version = "1.0.78" @@ -4044,11 +4026,11 @@ dependencies = [ [[package]] name = "pulldown-cmark" -version = "0.9.3" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998" +checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.4.2", "memchr", "unicase", ] @@ -4191,13 +4173,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.2" +version = "1.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" +checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.3", + "regex-automata 0.4.5", "regex-syntax 0.8.2", ] @@ -4212,9 +4194,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" dependencies = [ "aho-corasick", "memchr", @@ -4241,9 +4223,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" -version = "0.11.23" +version = "0.11.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" +checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" dependencies = [ "base64 0.21.7", "bytes", @@ -4267,6 +4249,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "sync_wrapper", "system-configuration", "tokio", "tokio-rustls", @@ -4514,9 +4497,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.30" +version = "0.38.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" +checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" dependencies = [ "bitflags 2.4.2", "errno", @@ -4675,9 +4658,9 @@ dependencies = [ [[package]] name = "secp256k1" -version = "0.28.1" +version = "0.28.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f622567e3b4b38154fb8190bcf6b160d7a4301d70595a49195b48c116007a27" +checksum = "d24b59d129cdadea20aea4fb2352fa053712e5d713eee47d700cd4b2bc002f10" dependencies = [ "secp256k1-sys", ] @@ -5134,9 +5117,9 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "svm-rs" -version = "0.3.3" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20689c7d03b6461b502d0b95d6c24874c7d24dea2688af80486a130a06af3b07" +checksum = "11297baafe5fa0c99d5722458eac6a5e25c01eb1b8e5cd137f54079093daa7a4" dependencies = [ "dirs", "fs2", @@ -5187,6 +5170,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "sysinfo" version = "0.30.5" @@ -5326,12 +5315,13 @@ dependencies = [ [[package]] name = "time" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" +checksum = "fe80ced77cbfb4cb91a94bf72b378b4b6791a0d9b7f09d0be747d1bdff4e68bd" dependencies = [ "deranged", "itoa", + "num-conv", "powerfmt", "serde", "time-core", @@ -5346,10 +5336,11 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26197e33420244aeb70c3e8c78376ca46571bc4e701e4791c2cd9f57dcb3a43f" +checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" dependencies = [ + "num-conv", "time-core", ] @@ -5695,9 +5686,9 @@ checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" [[package]] name = "uncased" -version = "0.9.9" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b9bc53168a4be7402ab86c3aad243a84dd7381d09be0eddc81280c1da95ca68" +checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697" dependencies = [ "version_check", ] @@ -5797,9 +5788,9 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" [[package]] name = "value-trait" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea87257cfcbedcb9444eda79c59fdfea71217e6305afee8ee33f500375c2ac97" +checksum = "dad8db98c1e677797df21ba03fca7d3bf9bec3ca38db930954e4fe6e1ea27eb4" dependencies = [ "float-cmp", "halfbrown", @@ -5943,9 +5934,9 @@ dependencies = [ [[package]] name = "wide" -version = "0.7.13" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c68938b57b33da363195412cfc5fc37c9ed49aa9cfe2156fde64b8d2c9498242" +checksum = "89beec544f246e679fc25490e3f8e08003bc4bf612068f325120dad4cea02c1c" dependencies = [ "bytemuck", "safe_arch", @@ -6135,9 +6126,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.34" +version = "0.5.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7cf47b659b318dccbd69cc4797a39ae128f533dce7902a1096044d1967b9c16" +checksum = "818ce546a11a9986bc24f93d0cdf38a8a1a400f1473ea8c82e59f6e0ffab9249" dependencies = [ "memchr", ] diff --git a/arbiter-core/src/database.rs b/arbiter-core/src/database.rs index 423e6f428..812d970f1 100644 --- a/arbiter-core/src/database.rs +++ b/arbiter-core/src/database.rs @@ -227,7 +227,7 @@ mod tests { assert_eq!(account_a.info.nonce, 1234); assert_eq!(account_a.info.balance, U256::from(0xfacade)); assert_eq!(account_a.info.code, None); - assert_eq!(account_a.info.code_hash, keccak256(&[])); + assert_eq!(account_a.info.code_hash, keccak256([])); let account_b = db .load_account(address!("0000000000000000000000000000000000000001")) diff --git a/arbiter-engine/Cargo.toml b/arbiter-engine/Cargo.toml index fef73219b..57d14be30 100644 --- a/arbiter-engine/Cargo.toml +++ b/arbiter-engine/Cargo.toml @@ -26,6 +26,7 @@ futures = "0.3.30" crossbeam-channel.workspace = true arbiter-core.workspace = true arbiter-bindings.workspace = true +thiserror.workspace = true [dev-dependencies] arbiter-core.workspace = true diff --git a/arbiter-engine/src/agent.rs b/arbiter-engine/src/agent.rs index 349c38163..ad31e77a3 100644 --- a/arbiter-engine/src/agent.rs +++ b/arbiter-engine/src/agent.rs @@ -1,82 +1,26 @@ -// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -// TODO: Notes ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -// * Maybe we just use tokio for everything (like `select`) so that we don't mix -// futures and tokio together in ways that may be weird. -// When we start running an agent, we should have their messager start producing -// events that can be used by any and all behaviors the agent has that takes in -// messages as an event. Similarly, we should have agents start up any streams -// listeners that they need so those can also produce events. Those can then be -// piped into the behaviors that need them. Can perhaps make behaviors come from -// very specific events (e.g., specific contract events). This means each -// behavior should be a consumer and perhaps the agent itself is the producer -// (or at least relayer). -// This means we should give agents some way to "start streams" that they can -// then use to produce events. -// Behaviors definitely need to be able to reference the agent's client and -// messager so that they can send messages and interact with the blockchain. -// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - //! The agent module contains the core agent abstraction for the Arbiter Engine. -use std::{fmt::Debug, pin::Pin, sync::Arc}; +use std::{fmt::Debug, sync::Arc}; -use arbiter_core::{data_collection::EventLogger, middleware::RevmMiddleware}; -use ethers::contract::{EthLogDecode, Event}; -use futures::stream::{Stream, StreamExt}; -use futures_util::future::join_all; +use arbiter_core::middleware::RevmMiddleware; use serde::de::DeserializeOwned; -use tokio::{ - sync::broadcast::{channel, Receiver as BroadcastReceiver, Sender as BroadcastSender}, - task::JoinHandle, -}; +use thiserror::Error; -use self::machine::MachineInstruction; -use super::*; use crate::{ machine::{Behavior, Engine, State, StateMachine}, messager::Messager, - world::World, }; -// TODO: For the time being, these agents are just meant to be for arbiter -// instances. We can generalize later. - /// An agent is an entity capable of processing events and producing actions. /// These are the core actors in simulations or in onchain systems. /// Agents can be connected of other agents either as a dependent, or a /// dependency. /// /// # How it works -/// The [`Agent`] works by implementing the [`StateMachine`] trait. When the -/// [`World`] that owns the [`Agent`] is asked to enter into a new state, the -/// [`World`] will ask each [`Agent`] it owns to run that state transition by -/// calling [`StateMachine::run_state`]. All of the [`Agent`]s at once will then -/// will be able to be asked to block and wait to finish their state transition -/// by calling [`StateMachine::transition`]. Ultimately, the [`Agent`] will -/// transition through the following states: -/// 1. [`State::Uninitialized`]: The [`Agent`] has been created, but has not -/// been started. -/// 2. [`State::Syncing`]: The [`Agent`] is syncing with the world. This is -/// where the [`Agent`] can be brought up to date with the latest state of the -/// world. This could be used if the world was stopped and later restarted. -/// 3. [`State::Startup`]: The [`Agent`] is starting up. This is where the -/// [`Agent`] can be initialized and setup. -/// 4. [`State::Processing`]: The [`Agent`] is processing. This is where the -/// [`Agent`] can process events and produce actions. The [`State::Processing`] -/// stage may run for a long time before all [`Agent`]s are finished processing. -/// This is the main stage of the [`Agent`] that predominantly runs automation. -/// 5. [`State::Stopped`]: The [`Agent`] is stopped. This is where the [`Agent`] -/// can be stopped and state of the [`World`] and its [`Agent`]s can be -/// offloaded and saved. -// todo(matt): use builder pattern where we just have the agent builder -// implement deserialize with just behavior_engines -// -// #[derive(Serialize, Deserialize)] -// pub struct AgentBuilder { -// pub id: String, -// pub behavior_engines: Option>>, -// pub world: &World -// } +/// When the [`World`] that owns the [`Agent`] is ran, it has each [`Agent`] run +/// each of its [`Behavior`]s `startup()` methods. The [`Behavior`]s themselves +/// will return a stream of events that then let the [`Behavior`] move into the +/// `State::Processing` stage. pub struct Agent { /// Identifier for this agent. /// Used for routing messages. @@ -87,79 +31,28 @@ pub struct Agent { /// The messager the agent uses to send and receive messages from other /// agents. - pub messager: Option, + pub messager: Messager, /// The client the agent uses to interact with the blockchain. pub client: Arc, - /// The generalized event streamer for the agent that can stream a JSON - /// `String`of any Ethereum event that can be decoded by behaviors. - pub event_streamer: Option, - /// The engines/behaviors that the agent uses to sync, startup, and process /// events. - behavior_engines: Vec>, - - /// The pipeline for yielding events from the centralized event streamer - /// (for both messages and Ethereum events) to agents. - pub(crate) distributor: (BroadcastSender, BroadcastReceiver), - - broadcast_task: Option + Send>>>>, + pub(crate) behavior_engines: Vec>, } impl Agent { /// Produces a minimal agent builder with the given identifier. pub fn builder(id: &str) -> Result { - let distributor = channel(512); Ok(AgentBuilder { id: id.to_owned(), - distributor, behavior_engines: None, }) } - - /// Adds an Ethereum event to the agent's event streamer. - pub fn with_event( - mut self, - event: Event, RevmMiddleware, D>, - ) -> Self { - self.event_streamer = Some(self.event_streamer.take().unwrap().add_stream(event)); - self - } - - pub(crate) async fn run(&mut self, instruction: MachineInstruction) { - let behavior_tasks = join_all(self.behavior_engines.drain(..).map(|mut engine| { - let instruction_clone = instruction.clone(); - tokio::spawn(async move { - engine.execute(instruction_clone).await; - engine - }) - })); - self.behavior_engines = behavior_tasks - .await - .into_iter() - .map(|res| res.unwrap()) - .collect::>(); - } -} - -/// enum representing the possible error states encountered by the agent builder -#[derive(Debug)] -pub enum AgentBuildError { - MissingBehaviorEngines, -} - -impl std::error::Error for AgentBuildError {} -impl std::fmt::Display for AgentBuildError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - AgentBuildError::MissingBehaviorEngines => { - write!(f, "Behavior engines must be set before building the agent") - } // ... other error variants - } - } } +/// [`AgentBuilder`] represents the intermediate state of agent creation before +/// it is converted into a full on [`Agent`] pub struct AgentBuilder { /// Identifier for this agent. /// Used for routing messages. @@ -167,19 +60,16 @@ pub struct AgentBuilder { /// The engines/behaviors that the agent uses to sync, startup, and process /// events. behavior_engines: Option>>, - /// The pipeline for yielding events from the centralized event streamer - /// (for both messages and Ethereum events) to agents. - pub(crate) distributor: (BroadcastSender, BroadcastReceiver), } impl AgentBuilder { + /// Appends a behavior onto an [`AgentBuilder`]. Behaviors are initialized + /// when the agent builder is added to the [`crate::world::World`] pub fn with_behavior( mut self, behavior: impl Behavior + 'static, ) -> Self { - let event_receiver = self.distributor.0.subscribe(); - - let engine = Engine::new(behavior, event_receiver); + let engine = Engine::new(behavior); if let Some(engines) = &mut self.behavior_engines { engines.push(Box::new(engine)); } else { @@ -188,183 +78,30 @@ impl AgentBuilder { self } - /// Produces a new agent with the given identifier. + /// Produces a new [`Agent`] with the given identifier. pub fn build( self, - messager: Messager, client: Arc, + messager: Messager, ) -> Result { match self.behavior_engines { Some(engines) => Ok(Agent { id: self.id, state: State::Uninitialized, - messager: Some(messager), + messager, client, - event_streamer: Some(EventLogger::builder()), behavior_engines: engines, - distributor: self.distributor, - broadcast_task: None, }), None => Err(AgentBuildError::MissingBehaviorEngines), } } } -#[async_trait::async_trait] -impl StateMachine for Agent { - #[tracing::instrument(skip(self), fields(id = self.id))] - async fn execute(&mut self, instruction: MachineInstruction) { - match instruction { - MachineInstruction::Sync(_, _) => { - debug!("Agent is syncing."); - self.state = State::Syncing; - self.run(MachineInstruction::Sync( - self.messager.clone(), - Some(self.client.clone()), - )) - .await; - } - MachineInstruction::Start => { - debug!("Agent is starting up."); - self.run(instruction).await; - } - MachineInstruction::Process => { - debug!("Agent is processing."); - self.state = State::Processing; - let messager = self.messager.take().unwrap(); - let message_stream = messager - .stream() - .map(|msg| serde_json::to_string(&msg).unwrap_or_else(|e| e.to_string())); - - let eth_event_stream = self.event_streamer.take().unwrap().stream(); - - let mut event_stream: Pin + Send + '_>> = - if let Some(event_stream) = eth_event_stream { - trace!("Merging event streams."); - // Convert the individual streams into a Vec - let all_streams = vec![ - Box::pin(message_stream) as Pin + Send>>, - Box::pin(event_stream), - ]; - // Use select_all to combine them - Box::pin(futures::stream::select_all(all_streams)) - } else { - trace!("Agent only sees message stream."); - Box::pin(message_stream) - }; - - let sender = self.distributor.0.clone(); - self.broadcast_task = Some(tokio::spawn(async move { - while let Some(event) = event_stream.next().await { - sender.send(event).unwrap(); - } - event_stream - })); - self.run(instruction).await; - } - MachineInstruction::Stop => { - unreachable!("This is never explicitly called on an agent.") - } - } - } -} - -#[cfg(test)] -mod tests { - use arbiter_bindings::bindings::arbiter_token::ArbiterToken; - use ethers::types::U256; - - use super::*; - use crate::messager::Message; - - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn streaming() { - // std::env::set_var("RUST_LOG", "trace"); - // tracing_subscriber::fmt::init(); - - let world = World::new("world"); - let agent = Agent::builder("agent").unwrap(); - // let arb = ArbiterToken::deploy( - // agent.client.clone(), - // ("ArbiterToken".to_string(), "ARB".to_string(), 18u8), - // ) - // .unwrap() - // .send() - // .await - // .unwrap(); - // - // let mut agent = agent.with_event(arb.events()); - // let address = agent.client.address(); - // - // TODO: (START BLOCK) It would be nice to get this block to be a single - // function that isn't copy and pasted from above. - // let messager = agent.messager.take().unwrap(); - // let message_stream = messager - // .stream() - // .map(|msg| serde_json::to_string(&msg).unwrap_or_else(|e| - // e.to_string())); let eth_event_stream = - // agent.event_streamer.take().unwrap().stream(); - // - // let mut event_stream: Pin + Send + '_>> - // = if let Some(event_stream) = eth_event_stream { - // trace!("Merging event streams."); - // let all_streams = vec![ - // Box::pin(message_stream) as Pin + - // Send>>, Box::pin(event_stream), - // ]; - // Box::pin(futures::stream::select_all(all_streams)) - // } else { - // trace!("Agent only sees message stream."); - // Box::pin(message_stream) - // }; - // TODO: (END BLOCK) - // - // let outside_messager = world.messager.join_with_id(None); - // let message_task = tokio::spawn(async move { - // for _ in 0..5 { - // outside_messager - // .send(Message { - // from: "god".to_string(), - // to: messager::To::All, - // data: "hello".to_string(), - // }) - // .await; - // } - // }); - // - // let eth_event_task = tokio::spawn(async move { - // for i in 0..5 { - // if i == 0 { - // tokio::time::sleep(std::time::Duration::from_secs(1)).await; - // } - // arb.approve(address, U256::from(1)) - // .send() - // .await - // .unwrap() - // .await - // .unwrap(); - // } - // }); - // - // let mut idx = 0; - // let print_task = tokio::spawn(async move { - // while let Some(msg) = event_stream.next().await { - // println!("Printing message in test: {:?}", msg); - // if idx < 5 { - // assert_eq!(msg, - // "{\"from\":\"god\",\"to\":\"All\",\"data\":\"hello\"}"); - // } else { - // assert_eq!(msg, - // "{\"ApprovalFilter\":{\"owner\":\" - // 0xe7a46f3d9f0e9b9c02f58f95e3bcee2db54050b0\",\"spender\":\" - // 0xe7a46f3d9f0e9b9c02f58f95e3bcee2db54050b0\",\"amount\":\"0x1\"}}"); - // } - // idx += 1; - // if idx == 10 { - // break; - // } - // } - // }); - // join_all(vec![message_task, eth_event_task, print_task]).await; - } +/// enum representing the possible error states encountered by the agent builder +#[derive(Debug, Error, Clone, PartialEq, Eq)] +pub enum AgentBuildError { + /// Error representing the case where the agent is missing behavior engines; + /// an agent has to have behaviors to be useful! + #[error("Agent is missing behavior engines")] + MissingBehaviorEngines, } diff --git a/arbiter-engine/src/examples/minter/agents/mod.rs b/arbiter-engine/src/examples/minter/agents/mod.rs new file mode 100644 index 000000000..7db494eed --- /dev/null +++ b/arbiter-engine/src/examples/minter/agents/mod.rs @@ -0,0 +1,3 @@ +use super::*; +pub mod token_admin; +pub mod token_requester; diff --git a/arbiter-engine/src/examples/minter/agents/token_admin.rs b/arbiter-engine/src/examples/minter/agents/token_admin.rs new file mode 100644 index 000000000..f3ef512cf --- /dev/null +++ b/arbiter-engine/src/examples/minter/agents/token_admin.rs @@ -0,0 +1,32 @@ +use super::*; + +#[derive(Debug)] +pub struct TokenAdmin { + /// The identifier of the token admin. + pub token_data: HashMap, + pub tokens: Option>>, + pub client: Option>, + pub messager: Option, + pub count: u64, + pub max_count: Option, + startup_message: Option, +} + +impl TokenAdmin { + pub fn new(max_count: Option) -> Self { + Self { + token_data: HashMap::new(), + tokens: None, + client: None, + messager: None, + count: 0, + max_count, + startup_message: None, + } + } + + /// Adds a token to the token admin. + pub fn add_token(&mut self, token_data: TokenData) { + self.token_data.insert(token_data.name.clone(), token_data); + } +} diff --git a/arbiter-engine/src/examples/minter/agents/token_requester.rs b/arbiter-engine/src/examples/minter/agents/token_requester.rs new file mode 100644 index 000000000..9fa40d9fe --- /dev/null +++ b/arbiter-engine/src/examples/minter/agents/token_requester.rs @@ -0,0 +1,35 @@ +use super::*; + +/// The token requester is responsible for requesting tokens from the token +/// admin. This agents is purely for testing purposes as far as I can tell. +#[derive(Debug)] +pub struct TokenRequester { + /// The tokens that the token requester has requested. + pub token_data: TokenData, + /// The agent ID to request tokens to. + pub request_to: String, + /// Client to have an address to receive token mint to and check balance + pub client: Option>, + /// The messaging layer for the token requester. + pub messager: Option, + pub count: u64, + pub max_count: Option, +} + +impl TokenRequester { + pub fn new(max_count: Option) -> Self { + Self { + token_data: TokenData { + name: TOKEN_NAME.to_owned(), + symbol: TOKEN_SYMBOL.to_owned(), + decimals: TOKEN_DECIMALS, + address: None, + }, + request_to: TOKEN_ADMIN_ID.to_owned(), + client: None, + messager: None, + count: 0, + max_count, + } + } +} diff --git a/arbiter-engine/src/examples/minter/behaviors/mod.rs b/arbiter-engine/src/examples/minter/behaviors/mod.rs new file mode 100644 index 000000000..7db494eed --- /dev/null +++ b/arbiter-engine/src/examples/minter/behaviors/mod.rs @@ -0,0 +1,3 @@ +use super::*; +pub mod token_admin; +pub mod token_requester; diff --git a/arbiter-engine/src/examples/minter/behaviors/token_admin.rs b/arbiter-engine/src/examples/minter/behaviors/token_admin.rs new file mode 100644 index 000000000..f7c09b37c --- /dev/null +++ b/arbiter-engine/src/examples/minter/behaviors/token_admin.rs @@ -0,0 +1,110 @@ +use self::examples::minter::agents::token_admin::TokenAdmin; +use super::*; + +/// Used as an action to ask what tokens are available. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum TokenAdminQuery { + /// Get the address of the token. + AddressOf(String), + + /// Mint tokens. + MintRequest(MintRequest), +} + +/// Used as an action to mint tokens. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct MintRequest { + /// The token to mint. + pub token: String, + + /// The address to mint to. + pub mint_to: Address, + + /// The amount to mint. + pub mint_amount: u64, +} + +#[async_trait::async_trait] +impl Behavior for TokenAdmin { + #[tracing::instrument(skip(self), fields(id = messager.id.as_deref()))] + async fn startup( + &mut self, + client: Arc, + messager: Messager, + ) -> Pin + Send + Sync>> { + self.messager = Some(messager.clone()); + self.client = Some(client.clone()); + for token_data in self.token_data.values_mut() { + let token = ArbiterToken::deploy( + client.clone(), + ( + token_data.name.clone(), + token_data.symbol.clone(), + token_data.decimals, + ), + ) + .unwrap() + .send() + .await + .unwrap(); + + token_data.address = Some(token.address()); + self.tokens + .get_or_insert_with(HashMap::new) + .insert(token_data.name.clone(), token.clone()); + } + Box::pin(messager.stream()) + } + + #[tracing::instrument(skip(self), fields(id = + self.messager.as_ref().unwrap().id.as_deref()))] + async fn process(&mut self, event: Message) -> Option { + if self.tokens.is_none() { + error!( + "There were no tokens to deploy! You must add tokens to + the token admin before running the simulation." + ); + } + + let query: TokenAdminQuery = serde_json::from_str(&event.data).unwrap(); + trace!("Got query: {:?}", query); + let messager = self.messager.as_ref().unwrap(); + match query { + TokenAdminQuery::AddressOf(token_name) => { + trace!( + "Getting address of token with name: {:?}", + token_name.clone() + ); + let token_data = self.token_data.get(&token_name).unwrap(); + let message = Message { + from: messager.id.clone().unwrap(), + to: To::Agent(event.from.clone()), // Reply back to sender + data: serde_json::to_string(&token_data.address).unwrap(), + }; + messager.send(message).await; + } + TokenAdminQuery::MintRequest(mint_request) => { + trace!("Minting tokens: {:?}", mint_request); + let token = self + .tokens + .as_ref() + .unwrap() + .get(&mint_request.token) + .unwrap(); + token + .mint(mint_request.mint_to, U256::from(mint_request.mint_amount)) + .send() + .await + .unwrap() + .await + .unwrap(); + self.count += 1; + if self.count == self.max_count.unwrap_or(u64::MAX) { + warn!("Reached max count. Halting behavior."); + return Some(MachineHalt); + } + } + } + None + } +} diff --git a/arbiter-engine/src/examples/minter/behaviors/token_requester.rs b/arbiter-engine/src/examples/minter/behaviors/token_requester.rs new file mode 100644 index 000000000..6a125485c --- /dev/null +++ b/arbiter-engine/src/examples/minter/behaviors/token_requester.rs @@ -0,0 +1,73 @@ +use arbiter_bindings::bindings::arbiter_token::TransferFilter; +use arbiter_core::data_collection::EventLogger; +use token_admin::{MintRequest, TokenAdminQuery}; + +use self::examples::minter::agents::token_requester::TokenRequester; +use super::*; + +#[async_trait::async_trait] +impl Behavior for TokenRequester { + #[tracing::instrument(skip(self), fields(id = messager.id.as_deref()))] + async fn startup( + &mut self, + client: Arc, + mut messager: Messager, + ) -> Pin + Send + Sync>> { + let message = Message { + from: messager.id.clone().unwrap(), + to: To::Agent(self.request_to.clone()), + data: serde_json::to_string(&TokenAdminQuery::AddressOf(self.token_data.name.clone())) + .unwrap(), + }; + messager.send(message).await; + let message = messager.get_next().await; + let token_address = serde_json::from_str::
(&message.data).unwrap(); + let token = ArbiterToken::new(token_address, client.clone()); + self.token_data.address = Some(token_address); + + let mint_data = serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { + token: self.token_data.name.clone(), + mint_to: client.address(), + mint_amount: 1, + })) + .unwrap(); + let mint_request = Message { + from: messager.id.clone().unwrap(), + to: To::Agent(self.request_to.clone()), + data: mint_data, + }; + messager.send(mint_request).await; + + self.messager = Some(messager.clone()); + self.client = Some(client.clone()); + return Box::pin( + EventLogger::builder() + .add_stream(token.transfer_filter()) + .stream() + .unwrap() + .map(|value| serde_json::from_str(&value).unwrap()), + ); + } + + #[tracing::instrument(skip(self), fields(id = + self.messager.as_ref().unwrap().id.as_deref()))] + async fn process(&mut self, event: TransferFilter) -> Option { + let messager = self.messager.as_ref().unwrap(); + while (self.count < self.max_count.unwrap()) { + debug!("sending message from requester"); + let message = Message { + from: messager.id.clone().unwrap(), + to: To::Agent(self.request_to.clone()), + data: serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { + token: self.token_data.name.clone(), + mint_to: self.client.as_ref().unwrap().address(), + mint_amount: 1, + })) + .unwrap(), + }; + messager.send(message).await; + self.count += 1; + } + Some(MachineHalt) + } +} diff --git a/arbiter-engine/src/examples/minter/mod.rs b/arbiter-engine/src/examples/minter/mod.rs new file mode 100644 index 000000000..b4878aabb --- /dev/null +++ b/arbiter-engine/src/examples/minter/mod.rs @@ -0,0 +1,30 @@ +use super::*; +pub mod agents; +pub mod behaviors; +pub mod token_minter; + +use std::pin::Pin; + +use futures_util::Stream; +use tracing::error; + +use crate::{ + agent::Agent, + machine::{Behavior, MachineHalt, MachineInstruction, StateMachine}, + messager::To, + world::World, +}; + +const TOKEN_ADMIN_ID: &str = "token_admin"; +const REQUESTER_ID: &str = "requester"; +const TOKEN_NAME: &str = "Arbiter Token"; +const TOKEN_SYMBOL: &str = "ARB"; +const TOKEN_DECIMALS: u8 = 18; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct TokenData { + pub name: String, + pub symbol: String, + pub decimals: u8, + pub address: Option
, +} diff --git a/arbiter-engine/src/examples/minter/token_minter.rs b/arbiter-engine/src/examples/minter/token_minter.rs new file mode 100644 index 000000000..e8319aeb2 --- /dev/null +++ b/arbiter-engine/src/examples/minter/token_minter.rs @@ -0,0 +1,60 @@ +use std::{str::FromStr, time::Duration}; + +use agents::{token_admin::TokenAdmin, token_requester::TokenRequester}; +use arbiter_core::data_collection::EventLogger; +use ethers::types::Address; +use tokio::time::timeout; + +use super::*; +use crate::world::World; + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn token_minter_simulation() { + let mut world = World::new("test_world"); + let client = RevmMiddleware::new(&world.environment, None).unwrap(); + + // Create the token admin agent + let token_admin = Agent::builder(TOKEN_ADMIN_ID).unwrap(); + let mut token_admin_behavior = TokenAdmin::new(Some(4)); + token_admin_behavior.add_token(TokenData { + name: TOKEN_NAME.to_owned(), + symbol: TOKEN_SYMBOL.to_owned(), + decimals: TOKEN_DECIMALS, + address: None, + }); + // Create the token requester agent + let token_requester = Agent::builder(REQUESTER_ID).unwrap(); + let mut token_requester_behavior = TokenRequester::new(Some(4)); + world.add_agent(token_requester.with_behavior(token_requester_behavior)); + + world.add_agent(token_admin.with_behavior(token_admin_behavior)); + + let arb = ArbiterToken::new( + Address::from_str("0x240a76d4c8a7dafc6286db5fa6b589e8b21fc00f").unwrap(), + client.clone(), + ); + let transfer_event = arb.transfer_filter(); + + let transfer_stream = EventLogger::builder() + .add_stream(arb.transfer_filter()) + .stream() + .unwrap(); + let mut stream = Box::pin(transfer_stream); + world.run().await; + let mut idx = 0; + + loop { + match timeout(Duration::from_secs(1), stream.next()).await { + Ok(Some(event)) => { + println!("Event received in outside world: {:?}", event); + idx += 1; + if idx == 4 { + break; + } + } + _ => { + panic!("Timeout reached. Test failed."); + } + } + } +} diff --git a/arbiter-engine/src/examples/mod.rs b/arbiter-engine/src/examples/mod.rs index 5ca2c05e4..cb7653b28 100644 --- a/arbiter-engine/src/examples/mod.rs +++ b/arbiter-engine/src/examples/mod.rs @@ -18,5 +18,5 @@ use futures_util::{stream, StreamExt}; use super::*; use crate::messager::{Message, Messager}; +mod minter; mod timed_message; -mod token_minter; diff --git a/arbiter-engine/src/examples/timed_message.rs b/arbiter-engine/src/examples/timed_message.rs index 925b5fcce..56fe2be65 100644 --- a/arbiter-engine/src/examples/timed_message.rs +++ b/arbiter-engine/src/examples/timed_message.rs @@ -2,8 +2,10 @@ const AGENT_ID: &str = "agent"; -use std::time::Duration; +use std::{pin::Pin, time::Duration}; +use ethers::types::BigEndianHash; +use futures_util::Stream; use tokio::time::timeout; use self::machine::MachineHalt; @@ -22,6 +24,7 @@ struct TimedMessage { messager: Option, count: u64, max_count: Option, + startup_message: Option, } impl TimedMessage { @@ -30,6 +33,7 @@ impl TimedMessage { receive_data: String, send_data: String, max_count: Option, + startup_message: Option, ) -> Self { Self { delay, @@ -38,12 +42,35 @@ impl TimedMessage { messager: None, count: 0, max_count, + startup_message, } } } #[async_trait::async_trait] impl Behavior for TimedMessage { + async fn startup( + &mut self, + _client: Arc, + messager: Messager, + ) -> Pin + Send + Sync>> { + trace!("Starting up `TimedMessage`."); + self.messager = Some(messager.clone()); + tokio::time::sleep(std::time::Duration::from_secs(self.delay)).await; + if let Some(startup_message) = &self.startup_message { + messager + .clone() + .send(Message { + from: messager.id.clone().unwrap(), + to: To::All, + data: startup_message.clone(), + }) + .await; + } + trace!("Started `TimedMessage`."); + return Box::pin(messager.stream()); + } + async fn process(&mut self, event: Message) -> Option { trace!("Processing event."); let messager = self.messager.as_ref().unwrap(); @@ -66,19 +93,6 @@ impl Behavior for TimedMessage { trace!("Processed event."); None } - - async fn sync(&mut self, messager: Messager, _client: Arc) { - trace!("Syncing state for `TimedMessage`."); - self.messager = Some(messager); - tokio::time::sleep(std::time::Duration::from_secs(self.delay)).await; - trace!("Synced state for `TimedMessage`."); - } - - async fn startup(&mut self) { - trace!("Starting up `TimedMessage`."); - tokio::time::sleep(std::time::Duration::from_secs(self.delay)).await; - trace!("Started up `TimedMessage`."); - } } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -91,19 +105,12 @@ async fn echoer() { "Hello, world!".to_owned(), "Hello, world!".to_owned(), Some(2), + Some("Hello, world!".to_owned()), ); world.add_agent(agent.with_behavior(behavior)); + let messager = world.messager.for_agent("outside_world"); - let messager = world.messager.join_with_id(Some("god".to_owned())); - let task = world.run(); - - let message = Message { - from: "god".to_owned(), - to: To::Agent("agent".to_owned()), - data: "Hello, world!".to_owned(), - }; - messager.send(message).await; - task.await; + world.run().await; let mut stream = Box::pin(messager.stream()); let mut idx = 0; @@ -129,25 +136,22 @@ async fn ping_pong() { let mut world = World::new("world"); let agent = Agent::builder(AGENT_ID).unwrap(); - let behavior_ping = TimedMessage::new(1, "pong".to_owned(), "ping".to_owned(), Some(2)); - let behavior_pong = TimedMessage::new(1, "ping".to_owned(), "pong".to_owned(), Some(2)); + let behavior_ping = TimedMessage::new( + 1, + "pong".to_owned(), + "ping".to_owned(), + Some(2), + Some("ping".to_owned()), + ); + let behavior_pong = TimedMessage::new(1, "ping".to_owned(), "pong".to_owned(), Some(2), None); world.add_agent( agent .with_behavior(behavior_ping) .with_behavior(behavior_pong), ); - let messager = world.messager.join_with_id(Some("god".to_owned())); - let task = world.run(); - - let init_message = Message { - from: "god".to_owned(), - to: To::Agent("agent".to_owned()), - data: "ping".to_owned(), - }; - messager.send(init_message).await; - - task.await; + let messager = world.messager.for_agent("outside_world"); + world.run().await; let mut stream = Box::pin(messager.stream()); let mut idx = 0; @@ -172,30 +176,23 @@ async fn ping_pong() { async fn ping_pong_two_agent() { let mut world = World::new("world"); - let behavior_ping = TimedMessage::new(1, "pong".to_owned(), "ping".to_owned(), Some(2)); - let behavior_pong = TimedMessage::new(1, "ping".to_owned(), "pong".to_owned(), Some(2)); - let agent_ping = Agent::builder("agent_ping") - .unwrap() - .with_behavior(behavior_ping); - let agent_pong = Agent::builder("agent_pong") - .unwrap() - .with_behavior(behavior_pong); + let agent_ping = Agent::builder("agent_ping").unwrap(); + let agent_pong = Agent::builder("agent_pong").unwrap(); - world.add_agent(agent_ping); - world.add_agent(agent_pong); - - let messager = world.messager.join_with_id(Some("god".to_owned())); - let task = world.run(); - - let init_message = Message { - from: "god".to_owned(), - to: To::All, - data: "ping".to_owned(), - }; + let behavior_ping = TimedMessage::new( + 1, + "pong".to_owned(), + "ping".to_owned(), + Some(2), + Some("ping".to_owned()), + ); + let behavior_pong = TimedMessage::new(1, "ping".to_owned(), "pong".to_owned(), Some(2), None); - messager.send(init_message).await; + world.add_agent(agent_ping.with_behavior(behavior_ping)); + world.add_agent(agent_pong.with_behavior(behavior_pong)); - task.await; + let messager = world.messager.for_agent("outside_world"); + world.run().await; let mut stream = Box::pin(messager.stream()); let mut idx = 0; diff --git a/arbiter-engine/src/examples/token_minter.rs b/arbiter-engine/src/examples/token_minter.rs deleted file mode 100644 index c7dc8adc7..000000000 --- a/arbiter-engine/src/examples/token_minter.rs +++ /dev/null @@ -1,359 +0,0 @@ -use std::{str::FromStr, time::Duration}; - -use anyhow::Context; -use arbiter_bindings::bindings::arbiter_token; -use arbiter_core::data_collection::EventLogger; -use ethers::{ - abi::token, - types::{transaction::request, Filter}, -}; -use tokio::time::timeout; -use tracing::error; - -use self::machine::MachineHalt; -use super::*; -use crate::{ - agent::Agent, - machine::{Behavior, MachineInstruction, StateMachine}, - messager::To, - world::World, -}; - -const TOKEN_ADMIN_ID: &str = "token_admin"; -const REQUESTER_ID: &str = "requester"; -const TOKEN_NAME: &str = "Arbiter Token"; -const TOKEN_SYMBOL: &str = "ARB"; -const TOKEN_DECIMALS: u8 = 18; - -/// The token admin is responsible for handling token minting requests. -#[derive(Debug)] -pub struct TokenAdmin { - /// The identifier of the token admin. - pub token_data: HashMap, - - pub tokens: Option>>, - - pub client: Option>, - - pub messager: Option, - - count: u64, - - max_count: Option, -} - -impl TokenAdmin { - pub fn new(count: u64, max_count: Option) -> Self { - Self { - token_data: HashMap::new(), - tokens: None, - client: None, - messager: None, - count, - max_count, - } - } - - /// Adds a token to the token admin. - pub fn add_token(&mut self, token_data: TokenData) { - self.token_data.insert(token_data.name.clone(), token_data); - } -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct TokenData { - pub name: String, - pub symbol: String, - pub decimals: u8, - pub address: Option
, -} - -/// Used as an action to ask what tokens are available. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub enum TokenAdminQuery { - /// Get the address of the token. - AddressOf(String), - - /// Mint tokens. - MintRequest(MintRequest), -} - -/// Used as an action to mint tokens. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct MintRequest { - /// The token to mint. - pub token: String, - - /// The address to mint to. - pub mint_to: Address, - - /// The amount to mint. - pub mint_amount: u64, -} - -#[async_trait::async_trait] -impl Behavior for TokenAdmin { - #[tracing::instrument(skip(self), fields(id = messager.id.as_deref()))] - async fn sync(&mut self, messager: Messager, client: Arc) { - for token_data in self.token_data.values_mut() { - let token = ArbiterToken::deploy( - client.clone(), - ( - token_data.name.clone(), - token_data.symbol.clone(), - token_data.decimals, - ), - ) - .unwrap() - .send() - .await - .unwrap(); - token_data.address = Some(token.address()); - self.tokens - .get_or_insert_with(HashMap::new) - .insert(token_data.name.clone(), token.clone()); - debug!("Deployed token: {:?}", token); - } - self.messager = Some(messager); - self.client = Some(client); - } - - #[tracing::instrument(skip(self), fields(id = self.messager.as_ref().unwrap().id.as_deref()))] - async fn process(&mut self, event: Message) -> Option { - if self.tokens.is_none() { - error!( - "There were no tokens to deploy! You must add tokens to -the token admin before running the simulation." - ); - } - - let query: TokenAdminQuery = serde_json::from_str(&event.data).unwrap(); - trace!("Got query: {:?}", query); - let messager = self.messager.as_ref().unwrap(); - match query { - TokenAdminQuery::AddressOf(token_name) => { - trace!( - "Getting address of token with name: {:?}", - token_name.clone() - ); - let token_data = self.token_data.get(&token_name).unwrap(); - let message = Message { - from: messager.id.clone().unwrap(), - to: To::Agent(event.from.clone()), // Reply back to sender - data: serde_json::to_string(token_data).unwrap(), - }; - messager.send(message).await; - } - TokenAdminQuery::MintRequest(mint_request) => { - trace!("Minting tokens: {:?}", mint_request); - let token = self - .tokens - .as_ref() - .unwrap() - .get(&mint_request.token) - .unwrap(); - token - .mint(mint_request.mint_to, U256::from(mint_request.mint_amount)) - .send() - .await - .unwrap() - .await - .unwrap(); - self.count += 1; - if self.count == self.max_count.unwrap_or(u64::MAX) { - warn!("Reached max count. Halting behavior."); - return Some(MachineHalt); - } - } - } - None - } -} - -/// The token requester is responsible for requesting tokens from the token -/// admin. This agents is purely for testing purposes as far as I can tell. -#[derive(Debug)] -pub struct TokenRequester { - /// The tokens that the token requester has requested. - pub token_data: TokenData, - - /// The agent ID to request tokens to. - pub request_to: String, - - /// Client to have an address to receive token mint to and check balance - pub client: Option>, - - /// The messaging layer for the token requester. - pub messager: Option, - - pub count: u64, - - pub max_count: Option, -} - -impl TokenRequester { - pub fn new(count: u64, max_count: Option) -> Self { - Self { - token_data: TokenData { - name: TOKEN_NAME.to_owned(), - symbol: TOKEN_SYMBOL.to_owned(), - decimals: TOKEN_DECIMALS, - address: None, - }, - request_to: TOKEN_ADMIN_ID.to_owned(), - client: None, - messager: None, - count, - max_count, - } - } -} - -#[async_trait::async_trait] -impl Behavior for TokenRequester { - #[tracing::instrument(skip(self), fields(id = messager.id.as_deref()))] - async fn sync(&mut self, messager: Messager, client: Arc) { - self.messager = Some(messager); - self.client = Some(client); - } - - #[tracing::instrument(skip(self), fields(id = self.messager.as_ref().unwrap().id.as_deref()))] - async fn startup(&mut self) { - let messager = self.messager.as_ref().unwrap(); - trace!("Requesting address of token: {:?}", self.token_data.name); - let message = Message { - from: messager.id.clone().unwrap(), - to: To::Agent(self.request_to.clone()), - data: serde_json::to_string(&TokenAdminQuery::AddressOf(self.token_data.name.clone())) - .unwrap(), - }; - messager.send(message).await; - } - - #[tracing::instrument(skip(self), fields(id = self.messager.as_ref().unwrap().id.as_deref()))] - async fn process(&mut self, event: Message) -> Option { - if let Ok(token_data) = serde_json::from_str::(&event.data) { - let messager = self.messager.as_ref().unwrap(); - trace!( - "Got -token data: {:?}", - token_data - ); - trace!( - "Requesting first mint of -token: {:?}", - self.token_data.name - ); - let message = Message { - from: messager.id.clone().unwrap(), - to: To::Agent(self.request_to.clone()), - data: serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { - token: self.token_data.name.clone(), - mint_to: self.client.as_ref().unwrap().address(), - mint_amount: 1, - })) - .unwrap(), - }; - messager.send(message).await; - } - Some(MachineHalt) - } -} -// world.run() -> sync state -> iterate over agents, `execute` each agent -> -// agent sync state -> iterate over behaviors -> behavior sync state -#[async_trait::async_trait] -impl Behavior for TokenRequester { - async fn sync(&mut self, messager: Messager, client: Arc) { - self.client = Some(client); - self.messager = Some(messager); - } - - #[tracing::instrument(skip(self), fields(id = self.messager.as_ref().unwrap().id.as_deref()))] - async fn process(&mut self, event: arbiter_token::TransferFilter) -> Option { - let messager = self.messager.as_ref().unwrap(); - trace!( - "Got event for -`TokenRequester` logger: {:?}", - event - ); - std::thread::sleep(std::time::Duration::from_secs(1)); - let message = Message { - from: messager.id.clone().unwrap(), - to: To::Agent(self.request_to.clone()), - data: serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { - token: self.token_data.name.clone(), - mint_to: self.client.as_ref().unwrap().address(), - mint_amount: 1, - })) - .unwrap(), - }; - messager.send(message).await; - self.count += 1; - if self.count == self.max_count.unwrap_or(u64::MAX) { - warn!("Reached max count. Halting behavior."); - return Some(MachineHalt); - } - None - } -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn token_minter_simulation() { - // 3. have a method on world to update mutable map of addresses - let mut world = World::new("test_world"); - // self.contracts: HashMap - - // Create the token admin agent - // 1. use agent builder struct to get rid of reference to world - let token_admin = Agent::builder(TOKEN_ADMIN_ID).unwrap(); - let mut token_admin_behavior = TokenAdmin::new(0, Some(4)); - token_admin_behavior.add_token(TokenData { - name: TOKEN_NAME.to_owned(), - symbol: TOKEN_SYMBOL.to_owned(), - decimals: TOKEN_DECIMALS, - address: None, - }); - world.add_agent(token_admin.with_behavior(token_admin_behavior)); - - // Create the token requester agent - let token_requester = Agent::builder(REQUESTER_ID).unwrap(); - let token_requester_behavior = TokenRequester::new(0, Some(4)); - // 2. appropriately handle event driven behaviors - // let arb = ArbiterToken::new( - // Address::from_str("0x240a76d4c8a7dafc6286db5fa6b589e8b21fc00f").unwrap(), - // token_requester.client.clone(), - // ); - // let transfer_event = arb.transfer_filter(); - // - // let token_requester_behavior_again = TokenRequester::new(0, Some(4)); - // world.add_agent( - // token_requester - // .with_behavior::(token_requester_behavior) - // .with_behavior::(token_requester_behavior_again) - // .with_event(transfer_event), - // ); - // - // let transfer_stream = EventLogger::builder() - // .add_stream(arb.transfer_filter()) - // .stream() - // .unwrap(); - // let mut stream = Box::pin(transfer_stream); - // let mut idx = 0; - // - // world.run().await; - // - // loop { - // match timeout(Duration::from_secs(1), stream.next()).await { - // Ok(Some(event)) => { - // println!("Event received in outside world: {:?}", event); - // idx += 1; - // if idx == 4 { - // break; - // } - // } - // _ => { - // panic!("Timeout reached. Test failed."); - // } - // } - // } -} diff --git a/arbiter-engine/src/lib.rs b/arbiter-engine/src/lib.rs index 1a2869242..52106e5a2 100644 --- a/arbiter-engine/src/lib.rs +++ b/arbiter-engine/src/lib.rs @@ -7,6 +7,7 @@ use std::collections::HashMap; +use anyhow::{anyhow, Result}; use serde::{Deserialize, Serialize}; #[allow(unused)] use tracing::{debug, trace, warn}; diff --git a/arbiter-engine/src/machine.rs b/arbiter-engine/src/machine.rs index 856f42c97..c7229a334 100644 --- a/arbiter-engine/src/machine.rs +++ b/arbiter-engine/src/machine.rs @@ -1,21 +1,11 @@ //! The [`StateMachine`] trait, [`Behavior`] trait, and the [`Engine`] that runs //! [`Behavior`]s. -// TODO: Notes -// I think we should have the `sync` stage of the behavior receive the client -// and messager and then the user can decide if it wants to use those in their -// behavior. - -// Could typestate pattern help here at all? Sync could produce a `Synced` state -// behavior that can then not have options for client and messager. Then the -// user can decide if they want to use those in their behavior and get a bit -// simpler UX. - -use std::{fmt::Debug, sync::Arc}; +use std::{fmt::Debug, pin::Pin, sync::Arc}; use arbiter_core::middleware::RevmMiddleware; +use futures_util::{Stream, StreamExt}; use serde::de::DeserializeOwned; -use tokio::sync::broadcast::Receiver; use self::messager::Messager; use super::*; @@ -23,21 +13,14 @@ use super::*; /// The instructions that can be sent to a [`StateMachine`]. #[derive(Clone, Debug)] pub enum MachineInstruction { - /// Used to make a [`StateMachine`] sync with the world. - Sync(Option, Option>), - /// Used to make a [`StateMachine`] start up. - Start, + Start(Arc, Messager), /// Used to make a [`StateMachine`] process events. /// This will offload the process into a task that can be halted by sending /// a [`MachineHalt`] message from the [`Messager`]. For our purposes, the /// [`crate::world::World`] will handle this. Process, - - /// Used to make a [`StateMachine`] stop. Only applicable for the - /// [`crate::world::World`] currently. - Stop, } /// The message that can be used in a [`StateMachine`] to halt its processing. @@ -52,12 +35,6 @@ pub enum State { /// This is the state adopted by the entity when it is first created. Uninitialized, - /// The entity is syncing with the world. - /// This can be used to bring the entity back up to date with the latest - /// state of the world. This could be used if the world was stopped and - /// later restarted. - Syncing, - /// The entity is starting up. /// This is where the entity can engage in its specific start up activities /// that it can do given the current state of the world. @@ -68,10 +45,6 @@ pub enum State { /// This is where the entity can engage in its specific processing /// of events that can lead to actions being taken. Processing, - - /// The entity is stopped. - /// This is where state can be offloaded and saved if need be. - Stopped, } // NOTE: `async_trait::async_trait` is used throughout to make the trait object @@ -81,14 +54,14 @@ pub enum State { /// by a [`StateMachine`]. This constitutes what each state transition will do. #[async_trait::async_trait] pub trait Behavior: Send + Sync + 'static { - /// Used to bring the agent back up to date with the latest state of the - /// world. This could be used if the world was stopped and later restarted. - async fn sync(&mut self, _messager: Messager, _client: Arc) {} - /// Used to start the agent. /// This is where the agent can engage in its specific start up activities /// that it can do given the current state of the world. - async fn startup(&mut self) {} + async fn startup( + &mut self, + client: Arc, + messager: Messager, + ) -> Pin + Send + Sync>>; /// Used to process events. /// This is where the agent can engage in its specific processing @@ -122,7 +95,7 @@ where /// The receiver of events that the [`Engine`] will process. /// The [`State::Processing`] stage will attempt a decode of the [`String`]s /// into the event type ``. - event_receiver: Option>, + event_stream: Option + Send + Sync>>>, phantom: std::marker::PhantomData, } @@ -133,11 +106,11 @@ where E: DeserializeOwned + Send + Sync + 'static, { /// Creates a new [`Engine`] with the given [`Behavior`] and [`Receiver`]. - pub(crate) fn new(behavior: B, event_receiver: Receiver) -> Self { + pub(crate) fn new(behavior: B) -> Self { Self { behavior: Some(behavior), state: State::Uninitialized, - event_receiver: Some(event_receiver), + event_stream: None, phantom: std::marker::PhantomData, } } @@ -151,61 +124,40 @@ where { async fn execute(&mut self, instruction: MachineInstruction) { match instruction { - MachineInstruction::Sync(messager, client) => { - trace!("Behavior is syncing."); - self.state = State::Syncing; - let mut behavior = self.behavior.take().unwrap(); - let behavior_task = tokio::spawn(async move { - behavior.sync(messager.unwrap(), client.unwrap()).await; - behavior - }); - self.behavior = Some(behavior_task.await.unwrap()); - } - MachineInstruction::Start => { + MachineInstruction::Start(client, messager) => { trace!("Behavior is starting up."); self.state = State::Starting; let mut behavior = self.behavior.take().unwrap(); let behavior_task = tokio::spawn(async move { - behavior.startup().await; - behavior + let id = messager.id.clone(); + debug!("starting up stream for {:?}!", id); + let stream = behavior.startup(client, messager).await; + debug!("startup complete for {:?}!", id); + (stream, behavior) }); - self.behavior = Some(behavior_task.await.unwrap()); + let (stream, behavior) = behavior_task.await.unwrap(); + self.event_stream = Some(stream); + self.behavior = Some(behavior); + // TODO: This feels weird but I think it works properly? + self.execute(MachineInstruction::Process).await; } MachineInstruction::Process => { trace!("Behavior is processing."); let mut behavior = self.behavior.take().unwrap(); - let mut receiver = self.event_receiver.take().unwrap(); + let mut stream = self.event_stream.take().unwrap(); let behavior_task = tokio::spawn(async move { - while let Ok(event) = receiver.recv().await { - let decoding_result = serde_json::from_str::(&event); - match decoding_result { - Ok(event) => { - let halt_option = behavior.process(event).await; - if halt_option.is_some() { - break; - } - } - Err(_) => match serde_json::from_str::(&event) { - Ok(_) => { - warn!("Behavior received `MachineHalt` message. Breaking!"); - break; - } - Err(_) => { - trace!( - "Event received by behavior that could not be deserialized." - ); - continue; - } - }, + while let Some(event) = stream.next().await { + let halt_option = behavior.process(event).await; + if halt_option.is_some() { + break; } } behavior }); + // TODO: This could be removed as we probably don't need to have the behavior + // stored once its done. self.behavior = Some(behavior_task.await.unwrap()); } - MachineInstruction::Stop => { - unreachable!("This is never explicitly called on an engine.") - } } } } diff --git a/arbiter-engine/src/messager.rs b/arbiter-engine/src/messager.rs index 041b7ccd6..15d95687d 100644 --- a/arbiter-engine/src/messager.rs +++ b/arbiter-engine/src/messager.rs @@ -1,5 +1,9 @@ //! The messager module contains the core messager layer for the Arbiter Engine. +// TODO: Allow for modulating the capacity of the messager. +// TODO: It might be nice to have some kind of messaging header so that we can +// pipe messages to agents and pipe messages across worlds. + use futures_util::Stream; use tokio::sync::broadcast::{channel, Receiver, Sender}; @@ -51,9 +55,6 @@ impl Clone for Messager { } impl Messager { - // TODO: Allow for modulating the capacity of the messager. - // TODO: It might be nice to have some kind of messaging header so that we can - // pipe messages to agents and pipe messages across worlds. /// Creates a new messager with the given capacity. #[allow(clippy::new_without_default)] pub fn new() -> Self { @@ -65,8 +66,8 @@ impl Messager { } } - // TODO: Okay if we do something kinda like this, then agents don't even need to - // filter the `to` field or set the `from` field. Let's give this a shot! + /// Returns a [`Messager`] interface connected to the same instance but with + /// the `id` provided. pub(crate) fn for_agent(&self, id: &str) -> Self { Self { broadcast_sender: self.broadcast_sender.clone(), @@ -75,6 +76,27 @@ impl Messager { } } + /// utility function for getting the next value from the broadcast_receiver + /// without streaming + pub async fn get_next(&mut self) -> Message { + while let Ok(message) = self.broadcast_receiver.as_mut().unwrap().recv().await { + match &message.to { + To::All => { + return message; + } + To::Agent(id) => { + if let Some(self_id) = &self.id { + if id == self_id { + return message; + } + } + continue; + } + } + } + unreachable!() + } + /// Returns a stream of messages that are either sent to [`To::All`] or to /// the agent via [`To::Agent(id)`]. pub fn stream(mut self) -> impl Stream + Send { @@ -97,16 +119,6 @@ impl Messager { } } - /// Returns a [`Messager`] interface connected to the same instance but with - /// the `id` provided. - pub fn join_with_id(&self, id: Option) -> Messager { - Messager { - broadcast_sender: self.broadcast_sender.clone(), - broadcast_receiver: Some(self.broadcast_sender.subscribe()), - id, - } - } - /// Sends a message to the messager. pub async fn send(&self, message: Message) { trace!("Sending message via messager."); diff --git a/arbiter-engine/src/world.rs b/arbiter-engine/src/world.rs index 548bb6325..005b5b616 100644 --- a/arbiter-engine/src/world.rs +++ b/arbiter-engine/src/world.rs @@ -15,51 +15,26 @@ //! The world module contains the core world abstraction for the Arbiter Engine. -use arbiter_core::{ - environment::{Environment, EnvironmentBuilder}, - middleware::RevmMiddleware, -}; +use std::collections::VecDeque; + +use arbiter_core::{environment::Environment, middleware::RevmMiddleware}; use futures_util::future::join_all; -use tokio::sync::broadcast::Sender as BroadcastSender; -use tracing::info; +use tokio::spawn; -use self::{ - agent::AgentBuilder, - machine::{MachineHalt, MachineInstruction}, -}; +use self::{agent::AgentBuilder, machine::MachineInstruction}; use super::*; -use crate::{ - agent::Agent, - machine::{State, StateMachine}, - messager::Messager, -}; +use crate::{agent::Agent, machine::State, messager::Messager}; /// A world is a collection of agents that use the same type of provider, e.g., /// operate on the same blockchain or same `Environment`. The world is /// responsible for managing the agents and their state transitions. /// /// # How it works -/// The [`World`] works by implementing the [`StateMachine`] trait. When the -/// [`World`] is asked to enter into a new state, it will ask each [`Agent`] it -/// owns to run that state transition by calling [`StateMachine::run_state`]. -/// All of the [`Agent`]s at once will then be able to be asked to block and -/// wait to finish their state transition by calling -/// [`StateMachine::transition`]. Ultimately, the [`World`] will transition -/// through the following states: -/// 1. [`State::Uninitialized`]: The [`World`] has been created, but has not -/// been started. -/// 2. [`State::Syncing`]: The [`World`] is syncing with the agents. This is -/// where the [`World`] can be brought up to date with the latest state of the -/// agents. This could be used if the world was stopped and later restarted. -/// 3. [`State::Startup`]: The [`World`] is starting up. This is where the -/// [`World`] can be initialized and setup. -/// 4. [`State::Processing`]: The [`World`] is processing. This is where the -/// [`World`] can process events and produce actions. The [`State::Processing`] -/// stage may run for a long time before all [`World`]s are finished processing. -/// This is the main stage of the [`World`] that predominantly runs automation. -/// 5. [`State::Stopped`]: The [`World`] is stopped. This is where the [`World`] -/// can be stopped and state of the [`World`] and its [`Agent`]s can be -/// offloaded and saved. +/// The [`World`] holds on to a collection of [`Agent`]s and can run them all +/// concurrently when the [`run`] method is called. The [`World`] takes in +/// [`AgentBuilder`]s and when it does so, it creates [`Agent`]s that are now +/// connected to the world via a client ([`Arc`]) and a messager +/// ([`Messager`]). pub struct World { /// The identifier of the world. pub id: String, @@ -70,8 +45,6 @@ pub struct World { /// The agents in the world. pub agents: Option>, - agent_distributors: Option>>, - /// The environment for the world. pub environment: Environment, @@ -86,142 +59,47 @@ impl World { id: id.to_owned(), state: State::Uninitialized, agents: Some(HashMap::new()), - agent_distributors: None, environment: Environment::builder().build(), messager: Messager::new(), } } - /// Creates a new [World] with the given identifier and provider. - pub fn new_with_env(id: &str, environment: Environment) -> Self { - Self { - id: id.to_owned(), - agents: Some(HashMap::new()), - state: State::Uninitialized, - agent_distributors: None, - environment, - messager: Messager::new(), - } - } - /// Adds an agent to the world. pub fn add_agent(&mut self, agent_builder: AgentBuilder) { let id = agent_builder.id.clone(); - let messager = self.messager.for_agent(&id); let client = RevmMiddleware::new(&self.environment, Some(&id)).unwrap(); - let agent = agent_builder.build(messager, client).unwrap(); + let messager = self.messager.for_agent(&id); + let agent = agent_builder.build(client, messager).unwrap(); let agents = self.agents.as_mut().unwrap(); agents.insert(id.to_owned(), agent); } /// Runs the world through up to the [`State::Processing`] stage. - pub async fn run(&mut self) { - self.execute(MachineInstruction::Sync(None, None)).await; - self.execute(MachineInstruction::Start).await; - self.execute(MachineInstruction::Process).await; - } - - /// Stops the world by stopping all the behaviors that each of the agents is - /// running. - pub async fn stop(&mut self) { - self.execute(MachineInstruction::Stop).await; - } -} - -// TODO: Idea, when we enter the `State::Processing`, we should pass the task -// into the struct. When we call `MachineInstruction::Stop` we should do message -// passing that will kill the tasks so that they return. This will allow us to -// do graceful shutdowns. - -// TODO: Worth explaining how the process stage is offloaded so it is -// understandable. - -// Right now what we do is we send a HALT message via the agent's distributor -// which means all behaviors should receive this now. If those behaviors all see -// this HALT message and then exit their process, then the await should finish. -// Actually we can probably not have to get the distributors up this high, but -// let's work with this for now. - -#[async_trait::async_trait] -impl StateMachine for World { - async fn execute(&mut self, instruction: MachineInstruction) { - match instruction { - MachineInstruction::Sync(_, _) => { - info!("World is syncing."); - self.state = State::Syncing; - let agents = self.agents.take().unwrap(); - let agent_tasks = join_all(agents.into_values().map(|mut agent| { - let instruction_clone = instruction.clone(); - tokio::spawn(async move { - agent.execute(instruction_clone).await; - agent - }) - })); - self.agents = Some( - agent_tasks - .await - .into_iter() - .map(|res| { - let agent = res.unwrap(); - (agent.id.clone(), agent) - }) - .collect::>(), - ); + pub async fn run(&mut self) -> Result<()> { + let mut tasks = vec![]; + let agents = self + .agents + .take() + .ok_or_else(|| anyhow!("No agents found! Has the world already been run?"))?; + let mut messagers = VecDeque::new(); + for (_, agent) in agents.iter() { + for _ in &agent.behavior_engines { + messagers.push_back(agent.messager.clone()); } - MachineInstruction::Start => { - info!("World is starting up."); - self.state = State::Starting; - let agents = self.agents.take().unwrap(); - let agent_tasks = join_all(agents.into_values().map(|mut agent| { - let instruction_clone = instruction.clone(); - tokio::spawn(async move { - agent.execute(instruction_clone).await; - agent - }) - })); - self.agents = Some( - agent_tasks + } + for (_, mut agent) in agents { + for mut engine in agent.behavior_engines.drain(..) { + let client = agent.client.clone(); + let messager = messagers.pop_front().unwrap(); + tasks.push(spawn(async move { + engine + .execute(MachineInstruction::Start(client, messager)) .await - .into_iter() - .map(|res| { - let agent = res.unwrap(); - (agent.id.clone(), agent) - }) - .collect::>(), - ); - } - MachineInstruction::Process => { - info!("World is processing."); - self.state = State::Processing; - let agents = self.agents.take().unwrap(); - let mut agent_distributors = vec![]; - let agent_processors = join_all(agents.into_values().map(|mut agent| { - agent_distributors.push(agent.distributor.0.clone()); - let instruction_clone = instruction.clone(); - tokio::spawn(async move { - agent.execute(instruction_clone).await; - agent - }) })); - self.agent_distributors = Some(agent_distributors); - self.agents = Some( - agent_processors - .await - .into_iter() - .map(|res| { - let agent = res.unwrap(); - (agent.id.clone(), agent) - }) - .collect::>(), - ); - } - MachineInstruction::Stop => { - let halt = serde_json::to_string(&MachineHalt).unwrap(); - for tx in self.agent_distributors.take().unwrap() { - tx.send(halt.clone()).unwrap(); - } } } + join_all(tasks).await; + Ok(()) } }