Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(arbiter-engine): Run method for agents and messager echo example #746

Merged
merged 7 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ arbiter-core = { path = "./arbiter-core" }
crossbeam-channel = { version = "=0.5.8" }
futures-util = { version = "=0.3.29" }
async-trait = { version = "0.1.74" }
tracing = "0.1.40"

# Dependencies for the release build
[dependencies]
Expand Down
2 changes: 0 additions & 2 deletions arbiter-bindings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,4 @@ license = "Apache-2.0"

[dependencies]
ethers.workspace = true
revm.workspace = true
revm-primitives.workspace = true
serde.workspace = true
2 changes: 1 addition & 1 deletion arbiter-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ thiserror.workspace = true

# Logging
futures-util.workspace = true
tracing = "0.1.40"
tracing.workspace = true

# File types
polars = { version = "0.35.4", features = ["parquet", "csv", "json"] }
Expand Down
1 change: 0 additions & 1 deletion arbiter-core/src/environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use revm::{
},
EVM,
};
// use hashbrown::{hash_map, HashMap as HashMapBrown};
use serde::{Deserialize, Serialize};
use thiserror::Error;

Expand Down
5 changes: 0 additions & 5 deletions arbiter-core/src/middleware/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ use futures_timer::Delay;
use rand::{rngs::StdRng, SeedableRng};
use revm::primitives::{CreateScheme, Output, TransactTo, TxEnv, U256};
use serde::{de::DeserializeOwned, Serialize};
// use revm::primitives::{ExecutionResult, Output};
// use super::cast::revm_logs_to_ethers_logs;
// use super::errors::RevmMiddlewareError;

// use async_trait::async_trait;
use thiserror::Error;

use super::*;
Expand Down
8 changes: 6 additions & 2 deletions arbiter-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ readme = "../README.md"
ethers.workspace = true
arbiter-core.workspace = true
arbiter-bindings = { path = "../arbiter-bindings" }
artemis-core = { git = "https://github.com/paradigmxyz/artemis.git"}
artemis-core = { git = "https://github.com/paradigmxyz/artemis.git" }
crossbeam-channel.workspace = true
futures-util.workspace = true
async-trait.workspace = true
serde_json.workspace = true
serde.workspace = true
tokio.workspace = true
anyhow = { version = "=1.0.75" }
async-stream = "0.3.5"
tracing.workspace = true
flume = "0.11.0"

[dev-dependencies]
tracing-subscriber = "0.3.18"
35 changes: 25 additions & 10 deletions arbiter-engine/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

use artemis_core::{
engine::Engine,
types::{Collector, Executor},
types::{Collector, Executor, Strategy},
};

/// An agent is an entity capable of processing events and producing actions.
Expand All @@ -22,17 +22,18 @@ use artemis_core::{
pub struct Agent<E, A> {
/// Identifier for this agent.
/// Used for routing messages.
_id: String,
pub id: String,

/// The engine that this agent uses to process events and produce actions.
engine: Engine<E, A>, /* Note, agent shouldn't NEED a client as a field as the engine can
* handle this. */
pub(crate) engine: Option<Engine<E, A>>, /* Note, agent shouldn't NEED a client as a field
* as the engine can
* handle this. */

/// Agents that this agent depends on.
dependencies: Vec<String>,
pub dependencies: Vec<String>,

/// Agents that depend on this agent.
dependents: Vec<String>,
pub dependents: Vec<String>,
}

impl<E, A> Agent<E, A>
Expand All @@ -44,21 +45,35 @@ where
/// Produces a new agent with the given identifier.
pub fn new(id: &str) -> Self {
Self {
_id: id.to_owned(),
engine: Engine::new(),
id: id.to_owned(),
engine: Some(Engine::new()),
dependencies: vec![],
dependents: vec![],
}
}

/// Adds a collector to the agent's engine.
pub fn add_collector(&mut self, collector: impl Collector<E> + 'static) {
self.engine.add_collector(Box::new(collector));
self.engine
.as_mut()
.expect("Engine has already been taken by the `World::run()` method.")
.add_collector(Box::new(collector));
}

/// Adds an executor to the agent's engine.
pub fn add_executor(&mut self, executor: impl Executor<A> + 'static) {
self.engine.add_executor(Box::new(executor));
self.engine
.as_mut()
.expect("Engine has already been taken by the `World::run()` method.")
.add_executor(Box::new(executor));
}

/// Adds a strategy to the agent's engine.
pub fn add_strategy(&mut self, strategy: impl Strategy<E, A> + 'static) {
self.engine
.as_mut()
.expect("Engine has already been taken by the `World::run()` method.")
.add_strategy(Box::new(strategy));
}

/// Adds a dependency to the agent.
Expand Down
86 changes: 86 additions & 0 deletions arbiter-engine/src/examples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,89 @@ impl Strategy<Message, SubmitTxToMempool> for TokenAdmin {
}
}
}

