Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(arbiter-core / arbiter-engine): broadcasters, halting, test sims #801

Merged
merged 21 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 70 additions & 119 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ path = "bin/main.rs"
[workspace.dependencies]
arbiter-bindings = { version = "*", path = "./arbiter-bindings" }
arbiter-core = { version = "*", path = "./arbiter-core" }
ethers = { version = "2.0.10" }
ethers = { version = "2.0.11" }
serde = { version = "1.0.193", features = ["derive"] }
serde_json = { version = "=1.0.108" }
revm = { git = "https://github.com/bluealloy/revm.git", rev = "30bbcdf", features = [ "ethersdb", "std", "serde"] }
revm-primitives = { git = "https://github.com/bluealloy/revm.git", rev = "30bbcdf" }
revm = { git = "https://github.com/bluealloy/revm.git", features = [ "ethersdb", "std", "serde"], rev = "30bbcdfe81446c9d1e9b37acc95f208943ddf858" }
revm-primitives = { git = "https://github.com/bluealloy/revm.git", rev = "30bbcdfe81446c9d1e9b37acc95f208943ddf858" }
thiserror = { version = "1.0.55" }
syn = { version = "2.0.43" }
quote = { version = "=1.0.33" }
Expand All @@ -44,7 +44,7 @@ async-stream = "0.3.5"
arbiter-core.workspace = true

# Command line and config
clap = { version = "=4.4.18", features = ["derive"] }
clap = { version = "=4.4.14", features = ["derive"] }
serde.workspace = true
serde_json.workspace = true
config = { version = "=0.13.4" }
Expand All @@ -65,8 +65,8 @@ thiserror.workspace = true
# Dependencies for the test build and development
[dev-dependencies]
tokio.workspace = true
assert_cmd = { version = "=2.0.13" }
rayon = { version = "1.8.1" }
assert_cmd = { version = "=2.0.12" }
rayon = { version = "1.8.0" }
revm-primitives.workspace = true

# Release profile
Expand Down
74 changes: 33 additions & 41 deletions arbiter-core/src/data_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use std::{
collections::BTreeMap, fmt::Debug, io::BufWriter, marker::PhantomData, mem::transmute,
pin::Pin, sync::Arc,
sync::Arc,
};

use ethers::{
Expand All @@ -38,6 +38,7 @@
};
use serde::Serialize;
use serde_json::Value;
use tokio::{sync::broadcast::Receiver as BroadcastReceiver, task::JoinHandle};

