Skip to content

Commit

Permalink
Merge pull request #801 from primitivefinance/arbiter-engine/world-ag…
Browse files Browse the repository at this point in the history
…ent-behaviors

feat(arbiter-core / arbiter-engine): broadcasters, halting, test sims
  • Loading branch information
0xJepsen authored Jan 22, 2024
2 parents 3041530 + a6744b8 commit e36f75c
Show file tree
Hide file tree
Showing 14 changed files with 1,152 additions and 951 deletions.
189 changes: 70 additions & 119 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ path = "bin/main.rs"
[workspace.dependencies]
arbiter-bindings = { version = "*", path = "./arbiter-bindings" }
arbiter-core = { version = "*", path = "./arbiter-core" }
ethers = { version = "2.0.10" }
ethers = { version = "2.0.11" }
serde = { version = "1.0.193", features = ["derive"] }
serde_json = { version = "=1.0.108" }
revm = { git = "https://github.com/bluealloy/revm.git", rev = "30bbcdf", features = [ "ethersdb", "std", "serde"] }
revm-primitives = { git = "https://github.com/bluealloy/revm.git", rev = "30bbcdf" }
revm = { git = "https://github.com/bluealloy/revm.git", features = [ "ethersdb", "std", "serde"], rev = "30bbcdfe81446c9d1e9b37acc95f208943ddf858" }
revm-primitives = { git = "https://github.com/bluealloy/revm.git", rev = "30bbcdfe81446c9d1e9b37acc95f208943ddf858" }
thiserror = { version = "1.0.55" }
syn = { version = "2.0.43" }
quote = { version = "=1.0.33" }
Expand All @@ -44,7 +44,7 @@ async-stream = "0.3.5"
arbiter-core.workspace = true

# Command line and config
clap = { version = "=4.4.18", features = ["derive"] }
clap = { version = "=4.4.14", features = ["derive"] }
serde.workspace = true
serde_json.workspace = true
config = { version = "=0.13.4" }
Expand All @@ -65,8 +65,8 @@ thiserror.workspace = true
# Dependencies for the test build and development
[dev-dependencies]
tokio.workspace = true
assert_cmd = { version = "=2.0.13" }
rayon = { version = "1.8.1" }
assert_cmd = { version = "=2.0.12" }
rayon = { version = "1.8.0" }
revm-primitives.workspace = true

# Release profile
Expand Down
74 changes: 33 additions & 41 deletions arbiter-core/src/data_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::{
collections::BTreeMap, fmt::Debug, io::BufWriter, marker::PhantomData, mem::transmute,
pin::Pin, sync::Arc,
sync::Arc,
};