#[cfg(test)]
mod tests {

use arbiter_core::{
environment::builder::EnvironmentBuilder, middleware::connection::Connection,
};
use ethers::providers::Provider;

use super::*;
use crate::{agent::Agent, messager::Messager, world::World};

struct TimedMessage {
delay: u64,
message: Message,
}

#[async_trait::async_trait]
impl Strategy<Message, Message> for TimedMessage {
#[tracing::instrument(skip(self), level = "trace")]
async fn sync_state(&mut self) -> Result<()> {
trace!("Syncing state.");
Ok(())
}

#[tracing::instrument(skip(self, event), level = "trace")]
async fn process_event(&mut self, event: Message) -> Vec<Message> {
trace!("Processing event.");
if event.to == self.message.to {
let message = Message {
from: "agent1".to_owned(),
to: "agent1".to_owned(),
data: "Hello, world!".to_owned(),
};
if event.data == "Start" {
vec![message]
} else {
tokio::time::sleep(std::time::Duration::from_secs(self.delay)).await;
vec![message]
}
} else {
vec![]
}
}
}

#[tokio::test]
async fn base_simulation() {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE) // Set the maximum level to TRACE
.finish();

let _guard = tracing::subscriber::set_default(subscriber);
let environment = EnvironmentBuilder::new().build();
let connection = Connection::from(&environment);
let provider = Provider::new(connection);
let mut world = World::new("test_world", provider);

let mut agent = Agent::new("agent1");
let messager = Messager::new();
agent.add_collector(messager.clone());
agent.add_executor(messager.clone());

let strategy = TimedMessage {
delay: 1,
message: Message {
from: "agent1".to_owned(),
to: "agent1".to_owned(),
data: "Hello, world!".to_owned(),
},
};
agent.add_strategy(strategy);

world.add_agent(agent);
let world_task = tokio::spawn(async move { world.run().await });

let message = Message {
from: "agent1".to_owned(),
to: "agent1".to_owned(),
data: "Start".to_owned(),
};
let send_result = messager.execute(message).await;

world_task.await.unwrap();
}
}
1 change: 1 addition & 0 deletions arbiter-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;

use anyhow::Result;
use serde::{Deserialize, Serialize};
use tracing::{debug, trace, warn};

pub mod agent;
pub mod examples;
Expand Down
33 changes: 14 additions & 19 deletions arbiter-engine/src/messager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! The messager module contains the core messager layer for the Arbiter Engine.

use artemis_core::types::{Collector, CollectorStream, Executor};
use tokio::sync::broadcast::Sender;
use flume::{unbounded, Receiver, Sender};

use super::*;

Expand All @@ -22,42 +22,37 @@ pub struct Message {
}

/// A messager that can be used to send messages between agents.
#[derive(Clone, Debug)]
pub struct Messager {
broadcaster: Sender<Message>,
sender: Sender<Message>,
receiver: Receiver<Message>,
}

impl Messager {
/// Creates a new messager with the given capacity.
pub fn new(capacity: usize) -> Self {
Self {
broadcaster: Sender::new(capacity),
}
}
}

impl Default for Messager {
fn default() -> Self {
Self::new(32)
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let (sender, receiver) = unbounded();
Self { sender, receiver }
}
}

#[async_trait::async_trait]
impl Collector<Message> for Messager {
#[tracing::instrument(skip(self), level = "debug", target = "messager")]
async fn get_event_stream(&self) -> Result<CollectorStream<'_, Message>> {
let mut subscription = self.broadcaster.subscribe();
let stream = async_stream::stream! {
while let Ok(message) = subscription.recv().await {
yield message;
}
};
debug!("Getting the event stream for the messager.");
let stream = self.receiver.clone().into_stream();
Ok(Box::pin(stream))
}
}

#[async_trait::async_trait]
impl Executor<Message> for Messager {
#[tracing::instrument(skip(self), level = "trace", target = "messager")]
async fn execute(&self, message: Message) -> Result<()> {
let _buf_len = self.broadcaster.send(message)?;
trace!("Broadcasting message.");
self.sender.send(message)?;
Ok(())
}
}
Loading
Loading