Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(arbiter-core): stream with meta #863

Merged
merged 3 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion arbiter-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ futures.workspace = true
cargo_metadata = "0.18.1"
chrono = "0.4.33"


assert_matches = { version = "=1.5" }

[[bench]]
Expand Down
8 changes: 4 additions & 4 deletions arbiter-core/src/data_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,9 @@ impl EventLogger {
}
break;
}
Broadcast::Event(event) => {
Broadcast::Event(event, receipt_data) => {
trace!("`EventLogger` received an event");
let ethers_logs = revm_logs_to_ethers_logs(event);
let ethers_logs = revm_logs_to_ethers_logs(event, &receipt_data);
for log in ethers_logs {
for (contract_name, (filter, decoder)) in self.decoder.iter() {
if filter.filter_address(&log) && filter.filter_topics(&log) {
Expand Down Expand Up @@ -365,9 +365,9 @@ impl EventLogger {
trace!("`EventLogger` has seen a stop signal");
break;
}
Broadcast::Event(event) => {
Broadcast::Event(event, receipt_data) => {
trace!("`EventLogger` received an event");
let ethers_logs = revm_logs_to_ethers_logs(event);
let ethers_logs = revm_logs_to_ethers_logs(event, &receipt_data);
for log in ethers_logs {
for (_id, (filter, decoder)) in self.decoder.iter() {
if filter.filter_address(&log) && filter.filter_topics(&log) {
Expand Down
7 changes: 5 additions & 2 deletions arbiter-core/src/environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,10 @@ impl Environment {
transaction_index,
cumulative_gas_per_block,
};
match event_broadcaster.send(Broadcast::Event(execution_result.logs())) {
match event_broadcaster.send(Broadcast::Event(
execution_result.logs(),
receipt_data.clone(),
)) {
Ok(_) => {}
Err(_) => {
warn!(
Expand Down Expand Up @@ -599,7 +602,7 @@ pub enum Broadcast {
/// Represents a signal to stop the event logger process.
StopSignal,
/// Represents a broadcast of a vector of Ethereum logs.
Event(Vec<Log>),
Event(Vec<Log>, ReceiptData),
}

/// Convert a U256 to a U64, discarding the higher bits if the number is larger
Expand Down
32 changes: 17 additions & 15 deletions arbiter-core/src/middleware/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ impl JsonRpcClient for Connection {
if let Some(receiver) = filter_receiver.receiver.as_mut() {
if let Ok(broadcast) = receiver.try_recv() {
match broadcast {
Broadcast::Event(received_logs) => {
let ethers_logs = revm_logs_to_ethers_logs(received_logs);
Broadcast::Event(received_logs, receipt_data) => {
let ethers_logs =
revm_logs_to_ethers_logs(received_logs, &receipt_data);
for log in ethers_logs {
if filtered_params.filter_address(&log)
&& filtered_params.filter_topics(&log)
Expand Down Expand Up @@ -150,10 +151,10 @@ impl PubsubClient for Connection {
Broadcast::StopSignal => {
break;
}
Broadcast::Event(logs) => {
Broadcast::Event(logs, receipt_data) => {
let filtered_params =
FilteredParams::new(Some(filter_receiver.filter.clone()));
let ethers_logs = revm_logs_to_ethers_logs(logs);
let ethers_logs = revm_logs_to_ethers_logs(logs, &receipt_data);
// Return the first log that matches the filter, if any
for log in ethers_logs {
if filtered_params.filter_address(&log)
Expand Down Expand Up @@ -214,28 +215,29 @@ pub(crate) struct FilterReceiver {
}

// TODO: The logs below could have the block number, transaction index, and
// maybe other fields populated.
// maybe other fields populated. Right now, some are defaulted and are not
// correct!

/// Converts logs from the Revm format to the Ethers format.
///
/// This function iterates over a list of logs as they appear in the `revm` and
/// converts each log entry to the corresponding format used by the `ethers-rs`
/// library.
#[inline]
pub fn revm_logs_to_ethers_logs(revm_logs: Vec<Log>) -> Vec<eLog> {
let mut logs: Vec<ethers::core::types::Log> = vec![];
pub fn revm_logs_to_ethers_logs(revm_logs: Vec<Log>, receipt_data: &ReceiptData) -> Vec<eLog> {
let mut logs: Vec<eLog> = vec![];
for revm_log in revm_logs {
let topics = revm_log.topics().iter().map(recast_b256).collect();
let data = ethers::core::types::Bytes::from(revm_log.data.data.0);
let log = ethers::core::types::Log {
address: ethers::core::types::H160::from(revm_log.address.into_array()),
let data = eBytes::from(revm_log.data.data.0);
let log = eLog {
address: eAddress::from(revm_log.address.into_array()),
topics,
data,
block_hash: None,
block_number: None,
transaction_hash: None,
transaction_index: None,
log_index: None,
block_hash: Some(H256::default()),
block_number: Some(receipt_data.block_number),
transaction_hash: Some(H256::default()),
transaction_index: Some(receipt_data.transaction_index),
log_index: Some(eU256::from(0)),
transaction_log_index: None,
log_type: None,
removed: None,
Expand Down
20 changes: 10 additions & 10 deletions arbiter-core/src/middleware/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,10 @@ impl Middleware for ArbiterMiddleware {
let outcome = provider.outcome_receiver.recv()??;

if let Outcome::TransactionCompleted(execution_result, receipt_data) = outcome {
let mut block_hasher = Sha256::new();
block_hasher.update(receipt_data.block_number.to_string().as_bytes());
let block_hash = block_hasher.finalize();
let block_hash = Some(H256::from_slice(&block_hash));
match execution_result {
ExecutionResult::Revert { gas_used, output } => {
return Err(ArbiterCoreError::ExecutionRevert {
Expand All @@ -475,12 +479,7 @@ impl Middleware for ArbiterMiddleware {
logs,
..
} => {
let logs = revm_logs_to_ethers_logs(logs);
let to: Option<eAddress> = match tx_env.transact_to {
TransactTo::Call(address) => Some(address.into_array().into()),
TransactTo::Create(_) => None,
};

// TODO: This is why we need the signer middleware
// Note that this is technically not the correct construction on the tx hash
// but until we increment the nonce correctly this will do
let sender = self.address();
Expand All @@ -490,10 +489,11 @@ impl Middleware for ArbiterMiddleware {
hasher.update(data.as_ref());
let hash = hasher.finalize();

let mut block_hasher = Sha256::new();
block_hasher.update(receipt_data.block_number.to_string().as_bytes());
let block_hash = block_hasher.finalize();
let block_hash = Some(H256::from_slice(&block_hash));
let logs = revm_logs_to_ethers_logs(logs, &receipt_data);
let to: Option<eAddress> = match tx_env.transact_to {
TransactTo::Call(address) => Some(address.into_array().into()),
TransactTo::Create(_) => None,
};

match output {
Output::Create(_, address) => {
Expand Down
2 changes: 1 addition & 1 deletion arbiter-core/tests/environment_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn receipt_data() {
block_hasher.update(receipt.block_number.unwrap().to_string().as_bytes());
let block_hash = block_hasher.finalize();
let block_hash = Some(H256::from_slice(&block_hash));
assert_eq!(receipt.block_hash, block_hash);
assert_eq!(receipt.block_hash, block_hash); // panic here left side is none
assert_eq!(receipt.status, Some(1.into()));

assert!(receipt.contract_address.is_none());
Expand Down
31 changes: 31 additions & 0 deletions arbiter-core/tests/middleware_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,3 +594,34 @@ async fn access() {
}
}
}

#[tokio::test]
async fn stream_with_meta() {
let (_environment, client) = startup();

let arbx = deploy_arbx(client.clone()).await;

let events = arbx.events();
let mut stream = events.stream_with_meta().await.unwrap();

for _ in 0..2 {
arbx.approve(client.address(), eU256::from(1))
.send()
.await
.unwrap()
.await
.unwrap();
}

client.update_block(1, 1).unwrap();

arbx.approve(client.address(), eU256::from(1))
.send()
.await
.unwrap()
.await
.unwrap();
assert_eq!(format!("{:?}", stream.next().await), "Some(Ok((ApprovalFilter(ApprovalFilter { owner: 0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5, spender: 0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5, amount: 1 }), LogMeta { address: 0x067ea9e44c76a2620f10b39a1b51d5124a299192, block_number: 0, block_hash: 0x0000000000000000000000000000000000000000000000000000000000000000, transaction_hash: 0x0000000000000000000000000000000000000000000000000000000000000000, transaction_index: 1, log_index: 0 })))");
assert_eq!(format!("{:?}", stream.next().await), "Some(Ok((ApprovalFilter(ApprovalFilter { owner: 0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5, spender: 0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5, amount: 1 }), LogMeta { address: 0x067ea9e44c76a2620f10b39a1b51d5124a299192, block_number: 0, block_hash: 0x0000000000000000000000000000000000000000000000000000000000000000, transaction_hash: 0x0000000000000000000000000000000000000000000000000000000000000000, transaction_index: 2, log_index: 0 })))");
assert_eq!(format!("{:?}", stream.next().await), "Some(Ok((ApprovalFilter(ApprovalFilter { owner: 0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5, spender: 0x2efdc9eecfee3a776209fcb8e9a83a6b221d74f5, amount: 1 }), LogMeta { address: 0x067ea9e44c76a2620f10b39a1b51d5124a299192, block_number: 1, block_hash: 0x0000000000000000000000000000000000000000000000000000000000000000, transaction_hash: 0x0000000000000000000000000000000000000000000000000000000000000000, transaction_index: 0, log_index: 0 })))");
}
Loading