Skip to content

Commit

Permalink
Merge pull request #820 from primitivefinance/engine/refactor-behaviors
Browse files Browse the repository at this point in the history
refactor(arbiter-engine): agent-behavior flow
  • Loading branch information
0xJepsen authored Jan 30, 2024
2 parents 4a7bb99 + d31821a commit 62769cf
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 139 deletions.
11 changes: 8 additions & 3 deletions arbiter-engine/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ impl Agent {
pub(crate) async fn run(&mut self, instruction: MachineInstruction) {
let behavior_engines = self.behavior_engines.take().unwrap();
let behavior_tasks = join_all(behavior_engines.into_iter().map(|mut engine| {
let instruction_clone = instruction.clone();
tokio::spawn(async move {
engine.execute(instruction).await;
engine.execute(instruction_clone).await;
engine
})
}));
Expand All @@ -164,10 +165,14 @@ impl StateMachine for Agent {
#[tracing::instrument(skip(self), fields(id = self.id))]
async fn execute(&mut self, instruction: MachineInstruction) {
match instruction {
MachineInstruction::Sync => {
MachineInstruction::Sync(_, _) => {
debug!("Agent is syncing.");
self.state = State::Syncing;
self.run(instruction).await;
self.run(MachineInstruction::Sync(
self.messager.clone(),
Some(self.client.clone()),
))
.await;
}
MachineInstruction::Start => {
debug!("Agent is starting up.");
Expand Down
108 changes: 34 additions & 74 deletions arbiter-engine/src/examples/timed_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,42 @@ struct TimedMessage {
delay: u64,
receive_data: String,
send_data: String,
messager: Messager,
messager: Option<Messager>,
count: u64,
max_count: Option<u64>,
}

impl TimedMessage {
pub fn new(
delay: u64,
receive_data: String,
send_data: String,
max_count: Option<u64>,
) -> Self {
Self {
delay,
receive_data,
send_data,
messager: None,
count: 0,
max_count,
}
}
}

#[async_trait::async_trait]
impl Behavior<Message> for TimedMessage {
async fn process(&mut self, event: Message) -> Option<MachineHalt> {
trace!("Processing event.");
let messager = self.messager.as_ref().unwrap();
if event.data == self.receive_data {
trace!("Event matches message. Sending a new message.");
let message = Message {
from: self.messager.id.clone().unwrap(),
from: messager.id.clone().unwrap(),
to: To::All,
data: self.send_data.clone(),
};
self.messager.send(message).await;
messager.send(message).await;
self.count += 1;
}
if self.count == self.max_count.unwrap_or(u64::MAX) {
Expand All @@ -48,8 +67,9 @@ impl Behavior<Message> for TimedMessage {
None
}

async fn sync(&mut self) {
async fn sync(&mut self, messager: Messager, _client: Arc<RevmMiddleware>) {
trace!("Syncing state for `TimedMessage`.");
self.messager = Some(messager);
tokio::time::sleep(std::time::Duration::from_secs(self.delay)).await;
trace!("Synced state for `TimedMessage`.");
}
Expand All @@ -63,24 +83,15 @@ impl Behavior<Message> for TimedMessage {

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn echoer() {
// std::env::set_var("RUST_LOG", "trace");
// tracing_subscriber::fmt::init();

let mut world = World::new("world");

let agent = Agent::new(AGENT_ID, &world);
let behavior = TimedMessage {
delay: 1,
receive_data: "Hello, world!".to_owned(),
send_data: "Hello, world!".to_owned(),
messager: agent
.messager
.as_ref()
.unwrap()
.join_with_id(Some(AGENT_ID.to_owned())),
count: 0,
max_count: Some(2),
};
let behavior = TimedMessage::new(
1,
"Hello, world!".to_owned(),
"Hello, world!".to_owned(),
Some(2),
);
world.add_agent(agent.with_behavior(behavior));

let messager = world.messager.join_with_id(Some("god".to_owned()));
Expand Down Expand Up @@ -115,37 +126,11 @@ async fn echoer() {

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn ping_pong() {
// std::env::set_var("RUST_LOG", "trace");
// tracing_subscriber::fmt::init();

let mut world = World::new("world");

let agent = Agent::new(AGENT_ID, &world);
let behavior_ping = TimedMessage {
delay: 1,
receive_data: "pong".to_owned(),
send_data: "ping".to_owned(),
messager: agent
.messager
.as_ref()
.unwrap()
.join_with_id(Some(AGENT_ID.to_owned())),
count: 0,
max_count: Some(2),
};
let behavior_pong = TimedMessage {
delay: 1,
receive_data: "ping".to_owned(),
send_data: "pong".to_owned(),
messager: agent
.messager
.as_ref()
.unwrap()
.join_with_id(Some(AGENT_ID.to_owned())),
count: 0,
max_count: Some(2),
};

let behavior_ping = TimedMessage::new(1, "pong".to_owned(), "ping".to_owned(), Some(2));
let behavior_pong = TimedMessage::new(1, "ping".to_owned(), "pong".to_owned(), Some(2));
world.add_agent(
agent
.with_behavior(behavior_ping)
Expand Down Expand Up @@ -185,38 +170,13 @@ async fn ping_pong() {

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn ping_pong_two_agent() {
// std::env::set_var("RUST_LOG", "trace");
// tracing_subscriber::fmt::init();

let mut world = World::new("world");

let agent_ping = Agent::new("agent_ping", &world);
let behavior_ping = TimedMessage {
delay: 1,
receive_data: "pong".to_owned(),
send_data: "ping".to_owned(),
messager: agent_ping
.messager
.as_ref()
.unwrap()
.join_with_id(Some("agent_ping".to_owned())),
count: 0,
max_count: Some(2),
};
let behavior_ping = TimedMessage::new(1, "pong".to_owned(), "ping".to_owned(), Some(2));

let agent_pong = Agent::new("agent_pong", &world);
let behavior_pong = TimedMessage {
delay: 1,
receive_data: "ping".to_owned(),
send_data: "pong".to_owned(),
messager: agent_pong
.messager
.as_ref()
.unwrap()
.join_with_id(Some("agent_pong".to_owned())),
count: 0,
max_count: Some(2),
};
let behavior_pong = TimedMessage::new(1, "ping".to_owned(), "pong".to_owned(), Some(2));

world.add_agent(agent_ping.with_behavior(behavior_ping));
world.add_agent(agent_pong.with_behavior(behavior_pong));
Expand Down
Loading

0 comments on commit 62769cf

Please sign in to comment.