use ethers::{
Expand All @@ -38,6 +38,7 @@ use polars::{
};
use serde::Serialize;
use serde_json::Value;
use tokio::{sync::broadcast::Receiver as BroadcastReceiver, task::JoinHandle};

use super::*;
use crate::{
Expand All @@ -63,8 +64,8 @@ pub(crate) type FilterDecoder =
/// traits, and has a static lifetime.
pub struct EventLogger {
decoder: FilterDecoder,
receiver: Option<crossbeam_channel::Receiver<Broadcast>>,
shutdown_sender: Option<crossbeam_channel::Sender<()>>,
receiver: Option<BroadcastReceiver<Broadcast>>,
// shutdown_sender: Option<crossbeam_channel::Sender<()>>,
output_file_type: Option<OutputFileType>,
directory: Option<String>,
file_name: Option<String>,
Expand Down Expand Up @@ -103,7 +104,7 @@ impl EventLogger {
file_name: None,
decoder: BTreeMap::new(),
receiver: None,
shutdown_sender: None,
// shutdown_sender: None,
output_file_type: None,
metadata: None,
}
Expand Down Expand Up @@ -138,15 +139,7 @@ impl EventLogger {
);
let connection = middleware.provider().as_ref();
if self.receiver.is_none() {
let (event_sender, event_receiver) = crossbeam_channel::unbounded::<Broadcast>();
let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded::<()>(1);
connection
.event_broadcaster
.lock()
.unwrap()
.add_sender(event_sender, Some(shutdown_receiver));
self.receiver = Some(event_receiver);
self.shutdown_sender = Some(shutdown_sender);
self.receiver = Some(connection.event_sender.subscribe());
}
debug!("`EventLogger` now provided with event labeled: {:?}", name);
self
Expand Down Expand Up @@ -253,15 +246,15 @@ impl EventLogger {
///
/// This function will return an error if there is a problem creating the
/// directories or files, or writing to the files.
pub fn run(self) -> Result<(), RevmMiddlewareError> {
let receiver = self.receiver.unwrap();
pub fn run(self) -> Result<JoinHandle<()>, RevmMiddlewareError> {
let mut receiver = self.receiver.unwrap();
let dir = self.directory.unwrap_or("./data".into());
let file_name = self.file_name.unwrap_or("output".into());
let file_type = self.output_file_type.unwrap_or(OutputFileType::JSON);
let metadata = self.metadata.clone();
std::thread::spawn(move || {
let task = tokio::spawn(async move {
let mut events: BTreeMap<String, BTreeMap<String, Vec<Value>>> = BTreeMap::new();
while let Ok(broadcast) = receiver.recv() {
while let Ok(broadcast) = receiver.recv().await {
match broadcast {
Broadcast::StopSignal => {
debug!("`EventLogger` has seen a stop signal");
Expand All @@ -288,7 +281,6 @@ impl EventLogger {
}
let data = OutputData { events, metadata };
serde_json::to_writer(writer, &data).expect("Unable to write data");
self.shutdown_sender.unwrap().send(()).unwrap();
}
OutputFileType::CSV => {
// Write the DataFrame to a CSV file
Expand All @@ -301,7 +293,6 @@ impl EventLogger {
writer.finish(&mut df).unwrap_or_else(|_| {
panic!("Error writing to csv file");
});
self.shutdown_sender.unwrap().send(()).unwrap();
}
OutputFileType::Parquet => {
// Write the DataFrame to a parquet file
Expand All @@ -314,7 +305,6 @@ impl EventLogger {
writer.finish(&mut df).unwrap_or_else(|_| {
panic!("Error writing to parquet file");
});
self.shutdown_sender.unwrap().send(()).unwrap();
}
}
break;
Expand Down Expand Up @@ -361,36 +351,38 @@ impl EventLogger {
}
}
});
Ok(())
Ok(task)
}

/// Returns a stream of the serialized events.
pub fn stream(self) -> Pin<Box<dyn Stream<Item = String> + Send + 'static>> {
let receiver = self.receiver.clone().unwrap();

let stream = async_stream::stream! {
while let Ok(broadcast) = receiver.recv() {
match broadcast {
Broadcast::StopSignal => {
trace!("`EventLogger` has seen a stop signal");
break;
}
Broadcast::Event(event) => {
trace!("`EventLogger` received an event");
let ethers_logs = revm_logs_to_ethers_logs(event);
for log in ethers_logs {
for (_id, (filter, decoder)) in self.decoder.iter() {
if filter.filter_address(&log) && filter.filter_topics(&log) {
yield decoder(&log.clone().into());
pub fn stream(mut self) -> Option<impl Stream<Item = String> + Send> {
if let Some(mut receiver) = self.receiver.take() {
let stream = async_stream::stream! {
while let Ok(broadcast) = receiver.recv().await {
match broadcast {
Broadcast::StopSignal => {
trace!("`EventLogger` has seen a stop signal");
break;
}
Broadcast::Event(event) => {
trace!("`EventLogger` received an event");
let ethers_logs = revm_logs_to_ethers_logs(event);
for log in ethers_logs {
for (_id, (filter, decoder)) in self.decoder.iter() {
if filter.filter_address(&log) && filter.filter_topics(&log) {
yield decoder(&log.clone().into());
}
}
}
}
}
}
}
};
};

Box::pin(stream)
Some(stream)
} else {
None
}
}
}

Expand Down
87 changes: 19 additions & 68 deletions arbiter-core/src/environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use revm::{
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::broadcast::{channel, Sender as BroadcastSender};

use super::*;
#[cfg_attr(doc, doc(hidden))]
Expand Down Expand Up @@ -84,14 +85,6 @@ pub(crate) type OutcomeSender = Sender<Result<Outcome, EnvironmentError>>;
/// emitted from transactions.
pub(crate) type OutcomeReceiver = Receiver<Result<Outcome, EnvironmentError>>;

/// Alias for the sender used in the [`EventBroadcaster`] that transmits
/// contract events via [`Log`].
pub(crate) type EventSender = Sender<Broadcast>;

/// Alias for the receiver used in the [`EventBroadcaster`] that accepts
/// shutdown signals from child processes.
pub(crate) type ShutDownReceiver = Receiver<()>;

/// Represents a sandboxed EVM environment.
///
/// ## Communication
Expand Down Expand Up @@ -162,10 +155,11 @@ impl Environment {
db: Option<CacheDB<EmptyDB>>,
) -> Self {
let (instruction_sender, instruction_receiver) = unbounded();
let (event_broadcaster, _) = channel(512);
let socket = Socket {
instruction_sender: Arc::new(instruction_sender),
instruction_receiver,
event_broadcaster: Arc::new(Mutex::new(EventBroadcaster::new())),
event_broadcaster,
};
let db = db.map(|db| ArbiterDB(Arc::new(RwLock::new(db))));

Expand Down Expand Up @@ -540,15 +534,19 @@ impl Environment {

// update transaction count for sender

let event_broadcaster = event_broadcaster
.lock()
.map_err(|e| EnvironmentError::Communication(e.to_string()))?;
let receipt_data = ReceiptData {
block_number,
transaction_index: transaction_index.into(),
cumulative_gas_per_block,
};
event_broadcaster.broadcast(Some(execution_result.logs()), false)?;
match event_broadcaster.send(Broadcast::Event(execution_result.logs())) {
Ok(_) => {}
Err(_) => {
warn!(
"Event was not sent to any listeners. Are there any listeners?"
)
}
}
outcome_sender
.send(Ok(Outcome::TransactionCompleted(
execution_result,
Expand Down Expand Up @@ -649,10 +647,15 @@ impl Environment {
.map_err(|e| EnvironmentError::Communication(e.to_string()))?;
}
Instruction::Stop(outcome_sender) => {
match event_broadcaster.send(Broadcast::StopSignal) {
Ok(_) => {}
Err(_) => {
warn!("Stop signal was not sent to any listeners. Are there any listeners?")
}
}
outcome_sender
.send(Ok(Outcome::StopCompleted(evm.db.unwrap())))
.map_err(|e| EnvironmentError::Communication(e.to_string()))?;
event_broadcaster.lock().unwrap().broadcast(None, true)?;
break;
}
}
Expand Down Expand Up @@ -718,7 +721,7 @@ impl Environment {
pub(crate) struct Socket {
pub(crate) instruction_sender: Arc<InstructionSender>,
pub(crate) instruction_receiver: InstructionReceiver,
pub(crate) event_broadcaster: Arc<Mutex<EventBroadcaster>>,
pub(crate) event_broadcaster: BroadcastSender<Broadcast>,
}

/// Enum representing the types of broadcasts that can be sent.
Expand All @@ -729,66 +732,14 @@ pub(crate) struct Socket {
/// Variants:
/// * `StopSignal`: Represents a signal to stop the event logger process.
/// * `Event(Vec<Log>)`: Represents a broadcast of a vector of Ethereum logs.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug)]
pub enum Broadcast {
/// Represents a signal to stop the event logger process.
StopSignal,
/// Represents a broadcast of a vector of Ethereum logs.
Event(Vec<Log>),
}

/// Responsible for broadcasting Ethereum logs to subscribers.
///
/// Maintains a list of senders to which logs are sent whenever they are
/// produced by the EVM.
#[derive(Clone, Debug)]
pub(crate) struct EventBroadcaster(Vec<(EventSender, Option<ShutDownReceiver>)>);

impl EventBroadcaster {
/// Called only when creating a new [`Environment`]
fn new() -> Self {
Self(vec![])
}

/// Called from [`RevmMiddleware`] implementation when setting up a new
/// `FilterWatcher` as each watcher will need their own sender
pub(crate) fn add_sender(
&mut self,
sender: EventSender,
shutdown_receiver: Option<ShutDownReceiver>,
) {
debug!("Sender added for `EventBroadcaster`");
self.0.push((sender, shutdown_receiver));
}

/// Loop through each sender and send `Vec<Log>` emitted from a transaction
/// downstream to any and all receivers
fn broadcast(&self, logs: Option<Vec<Log>>, stop_signal: bool) -> Result<(), EnvironmentError> {
if stop_signal {
for (sender, receiver) in &self.0 {
sender.send(Broadcast::StopSignal)?;
debug!("Broadcasted stop signal to listener");
if let Some(receiver) = receiver {
receiver
.recv()
.map_err(|_| EnvironmentError::ShutDownReceiverError)?;
debug!("Blocked on shutdown receiver signal");
}
}
return Ok(());
} else {
if logs.is_none() {
unreachable!();
}
for (sender, _) in &self.0 {
sender.send(Broadcast::Event(logs.clone().unwrap()))?;
trace!("Broadcasting event to all listeners")
}
}
Ok(())
}
}

/// Convert a U256 to a U64, discarding the higher bits if the number is larger
/// than 2^64 # Arguments
/// * `input` - The U256 to convert.
Expand Down
Loading

0 comments on commit e36f75c

Please sign in to comment.