Skip to content

Commit

Permalink
Merge pull request #746 from primitivefinance/arbiter-engine/run-agents
Browse files Browse the repository at this point in the history
feat(arbiter-engine): Run method for agents and messager echo example
  • Loading branch information
0xJepsen authored Dec 13, 2023
2 parents 4178180 + 6015ac2 commit 83a8123
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 44 deletions.
30 changes: 27 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ arbiter-core = { path = "./arbiter-core" }
crossbeam-channel = { version = "=0.5.8" }
futures-util = { version = "=0.3.29" }
async-trait = { version = "0.1.74" }
tracing = "0.1.40"

# Dependencies for the release build
[dependencies]
Expand Down
2 changes: 0 additions & 2 deletions arbiter-bindings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,4 @@ license = "Apache-2.0"

[dependencies]
ethers.workspace = true
revm.workspace = true
revm-primitives.workspace = true
serde.workspace = true
2 changes: 1 addition & 1 deletion arbiter-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ thiserror.workspace = true

# Logging
futures-util.workspace = true
tracing = "0.1.40"
tracing.workspace = true

# File types
polars = { version = "0.35.4", features = ["parquet", "csv", "json"] }
Expand Down
1 change: 0 additions & 1 deletion arbiter-core/src/environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use revm::{
},
EVM,
};
// use hashbrown::{hash_map, HashMap as HashMapBrown};
use serde::{Deserialize, Serialize};
use thiserror::Error;

Expand Down
5 changes: 0 additions & 5 deletions arbiter-core/src/middleware/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ use futures_timer::Delay;
use rand::{rngs::StdRng, SeedableRng};
use revm::primitives::{CreateScheme, Output, TransactTo, TxEnv, U256};
use serde::{de::DeserializeOwned, Serialize};
// use revm::primitives::{ExecutionResult, Output};
// use super::cast::revm_logs_to_ethers_logs;
// use super::errors::RevmMiddlewareError;

// use async_trait::async_trait;
use thiserror::Error;

use super::*;
Expand Down
8 changes: 6 additions & 2 deletions arbiter-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ readme = "../README.md"
ethers.workspace = true
arbiter-core.workspace = true
arbiter-bindings = { path = "../arbiter-bindings" }
artemis-core = { git = "https://github.com/paradigmxyz/artemis.git"}
artemis-core = { git = "https://github.com/paradigmxyz/artemis.git" }
crossbeam-channel.workspace = true
futures-util.workspace = true
async-trait.workspace = true
serde_json.workspace = true
serde.workspace = true
tokio.workspace = true
anyhow = { version = "=1.0.75" }
async-stream = "0.3.5"
tracing.workspace = true
flume = "0.11.0"

[dev-dependencies]
tracing-subscriber = "0.3.18"
36 changes: 26 additions & 10 deletions arbiter-engine/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use artemis_core::{
engine::Engine,
types::{Collector, Executor},
types::{Collector, Executor, Strategy},
};