use super::*;
use crate::{
Expand All @@ -63,8 +64,8 @@
/// traits, and has a static lifetime.
pub struct EventLogger {
decoder: FilterDecoder,
receiver: Option<crossbeam_channel::Receiver<Broadcast>>,
shutdown_sender: Option<crossbeam_channel::Sender<()>>,
receiver: Option<BroadcastReceiver<Broadcast>>,
// shutdown_sender: Option<crossbeam_channel::Sender<()>>,
output_file_type: Option<OutputFileType>,
directory: Option<String>,
file_name: Option<String>,
Expand Down Expand Up @@ -103,7 +104,7 @@
file_name: None,
decoder: BTreeMap::new(),
receiver: None,
shutdown_sender: None,
// shutdown_sender: None,
output_file_type: None,
metadata: None,
}
Expand Down Expand Up @@ -138,15 +139,7 @@
);
let connection = middleware.provider().as_ref();
if self.receiver.is_none() {
let (event_sender, event_receiver) = crossbeam_channel::unbounded::<Broadcast>();
let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded::<()>(1);
connection
.event_broadcaster
.lock()
.unwrap()
.add_sender(event_sender, Some(shutdown_receiver));
self.receiver = Some(event_receiver);
self.shutdown_sender = Some(shutdown_sender);
self.receiver = Some(connection.event_sender.subscribe());
}
debug!("`EventLogger` now provided with event labeled: {:?}", name);
self
Expand Down Expand Up @@ -253,15 +246,15 @@
///
/// This function will return an error if there is a problem creating the
/// directories or files, or writing to the files.
pub fn run(self) -> Result<(), RevmMiddlewareError> {
let receiver = self.receiver.unwrap();
pub fn run(self) -> Result<JoinHandle<()>, RevmMiddlewareError> {
let mut receiver = self.receiver.unwrap();
let dir = self.directory.unwrap_or("./data".into());
let file_name = self.file_name.unwrap_or("output".into());
let file_type = self.output_file_type.unwrap_or(OutputFileType::JSON);
let metadata = self.metadata.clone();
std::thread::spawn(move || {
let task = tokio::spawn(async move {
let mut events: BTreeMap<String, BTreeMap<String, Vec<Value>>> = BTreeMap::new();
while let Ok(broadcast) = receiver.recv() {
while let Ok(broadcast) = receiver.recv().await {
match broadcast {
Broadcast::StopSignal => {
debug!("`EventLogger` has seen a stop signal");
Expand All @@ -288,7 +281,6 @@
}
let data = OutputData { events, metadata };
serde_json::to_writer(writer, &data).expect("Unable to write data");
self.shutdown_sender.unwrap().send(()).unwrap();
}
OutputFileType::CSV => {
// Write the DataFrame to a CSV file
Expand All @@ -301,7 +293,6 @@
writer.finish(&mut df).unwrap_or_else(|_| {
panic!("Error writing to csv file");
});
self.shutdown_sender.unwrap().send(()).unwrap();
}
OutputFileType::Parquet => {
// Write the DataFrame to a parquet file
Expand All @@ -314,7 +305,6 @@
writer.finish(&mut df).unwrap_or_else(|_| {
panic!("Error writing to parquet file");
});
self.shutdown_sender.unwrap().send(()).unwrap();
}
}
break;
Expand Down Expand Up @@ -361,36 +351,38 @@
}
}
});
Ok(())
Ok(task)
}

/// Returns a stream of the serialized events.
pub fn stream(self) -> Pin<Box<dyn Stream<Item = String> + Send + 'static>> {
let receiver = self.receiver.clone().unwrap();

let stream = async_stream::stream! {
while let Ok(broadcast) = receiver.recv() {
match broadcast {
Broadcast::StopSignal => {
trace!("`EventLogger` has seen a stop signal");
break;
}
Broadcast::Event(event) => {
trace!("`EventLogger` received an event");
let ethers_logs = revm_logs_to_ethers_logs(event);
for log in ethers_logs {
for (_id, (filter, decoder)) in self.decoder.iter() {
if filter.filter_address(&log) && filter.filter_topics(&log) {
yield decoder(&log.clone().into());
pub fn stream(mut self) -> Option<impl Stream<Item = String> + Send> {
if let Some(mut receiver) = self.receiver.take() {
let stream = async_stream::stream! {
while let Ok(broadcast) = receiver.recv().await {
match broadcast {
Broadcast::StopSignal => {
trace!("`EventLogger` has seen a stop signal");
break;

Check warning on line 365 in arbiter-core/src/data_collection.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-core/src/data_collection.rs#L364-L365

Added lines #L364 - L365 were not covered by tests
}
Broadcast::Event(event) => {
trace!("`EventLogger` received an event");
let ethers_logs = revm_logs_to_ethers_logs(event);
for log in ethers_logs {
for (_id, (filter, decoder)) in self.decoder.iter() {
if filter.filter_address(&log) && filter.filter_topics(&log) {
yield decoder(&log.clone().into());
}
}
}
}
}
}
}
};
};

Box::pin(stream)
Some(stream)
} else {
None
}
}
}

Expand Down
87 changes: 19 additions & 68 deletions arbiter-core/src/environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::broadcast::{channel, Sender as BroadcastSender};

use super::*;
#[cfg_attr(doc, doc(hidden))]
Expand Down Expand Up @@ -84,14 +85,6 @@
/// emitted from transactions.
pub(crate) type OutcomeReceiver = Receiver<Result<Outcome, EnvironmentError>>;

/// Alias for the sender used in the [`EventBroadcaster`] that transmits
/// contract events via [`Log`].
pub(crate) type EventSender = Sender<Broadcast>;

/// Alias for the receiver used in the [`EventBroadcaster`] that accepts
/// shutdown signals from child processes.
pub(crate) type ShutDownReceiver = Receiver<()>;

/// Represents a sandboxed EVM environment.
///
/// ## Communication
Expand Down Expand Up @@ -162,10 +155,11 @@
db: Option<CacheDB<EmptyDB>>,
) -> Self {
let (instruction_sender, instruction_receiver) = unbounded();
let (event_broadcaster, _) = channel(512);
let socket = Socket {
instruction_sender: Arc::new(instruction_sender),
instruction_receiver,
event_broadcaster: Arc::new(Mutex::new(EventBroadcaster::new())),
event_broadcaster,
};
let db = db.map(|db| ArbiterDB(Arc::new(RwLock::new(db))));

Expand Down Expand Up @@ -540,15 +534,19 @@

// update transaction count for sender

let event_broadcaster = event_broadcaster
.lock()
.map_err(|e| EnvironmentError::Communication(e.to_string()))?;
let receipt_data = ReceiptData {
block_number,
transaction_index: transaction_index.into(),
cumulative_gas_per_block,
};
event_broadcaster.broadcast(Some(execution_result.logs()), false)?;
match event_broadcaster.send(Broadcast::Event(execution_result.logs())) {
Ok(_) => {}
Err(_) => {
warn!(
"Event was not sent to any listeners. Are there any listeners?"
)

Check warning on line 547 in arbiter-core/src/environment/mod.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-core/src/environment/mod.rs#L546-L547

Added lines #L546 - L547 were not covered by tests
}
}
outcome_sender
.send(Ok(Outcome::TransactionCompleted(
execution_result,
Expand Down Expand Up @@ -649,10 +647,15 @@
.map_err(|e| EnvironmentError::Communication(e.to_string()))?;
}
Instruction::Stop(outcome_sender) => {
match event_broadcaster.send(Broadcast::StopSignal) {
Ok(_) => {}
Err(_) => {
warn!("Stop signal was not sent to any listeners. Are there any listeners?")
}
}
outcome_sender
.send(Ok(Outcome::StopCompleted(evm.db.unwrap())))
.map_err(|e| EnvironmentError::Communication(e.to_string()))?;
event_broadcaster.lock().unwrap().broadcast(None, true)?;
break;
}
}
Expand Down Expand Up @@ -718,7 +721,7 @@
pub(crate) struct Socket {
pub(crate) instruction_sender: Arc<InstructionSender>,
pub(crate) instruction_receiver: InstructionReceiver,
pub(crate) event_broadcaster: Arc<Mutex<EventBroadcaster>>,
pub(crate) event_broadcaster: BroadcastSender<Broadcast>,
}

/// Enum representing the types of broadcasts that can be sent.
Expand All @@ -729,66 +732,14 @@
/// Variants:
/// * `StopSignal`: Represents a signal to stop the event logger process.
/// * `Event(Vec<Log>)`: Represents a broadcast of a vector of Ethereum logs.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug)]
pub enum Broadcast {
/// Represents a signal to stop the event logger process.
StopSignal,
/// Represents a broadcast of a vector of Ethereum logs.
Event(Vec<Log>),
}

/// Responsible for broadcasting Ethereum logs to subscribers.
///
/// Maintains a list of senders to which logs are sent whenever they are
/// produced by the EVM.
#[derive(Clone, Debug)]
pub(crate) struct EventBroadcaster(Vec<(EventSender, Option<ShutDownReceiver>)>);

impl EventBroadcaster {
/// Called only when creating a new [`Environment`]
fn new() -> Self {
Self(vec![])
}

/// Called from [`RevmMiddleware`] implementation when setting up a new
/// `FilterWatcher` as each watcher will need their own sender
pub(crate) fn add_sender(
&mut self,
sender: EventSender,
shutdown_receiver: Option<ShutDownReceiver>,
) {
debug!("Sender added for `EventBroadcaster`");
self.0.push((sender, shutdown_receiver));
}

/// Loop through each sender and send `Vec<Log>` emitted from a transaction
/// downstream to any and all receivers
fn broadcast(&self, logs: Option<Vec<Log>>, stop_signal: bool) -> Result<(), EnvironmentError> {
if stop_signal {
for (sender, receiver) in &self.0 {
sender.send(Broadcast::StopSignal)?;
debug!("Broadcasted stop signal to listener");
if let Some(receiver) = receiver {
receiver
.recv()
.map_err(|_| EnvironmentError::ShutDownReceiverError)?;
debug!("Blocked on shutdown receiver signal");
}
}
return Ok(());
} else {
if logs.is_none() {
unreachable!();
}
for (sender, _) in &self.0 {
sender.send(Broadcast::Event(logs.clone().unwrap()))?;
trace!("Broadcasting event to all listeners")
}
}
Ok(())
}
}

/// Convert a U256 to a U64, discarding the higher bits if the number is larger
/// than 2^64 # Arguments
/// * `input` - The U256 to convert.
Expand Down
Loading
Loading