/// An agent is an entity capable of processing events and producing actions.
Expand All @@ -22,17 +22,18 @@ use artemis_core::{
pub struct Agent<E, A> {
/// Identifier for this agent.
/// Used for routing messages.
_id: String,
pub id: String,

/// The engine that this agent uses to process events and produce actions.
engine: Engine<E, A>, /* Note, agent shouldn't NEED a client as a field as the engine can
* handle this. */
pub(crate) engine: Option<Engine<E, A>>, /* Note, agent shouldn't NEED a client as a field
* as the engine can
* handle this. */

/// Agents that this agent depends on.
dependencies: Vec<String>,
pub dependencies: Vec<String>,

/// Agents that depend on this agent.
dependents: Vec<String>,
pub dependents: Vec<String>,
}

impl<E, A> Agent<E, A>
Expand All @@ -44,21 +45,35 @@ where
/// Produces a new agent with the given identifier.
pub fn new(id: &str) -> Self {
Self {
_id: id.to_owned(),
engine: Engine::new(),
id: id.to_owned(),
engine: Some(Engine::new()),
dependencies: vec![],
dependents: vec![],
}
}

/// Adds a collector to the agent's engine.
pub fn add_collector(&mut self, collector: impl Collector<E> + 'static) {
self.engine.add_collector(Box::new(collector));
self.engine
.as_mut()
.expect("Engine has already been taken by the `World::run()` method.")
.add_collector(Box::new(collector));
}

/// Adds an executor to the agent's engine.
pub fn add_executor(&mut self, executor: impl Executor<A> + 'static) {
self.engine.add_executor(Box::new(executor));
self.engine
.as_mut()
.expect("Engine has already been taken by the `World::run()` method.")
.add_executor(Box::new(executor));
}

/// Adds a strategy to the agent's engine.
pub fn add_strategy(&mut self, strategy: impl Strategy<E, A> + 'static) {
self.engine
.as_mut()
.expect("Engine has already been taken by the `World::run()` method.")
.add_strategy(Box::new(strategy));
}

/// Adds a dependency to the agent.
Expand Down Expand Up @@ -86,6 +101,7 @@ mod tests {

use super::*;

#[ignore]
#[tokio::test]
async fn test_agent() {
// Startup
Expand Down
87 changes: 87 additions & 0 deletions arbiter-engine/src/examples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,90 @@ impl Strategy<Message, SubmitTxToMempool> for TokenAdmin {
}
}
}

#[cfg(test)]
mod tests {

use arbiter_core::{
environment::builder::EnvironmentBuilder, middleware::connection::Connection,
};
use ethers::providers::Provider;

use super::*;
use crate::{agent::Agent, messager::Messager, world::World};

struct TimedMessage {
delay: u64,
message: Message,
}

#[async_trait::async_trait]
impl Strategy<Message, Message> for TimedMessage {
#[tracing::instrument(skip(self), level = "trace")]
async fn sync_state(&mut self) -> Result<()> {
trace!("Syncing state.");
Ok(())
}

#[tracing::instrument(skip(self, event), level = "trace")]
async fn process_event(&mut self, event: Message) -> Vec<Message> {
trace!("Processing event.");
if event.to == self.message.to {
let message = Message {
from: "agent1".to_owned(),
to: "agent1".to_owned(),
data: "Hello, world!".to_owned(),
};
if event.data == "Start" {
vec![message]
} else {
tokio::time::sleep(std::time::Duration::from_secs(self.delay)).await;
vec![message]
}
} else {
vec![]
}
}
}

#[ignore]
#[tokio::test]
async fn base_simulation() {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE) // Set the maximum level to TRACE
.finish();

let _guard = tracing::subscriber::set_default(subscriber);
let environment = EnvironmentBuilder::new().build();
let connection = Connection::from(&environment);
let provider = Provider::new(connection);
let mut world = World::new("test_world", provider);

let mut agent = Agent::new("agent1");
let messager = Messager::new();
agent.add_collector(messager.clone());
agent.add_executor(messager.clone());

let strategy = TimedMessage {
delay: 1,
message: Message {
from: "agent1".to_owned(),
to: "agent1".to_owned(),
data: "Hello, world!".to_owned(),
},
};
agent.add_strategy(strategy);

world.add_agent(agent);
let world_task = tokio::spawn(async move { world.run().await });

let message = Message {
from: "agent1".to_owned(),
to: "agent1".to_owned(),
data: "Start".to_owned(),
};
let send_result = messager.execute(message).await;

world_task.await.unwrap();
}
}
1 change: 1 addition & 0 deletions arbiter-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;

use anyhow::Result;
use serde::{Deserialize, Serialize};
use tracing::{debug, trace, warn};

pub mod agent;
pub mod examples;
Expand Down
33 changes: 14 additions & 19 deletions arbiter-engine/src/messager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! The messager module contains the core messager layer for the Arbiter Engine.
use artemis_core::types::{Collector, CollectorStream, Executor};
use tokio::sync::broadcast::Sender;
use flume::{unbounded, Receiver, Sender};

use super::*;

Expand All @@ -22,42 +22,37 @@ pub struct Message {
}

/// A messager that can be used to send messages between agents.
#[derive(Clone, Debug)]
pub struct Messager {
broadcaster: Sender<Message>,
sender: Sender<Message>,
receiver: Receiver<Message>,
}

impl Messager {
/// Creates a new messager with the given capacity.
pub fn new(capacity: usize) -> Self {
Self {
broadcaster: Sender::new(capacity),
}
}
}

impl Default for Messager {
fn default() -> Self {
Self::new(32)
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let (sender, receiver) = unbounded();
Self { sender, receiver }
}
}

#[async_trait::async_trait]
impl Collector<Message> for Messager {
#[tracing::instrument(skip(self), level = "debug", target = "messager")]
async fn get_event_stream(&self) -> Result<CollectorStream<'_, Message>> {
let mut subscription = self.broadcaster.subscribe();
let stream = async_stream::stream! {
while let Ok(message) = subscription.recv().await {
yield message;
}
};
debug!("Getting the event stream for the messager.");
let stream = self.receiver.clone().into_stream();
Ok(Box::pin(stream))
}
}

#[async_trait::async_trait]
impl Executor<Message> for Messager {
#[tracing::instrument(skip(self), level = "trace", target = "messager")]
async fn execute(&self, message: Message) -> Result<()> {
let _buf_len = self.broadcaster.send(message)?;
trace!("Broadcasting message.");
self.sender.send(message)?;
Ok(())
}
}
Loading

0 comments on commit 83a8123

Please sign in to comment.