diff --git a/core/src/subgraph/instance.rs b/core/src/subgraph/instance.rs index e27c8580586..b54942a924e 100644 --- a/core/src/subgraph/instance.rs +++ b/core/src/subgraph/instance.rs @@ -135,7 +135,7 @@ where async fn process_trigger( &self, logger: &Logger, - block: &Arc, + block: &Arc, trigger: EthereumTrigger, state: BlockState, proof_of_indexing: SharedProofOfIndexing, @@ -154,7 +154,7 @@ where async fn process_trigger_in_runtime_hosts( logger: &Logger, hosts: &[Arc], - block: &Arc, + block: &Arc, trigger: EthereumTrigger, mut state: BlockState, proof_of_indexing: SharedProofOfIndexing, @@ -165,12 +165,11 @@ where let transaction = block .transaction_for_log(&log) - .map(Arc::new) + .map(|tx| Arc::new(tx.clone())) .context("Found no transaction for event")?; let matching_hosts = hosts.iter().filter(|host| host.matches_log(&log)); // Process the log in each host in the same order the corresponding data // sources appear in the subgraph manifest - let transaction = Arc::new(transaction); for host in matching_hosts { state = host .process_log( @@ -189,8 +188,8 @@ where let transaction = block .transaction_for_call(&call) + .map(|tx| Arc::new(tx.clone())) .context("Found no transaction for call")?; - let transaction = Arc::new(transaction); let matching_hosts = hosts.iter().filter(|host| host.matches_call(&call)); for host in matching_hosts { @@ -206,16 +205,16 @@ where .await?; } } - EthereumTrigger::Block(ptr, trigger_type) => { + EthereumTrigger::Block(ptr, block_trigger_) => { let matching_hosts = hosts .iter() - .filter(|host| host.matches_block(&trigger_type, ptr.number)); + .filter(|host| host.matches_block(&block_trigger_.trigger_type, ptr.number)); for host in matching_hosts { state = host .process_block( logger, block, - &trigger_type, + &block_trigger_, state, proof_of_indexing.cheap_clone(), ) diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index e6aec8e8328..b86b2664e54 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -638,7 +638,7 @@ where } // Obtain current and new block pointer (after this block is processed) - let light_block = Arc::new(block.light_block()); + let light_block = Arc::new(EthereumBlockType::Light(block.light_block())); let block_ptr_after = EthereumBlockPointer::from(&block); let block_ptr_for_new_data_sources = block_ptr_after.clone(); @@ -879,7 +879,7 @@ async fn process_triggers, - block: &Arc, + block: &Arc, triggers: Vec, ) -> Result<(IndexingContext, BlockState), CancelableError> { for trigger in triggers.into_iter() { diff --git a/graph/src/components/ethereum/adapter.rs b/graph/src/components/ethereum/adapter.rs index 97cbc6860c0..aa1a409ef44 100644 --- a/graph/src/components/ethereum/adapter.rs +++ b/graph/src/components/ethereum/adapter.rs @@ -411,6 +411,7 @@ impl From for EthereumCallFilter { pub struct EthereumBlockFilter { pub contract_addresses: HashSet<(u64, Address)>, pub trigger_every_block: bool, + pub block_type: BlockType, } impl EthereumBlockFilter { @@ -421,8 +422,7 @@ impl EthereumBlockFilter { let has_block_handler_with_call_filter = data_source .mapping .block_handlers - .clone() - .into_iter() + .iter() .any(|block_handler| match block_handler.filter { Some(ref filter) if *filter == BlockHandlerFilter::Call => return true, _ => return false, @@ -431,12 +431,21 @@ impl EthereumBlockFilter { let has_block_handler_without_filter = data_source .mapping .block_handlers - .clone() - .into_iter() + .iter() .any(|block_handler| block_handler.filter.is_none()); + let block_type = data_source + .mapping + .block_handlers + .iter() + .map(|handler| handler.block_format) + .max() + .unwrap_or(EthereumBlockHandlerData::Block) + .into(); + filter_opt.extend(Self { trigger_every_block: has_block_handler_without_filter, + block_type, contract_addresses: if has_block_handler_with_call_filter { vec![( data_source.source.start_block, @@ -454,6 +463,8 @@ impl EthereumBlockFilter { pub fn extend(&mut self, other: EthereumBlockFilter) { self.trigger_every_block = self.trigger_every_block || other.trigger_every_block; + self.block_type = self.block_type.max(other.block_type); + self.contract_addresses = self.contract_addresses.iter().cloned().fold( HashSet::new(), |mut addresses, (start_block, address)| { @@ -803,20 +814,30 @@ fn parse_block_triggers( ) -> Vec { let block_ptr = EthereumBlockPointer::from(&block.ethereum_block); let trigger_every_block = block_filter.trigger_every_block; + let block_type = block_filter.block_type; let call_filter = EthereumCallFilter::from(block_filter); let mut triggers = block.calls.as_ref().map_or(vec![], |calls| { calls .iter() .filter(move |call| call_filter.matches(call)) .map(move |call| { - EthereumTrigger::Block(block_ptr, EthereumBlockTriggerType::WithCallTo(call.to)) + EthereumTrigger::Block( + block_ptr, + EthereumBlockTrigger { + block_type, + trigger_type: EthereumBlockTriggerType::WithCallTo(call.to), + }, + ) }) .collect::>() }); if trigger_every_block { triggers.push(EthereumTrigger::Block( block_ptr, - EthereumBlockTriggerType::Every, + EthereumBlockTrigger { + block_type, + trigger_type: EthereumBlockTriggerType::Every, + }, )); } triggers @@ -921,7 +942,15 @@ pub fn blocks_with_triggers( .block_range_to_ptrs(logger.clone(), from, to) .map(move |ptrs| { ptrs.into_iter() - .map(|ptr| EthereumTrigger::Block(ptr, EthereumBlockTriggerType::Every)) + .map(|ptr| { + EthereumTrigger::Block( + ptr, + EthereumBlockTrigger { + block_type: block_filter.block_type, + trigger_type: EthereumBlockTriggerType::Every, + }, + ) + }) .collect() }), )) @@ -929,13 +958,17 @@ pub fn blocks_with_triggers( // To determine which blocks include a call to addresses // in the block filter, transform the `block_filter` into // a `call_filter` and run `blocks_with_calls` + let block_type = block_filter.block_type; let call_filter = EthereumCallFilter::from(block_filter); trigger_futs.push(Box::new( eth.calls_in_block_range(&logger, subgraph_metrics.clone(), from, to, call_filter) - .map(|call| { + .map(move |call| { EthereumTrigger::Block( EthereumBlockPointer::from(&call), - EthereumBlockTriggerType::WithCallTo(call.to), + EthereumBlockTrigger { + block_type, + trigger_type: EthereumBlockTriggerType::WithCallTo(call.to), + }, ) }) .collect(), diff --git a/graph/src/components/ethereum/mod.rs b/graph/src/components/ethereum/mod.rs index e348586a690..13ba61f1f24 100644 --- a/graph/src/components/ethereum/mod.rs +++ b/graph/src/components/ethereum/mod.rs @@ -13,8 +13,10 @@ pub use self::adapter::{ pub use self::listener::{ChainHeadUpdate, ChainHeadUpdateListener, ChainHeadUpdateStream}; pub use self::stream::{BlockStream, BlockStreamBuilder, BlockStreamEvent}; pub use self::types::{ - BlockFinality, EthereumBlock, EthereumBlockData, EthereumBlockPointer, - EthereumBlockTriggerType, EthereumBlockWithCalls, EthereumBlockWithTriggers, EthereumCall, - EthereumCallData, EthereumEventData, EthereumTransactionData, EthereumTrigger, - LightEthereumBlock, LightEthereumBlockExt, + BlockFinality, BlockType, EthereumBlock, EthereumBlockData, EthereumBlockPointer, + EthereumBlockTrigger, EthereumBlockTriggerType, EthereumBlockType, EthereumBlockWithCalls, + EthereumBlockWithTriggers, EthereumCall, EthereumCallData, EthereumEventData, + EthereumTransactionData, EthereumTransactionReceiptData, EthereumTrigger, + FullEthereumBlockData, FullEthereumBlockDataWithReceipts, LightEthereumBlock, + LightEthereumBlockExt, }; diff --git a/graph/src/components/ethereum/types.rs b/graph/src/components/ethereum/types.rs index 214e075a36f..359c132790e 100644 --- a/graph/src/components/ethereum/types.rs +++ b/graph/src/components/ethereum/types.rs @@ -3,17 +3,18 @@ use serde::{Deserialize, Serialize}; use stable_hash::prelude::*; use stable_hash::utils::AsBytes; use std::cmp::Ordering; +use std::convert::TryFrom; use std::fmt; use web3::types::*; -use crate::prelude::{EntityKey, SubgraphDeploymentId, ToEntityKey}; +use crate::prelude::{ + anyhow, EntityKey, EthereumBlockHandlerData, SubgraphDeploymentId, ToEntityKey, +}; pub type LightEthereumBlock = Block; pub trait LightEthereumBlockExt { fn number(&self) -> u64; - fn transaction_for_log(&self, log: &Log) -> Option; - fn transaction_for_call(&self, call: &EthereumCall) -> Option; fn parent_ptr(&self) -> Option; fn format(&self) -> String; } @@ -23,18 +24,6 @@ impl LightEthereumBlockExt for LightEthereumBlock { self.number.unwrap().as_u64() } - fn transaction_for_log(&self, log: &Log) -> Option { - log.transaction_hash - .and_then(|hash| self.transactions.iter().find(|tx| tx.hash == hash)) - .cloned() - } - - fn transaction_for_call(&self, call: &EthereumCall) -> Option { - call.transaction_hash - .and_then(|hash| self.transactions.iter().find(|tx| tx.hash == hash)) - .cloned() - } - fn parent_ptr(&self) -> Option { match self.number() { 0 => None, @@ -56,6 +45,16 @@ impl LightEthereumBlockExt for LightEthereumBlock { } } +impl<'a> From<&'a EthereumBlockType> for LightEthereumBlock { + fn from(block: &'a EthereumBlockType) -> LightEthereumBlock { + match block { + EthereumBlockType::FullWithReceipts(block) => block.block.clone(), + EthereumBlockType::Full(block) => block.clone(), + EthereumBlockType::Light(block) => block.clone(), + } + } +} + /// This is used in `EthereumAdapter::triggers_in_block`, called when re-processing a block for /// newly created data sources. This allows the re-processing to be reorg safe without having to /// always fetch the full block data. @@ -168,7 +167,7 @@ impl EthereumCall { #[derive(Clone, Debug)] pub enum EthereumTrigger { - Block(EthereumBlockPointer, EthereumBlockTriggerType), + Block(EthereumBlockPointer, EthereumBlockTrigger), Call(EthereumCall), Log(Log), } @@ -193,6 +192,86 @@ impl PartialEq for EthereumTrigger { impl Eq for EthereumTrigger {} +#[derive(Clone, Debug)] +pub enum EthereumBlockType { + Light(LightEthereumBlock), + + Full(LightEthereumBlock), + + FullWithReceipts(EthereumBlock), +} + +impl EthereumBlockType { + pub fn light_block(&self) -> &LightEthereumBlock { + match self { + EthereumBlockType::Light(block) => block, + EthereumBlockType::Full(block) => block, + EthereumBlockType::FullWithReceipts(block) => &block.block, + } + } + + pub fn hash(&self) -> H256 { + self.light_block().hash.unwrap() + } + + pub fn number(&self) -> u64 { + self.light_block().number.unwrap().as_u64() + } + + pub fn transaction_for_log(&self, log: &Log) -> Option<&Transaction> { + log.transaction_hash.and_then(|hash| { + self.light_block() + .transactions + .iter() + .find(|tx| tx.hash == hash) + }) + } + + pub fn transaction_for_call(&self, call: &EthereumCall) -> Option<&Transaction> { + call.transaction_hash.and_then(|hash| { + self.light_block() + .transactions + .iter() + .find(|tx| tx.hash == hash) + }) + } +} + +impl Default for EthereumBlockType { + fn default() -> Self { + EthereumBlockType::Light(LightEthereumBlock::default()) + } +} + +#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq)] +pub enum BlockType { + Light, + Full, + FullWithReceipts, +} + +impl Default for BlockType { + fn default() -> BlockType { + BlockType::Light + } +} + +impl From for BlockType { + fn from(block: EthereumBlockHandlerData) -> BlockType { + match block { + EthereumBlockHandlerData::Block => BlockType::Light, + EthereumBlockHandlerData::FullBlock => BlockType::Full, + EthereumBlockHandlerData::FullBlockWithReceipts => BlockType::FullWithReceipts, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct EthereumBlockTrigger { + pub block_type: BlockType, + pub trigger_type: EthereumBlockTriggerType, +} + #[derive(Clone, Debug, PartialEq, Eq)] pub enum EthereumBlockTriggerType { Every, @@ -263,6 +342,166 @@ impl PartialOrd for EthereumTrigger { } } +pub struct EthereumTransactionReceiptData { + // from receipts + // Geth nodes do not support `root` so it is not included + pub hash: H256, + pub index: Index, + pub cumulative_gas_used: U256, + pub gas_used: Option, + pub contract_address: Option, + pub status: Option, + + // from txs + pub from: H160, + pub to: Option, + pub value: U256, + pub gas_price: U256, + pub gas: U256, + pub input: Bytes, +} + +/// Ethereum block data with transactions and their receipts. +pub struct FullEthereumBlockDataWithReceipts { + pub hash: H256, + pub parent_hash: H256, + pub uncles_hash: H256, + pub author: H160, + pub state_root: H256, + pub transactions_root: H256, + pub receipts_root: H256, + pub number: U64, + pub gas_used: U256, + pub gas_limit: U256, + pub timestamp: U256, + pub difficulty: U256, + pub total_difficulty: U256, + pub size: Option, + pub transaction_receipts: Vec, +} + +impl<'a> From<&'a EthereumBlock> for FullEthereumBlockDataWithReceipts { + fn from(block: &'a EthereumBlock) -> FullEthereumBlockDataWithReceipts { + let transaction_receipts_data = block + .block + .transactions + .iter() + .cloned() + .zip(block.transaction_receipts.iter().cloned()) + .map(|transaction_and_receipt| { + assert_eq!( + transaction_and_receipt.0.hash, + transaction_and_receipt.1.transaction_hash + ); + EthereumTransactionReceiptData { + hash: transaction_and_receipt.0.hash, + index: transaction_and_receipt.1.transaction_index, + cumulative_gas_used: transaction_and_receipt.1.cumulative_gas_used, + gas_used: transaction_and_receipt.1.gas_used, + contract_address: transaction_and_receipt.1.contract_address, + status: transaction_and_receipt.1.status, + + // from txs + from: transaction_and_receipt.0.from, + to: transaction_and_receipt.0.to, + value: transaction_and_receipt.0.value, + gas_price: transaction_and_receipt.0.gas_price, + gas: transaction_and_receipt.0.gas, + input: transaction_and_receipt.0.input, + } + }) + .collect::>(); + let block = &block.block; + + FullEthereumBlockDataWithReceipts { + hash: block.hash.unwrap(), + parent_hash: block.parent_hash, + uncles_hash: block.uncles_hash, + author: block.author, + state_root: block.state_root, + transactions_root: block.transactions_root, + receipts_root: block.receipts_root, + number: block.number.unwrap(), + gas_used: block.gas_used, + gas_limit: block.gas_limit, + timestamp: block.timestamp, + difficulty: block.difficulty, + total_difficulty: block.total_difficulty.unwrap_or_default(), + size: block.size, + transaction_receipts: transaction_receipts_data, + } + } +} + +impl<'a> TryFrom<&'a EthereumBlockType> for FullEthereumBlockDataWithReceipts { + type Error = anyhow::Error; + + fn try_from( + block: &'a EthereumBlockType, + ) -> Result { + let fullblock = match block { + EthereumBlockType::FullWithReceipts(full_block) => full_block, + EthereumBlockType::Full(_) | EthereumBlockType::Light(_) => return Err(anyhow::anyhow!( + "Failed to convert EthereumBlockType to FullEthereumBlockDataWithReceipts, requires an EthereumBlockType::FullWithReceipts()" + )), + }; + Ok(fullblock.into()) + } +} + +/// Ethereum block data with transactions. +#[derive(Clone, Debug, Default)] +pub struct FullEthereumBlockData { + pub hash: H256, + pub parent_hash: H256, + pub uncles_hash: H256, + pub author: H160, + pub state_root: H256, + pub transactions_root: H256, + pub receipts_root: H256, + pub number: U64, + pub gas_used: U256, + pub gas_limit: U256, + pub timestamp: U256, + pub difficulty: U256, + pub total_difficulty: U256, + pub size: Option, + pub transactions: Vec, +} + +impl<'a> From<&'a LightEthereumBlock> for FullEthereumBlockData { + fn from(block: &'a LightEthereumBlock) -> FullEthereumBlockData { + FullEthereumBlockData { + hash: block.hash.unwrap(), + parent_hash: block.parent_hash, + uncles_hash: block.uncles_hash, + author: block.author, + state_root: block.state_root, + transactions_root: block.transactions_root, + receipts_root: block.receipts_root, + number: block.number.unwrap(), + gas_used: block.gas_used, + gas_limit: block.gas_limit, + timestamp: block.timestamp, + difficulty: block.difficulty, + total_difficulty: block.total_difficulty.unwrap_or_default(), + size: block.size, + transactions: block + .transactions + .iter() + .map(|tx| EthereumTransactionData::from(tx)) + .collect(), + } + } +} + +impl<'a> From<&'a EthereumBlockType> for FullEthereumBlockData { + fn from(block: &'a EthereumBlockType) -> FullEthereumBlockData { + let block = &LightEthereumBlock::from(block); + block.into() + } +} + /// Ethereum block data. #[derive(Clone, Debug, Default)] pub struct EthereumBlockData { @@ -282,8 +521,31 @@ pub struct EthereumBlockData { pub size: Option, } -impl<'a, T> From<&'a Block> for EthereumBlockData { - fn from(block: &'a Block) -> EthereumBlockData { +impl<'a> From<&'a LightEthereumBlock> for EthereumBlockData { + fn from(block: &'a LightEthereumBlock) -> EthereumBlockData { + EthereumBlockData { + hash: block.hash.unwrap(), + parent_hash: block.parent_hash, + uncles_hash: block.uncles_hash, + author: block.author, + state_root: block.state_root, + transactions_root: block.transactions_root, + receipts_root: block.receipts_root, + number: block.number.unwrap(), + gas_used: block.gas_used, + gas_limit: block.gas_limit, + timestamp: block.timestamp, + difficulty: block.difficulty, + total_difficulty: block.total_difficulty.unwrap_or_default(), + size: block.size, + } + } +} + +impl<'a> From<&'a EthereumBlockType> for EthereumBlockData { + fn from(block: &'a EthereumBlockType) -> EthereumBlockData { + let block: LightEthereumBlock = block.into(); + EthereumBlockData { hash: block.hash.unwrap(), parent_hash: block.parent_hash, @@ -471,6 +733,12 @@ impl<'a> From<&'a EthereumBlock> for EthereumBlockPointer { } } +impl<'a> From<&'a EthereumBlockType> for EthereumBlockPointer { + fn from(block_type: &'a EthereumBlockType) -> EthereumBlockPointer { + EthereumBlockPointer::from(LightEthereumBlock::from(block_type)) + } +} + impl From<(H256, u64)> for EthereumBlockPointer { fn from((hash, number): (H256, u64)) -> EthereumBlockPointer { if number >= (1 << 63) { @@ -536,7 +804,10 @@ impl ToEntityKey for EthereumBlockPointer { #[cfg(test)] mod test { - use super::{EthereumBlockPointer, EthereumBlockTriggerType, EthereumCall, EthereumTrigger}; + use super::{ + BlockType, EthereumBlockPointer, EthereumBlockTrigger, EthereumBlockTriggerType, + EthereumCall, EthereumTrigger, + }; use web3::types::*; #[test] @@ -546,7 +817,10 @@ mod test { number: 1, hash: H256::random(), }, - EthereumBlockTriggerType::Every, + EthereumBlockTrigger { + block_type: BlockType::Light, + trigger_type: EthereumBlockTriggerType::Every, + }, ); let block2 = EthereumTrigger::Block( @@ -554,7 +828,10 @@ mod test { number: 0, hash: H256::random(), }, - EthereumBlockTriggerType::WithCallTo(Address::random()), + EthereumBlockTrigger { + block_type: BlockType::Light, + trigger_type: EthereumBlockTriggerType::WithCallTo(Address::random()), + }, ); let mut call1 = EthereumCall::default(); diff --git a/graph/src/components/subgraph/host.rs b/graph/src/components/subgraph/host.rs index 11b61093923..31a485db17c 100644 --- a/graph/src/components/subgraph/host.rs +++ b/graph/src/components/subgraph/host.rs @@ -27,7 +27,7 @@ pub trait RuntimeHost: Send + Sync + Debug + 'static { async fn process_log( &self, logger: &Logger, - block: &Arc, + block: &Arc, transaction: &Arc, log: &Arc, state: BlockState, @@ -38,7 +38,7 @@ pub trait RuntimeHost: Send + Sync + Debug + 'static { async fn process_call( &self, logger: &Logger, - block: &Arc, + block: &Arc, transaction: &Arc, call: &Arc, state: BlockState, @@ -49,8 +49,8 @@ pub trait RuntimeHost: Send + Sync + Debug + 'static { async fn process_block( &self, logger: &Logger, - block: &Arc, - trigger_type: &EthereumBlockTriggerType, + block: &Arc, + trigger: &EthereumBlockTrigger, state: BlockState, proof_of_indexing: SharedProofOfIndexing, ) -> Result; diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs index 1b48a638f16..ce7d3560d27 100644 --- a/graph/src/components/subgraph/instance.rs +++ b/graph/src/components/subgraph/instance.rs @@ -38,7 +38,7 @@ pub trait SubgraphInstance { async fn process_trigger( &self, logger: &Logger, - block: &Arc, + block: &Arc, trigger: EthereumTrigger, state: BlockState, proof_of_indexing: SharedProofOfIndexing, @@ -48,7 +48,7 @@ pub trait SubgraphInstance { async fn process_trigger_in_runtime_hosts( logger: &Logger, hosts: &[Arc], - block: &Arc, + block: &Arc, trigger: EthereumTrigger, state: BlockState, proof_of_indexing: SharedProofOfIndexing, diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index 5516ca90ce9..2a70f2eef3a 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -22,10 +22,10 @@ use crate::data::query::QueryExecutionError; use crate::data::schema::{Schema, SchemaImportError, SchemaValidationError}; use crate::data::store::Entity; use crate::data::subgraph::schema::{ - EthereumBlockHandlerEntity, EthereumCallHandlerEntity, EthereumContractAbiEntity, - EthereumContractDataSourceTemplateEntity, EthereumContractDataSourceTemplateSourceEntity, - EthereumContractEventHandlerEntity, EthereumContractMappingEntity, - EthereumContractSourceEntity, SUBGRAPHS_ID, + EthereumBlockHandlerData, EthereumBlockHandlerEntity, EthereumCallHandlerEntity, + EthereumContractAbiEntity, EthereumContractDataSourceTemplateEntity, + EthereumContractDataSourceTemplateSourceEntity, EthereumContractEventHandlerEntity, + EthereumContractMappingEntity, EthereumContractSourceEntity, SUBGRAPHS_ID, }; use crate::prelude::{ anyhow::{self, Context}, @@ -515,6 +515,7 @@ impl UnresolvedMappingABI { pub struct MappingBlockHandler { pub handler: String, pub filter: Option, + pub block_format: EthereumBlockHandlerData, } #[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] @@ -530,6 +531,7 @@ impl From for MappingBlockHandler { Self { handler: entity.handler, filter: None, + block_format: entity.block_format, } } } diff --git a/graph/src/data/subgraph/schema.rs b/graph/src/data/subgraph/schema.rs index 695420f511b..1ff4627263b 100644 --- a/graph/src/data/subgraph/schema.rs +++ b/graph/src/data/subgraph/schema.rs @@ -1009,6 +1009,7 @@ impl TryFromValue for EthereumContractAbiEntity { pub struct EthereumBlockHandlerEntity { pub handler: String, pub filter: Option, + pub block_format: EthereumBlockHandlerData, } impl WriteOperations for EthereumBlockHandlerEntity { @@ -1025,6 +1026,7 @@ impl WriteOperations for EthereumBlockHandlerEntity { if let Some(filter_id) = filter_id { entity.set("filter", filter_id); } + entity.set("input", self.block_format); ops.add(Self::TYPENAME, id.to_owned(), entity); } } @@ -1048,6 +1050,7 @@ impl From for EthereumBlockHandlerEntity { EthereumBlockHandlerEntity { handler: block_handler.handler, filter, + block_format: EthereumBlockHandlerData::from(block_handler.block_format), } } } @@ -1065,6 +1068,7 @@ impl TryFromValue for EthereumBlockHandlerEntity { Ok(EthereumBlockHandlerEntity { handler: map.get_required("handler")?, filter: map.get_optional("filter")?, + block_format: map.get_optional("blockFormat")?.unwrap_or_default(), }) } } @@ -1106,6 +1110,63 @@ impl TryFromValue for EthereumBlockHandlerFilterEntity { } } +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord, Deserialize)] +pub enum EthereumBlockHandlerData { + Block, + FullBlock, + FullBlockWithReceipts, +} + +impl Default for EthereumBlockHandlerData { + fn default() -> EthereumBlockHandlerData { + EthereumBlockHandlerData::Block + } +} + +impl FromStr for EthereumBlockHandlerData { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "FullBlockWithReceipts" => Ok(EthereumBlockHandlerData::FullBlockWithReceipts), + "FullBlock" => Ok(EthereumBlockHandlerData::FullBlock), + "Block" => Ok(EthereumBlockHandlerData::Block), + _ => Err(format_err!( + "failed to parse `{}` as EthereumBlockHandlerData", + s + )), + } + } +} + +impl From for String { + fn from(data: EthereumBlockHandlerData) -> String { + match data { + EthereumBlockHandlerData::FullBlockWithReceipts => "FullBlockWithReceipts".into(), + EthereumBlockHandlerData::FullBlock => "FullBlock".into(), + EthereumBlockHandlerData::Block => "Block".into(), + } + } +} + +impl From for Value { + fn from(data: EthereumBlockHandlerData) -> Value { + Value::String(data.into()) + } +} + +impl TryFromValue for EthereumBlockHandlerData { + fn try_from_value(value: &q::Value) -> Result { + match value { + q::Value::Enum(data) => EthereumBlockHandlerData::from_str(data), + _ => Err(format_err!( + "cannot parse value as EthereumBlockHandlerData: `{:?}`", + value + )), + } + } +} + #[derive(Debug)] pub struct EthereumCallHandlerEntity { pub function: String, diff --git a/graph/src/lib.rs b/graph/src/lib.rs index 1dda564ee46..f9cd563dc85 100644 --- a/graph/src/lib.rs +++ b/graph/src/lib.rs @@ -77,11 +77,12 @@ pub mod prelude { BlockFinality, BlockStream, BlockStreamBuilder, BlockStreamEvent, BlockStreamMetrics, ChainHeadUpdate, ChainHeadUpdateListener, ChainHeadUpdateStream, EthereumAdapter, EthereumAdapterError, EthereumBlock, EthereumBlockData, EthereumBlockFilter, - EthereumBlockPointer, EthereumBlockTriggerType, EthereumBlockWithCalls, - EthereumBlockWithTriggers, EthereumCall, EthereumCallData, EthereumCallFilter, - EthereumContractCall, EthereumContractCallError, EthereumEventData, EthereumLogFilter, - EthereumNetworkIdentifier, EthereumTransactionData, EthereumTrigger, LightEthereumBlock, - LightEthereumBlockExt, ProviderEthRpcMetrics, SubgraphEthRpcMetrics, + EthereumBlockPointer, EthereumBlockTrigger, EthereumBlockTriggerType, EthereumBlockType, + EthereumBlockWithCalls, EthereumBlockWithTriggers, EthereumCall, EthereumCallData, + EthereumCallFilter, EthereumContractCall, EthereumContractCallError, EthereumEventData, + EthereumLogFilter, EthereumNetworkIdentifier, EthereumTransactionData, EthereumTrigger, + FullEthereumBlockData, LightEthereumBlock, LightEthereumBlockExt, ProviderEthRpcMetrics, + SubgraphEthRpcMetrics, }; pub use crate::components::graphql::{GraphQlRunner, SubscriptionResultFuture}; pub use crate::components::link_resolver::{JsonStreamValue, JsonValueStream, LinkResolver}; @@ -124,7 +125,9 @@ pub mod prelude { AssignmentEvent, Attribute, Entity, NodeId, SubgraphEntityPair, SubgraphVersionSummary, ToEntityId, ToEntityKey, TryIntoEntity, Value, ValueType, }; - pub use crate::data::subgraph::schema::{SubgraphDeploymentEntity, TypedEntity}; + pub use crate::data::subgraph::schema::{ + EthereumBlockHandlerData, SubgraphDeploymentEntity, TypedEntity, + }; pub use crate::data::subgraph::{ BlockHandlerFilter, CreateSubgraphResult, DataSource, DataSourceContext, DataSourceTemplate, Link, MappingABI, MappingBlockHandler, MappingCallHandler, diff --git a/runtime/wasm/src/asc_abi/asc_ptr.rs b/runtime/wasm/src/asc_abi/asc_ptr.rs index c3c62438249..c22821e3e4d 100644 --- a/runtime/wasm/src/asc_abi/asc_ptr.rs +++ b/runtime/wasm/src/asc_abi/asc_ptr.rs @@ -74,6 +74,15 @@ impl AscPtr { } } +impl From>> for AscPtr { + fn from(option: Option>) -> Self { + match option { + Some(ptr) => ptr, + None => AscPtr::null(), + } + } +} + impl From for AscPtr { fn from(ptr: u32) -> Self { AscPtr(ptr, PhantomData) diff --git a/runtime/wasm/src/asc_abi/class.rs b/runtime/wasm/src/asc_abi/class.rs index f548d3d8fdc..52ad5d8fe71 100644 --- a/runtime/wasm/src/asc_abi/class.rs +++ b/runtime/wasm/src/asc_abi/class.rs @@ -418,6 +418,46 @@ pub(crate) struct AscEthereumBlock { pub size: AscPtr, } +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscFullEthereumBlock { + pub hash: AscPtr, + pub parent_hash: AscPtr, + pub uncles_hash: AscPtr, + pub author: AscPtr, + pub state_root: AscPtr, + pub transactions_root: AscPtr, + pub receipts_root: AscPtr, + pub number: AscPtr, + pub gas_used: AscPtr, + pub gas_limit: AscPtr, + pub timestamp: AscPtr, + pub difficulty: AscPtr, + pub total_difficulty: AscPtr, + pub size: AscPtr, + pub transactions: AscPtr>>, +} + +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscFullEthereumBlockWithReceipts { + pub hash: AscPtr, + pub parent_hash: AscPtr, + pub uncles_hash: AscPtr, + pub author: AscPtr, + pub state_root: AscPtr, + pub transactions_root: AscPtr, + pub receipts_root: AscPtr, + pub number: AscPtr, + pub gas_used: AscPtr, + pub gas_limit: AscPtr, + pub timestamp: AscPtr, + pub difficulty: AscPtr, + pub total_difficulty: AscPtr, + pub size: AscPtr, + pub transaction_receipts: AscPtr>>, +} + #[repr(C)] #[derive(AscType)] pub(crate) struct AscEthereumTransaction { @@ -430,6 +470,24 @@ pub(crate) struct AscEthereumTransaction { pub gas_price: AscPtr, } +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscEthereumTransactionReceipt { + pub hash: AscPtr, + pub index: AscPtr, + pub cumulative_gas_used: AscPtr, + pub gas_used: AscPtr, + pub contract_address: AscPtr, + pub status: AscPtr, + + pub from: AscPtr, + pub to: AscPtr, + pub value: AscPtr, + pub gas_price: AscPtr, + pub gas: AscPtr, + pub input: AscPtr, +} + #[repr(C)] #[derive(AscType)] pub(crate) struct AscEthereumTransaction_0_0_2 { diff --git a/runtime/wasm/src/host.rs b/runtime/wasm/src/host.rs index 19683496df3..b98e573c263 100644 --- a/runtime/wasm/src/host.rs +++ b/runtime/wasm/src/host.rs @@ -386,7 +386,7 @@ impl RuntimeHost { state: BlockState, handler: &str, trigger: MappingTrigger, - block: &Arc, + block: &Arc, proof_of_indexing: SharedProofOfIndexing, ) -> Result { let trigger_type = trigger.as_static(); @@ -474,7 +474,7 @@ impl RuntimeHostTrait for RuntimeHost { async fn process_call( &self, logger: &Logger, - block: &Arc, + block: &Arc, transaction: &Arc, call: &Arc, state: BlockState, @@ -570,24 +570,43 @@ impl RuntimeHostTrait for RuntimeHost { async fn process_block( &self, logger: &Logger, - block: &Arc, - trigger_type: &EthereumBlockTriggerType, + block: &Arc, + trigger: &EthereumBlockTrigger, state: BlockState, proof_of_indexing: SharedProofOfIndexing, ) -> Result { - let block_handler = self.handler_for_block(trigger_type)?; + let block_handler = self.handler_for_block(&trigger.trigger_type)?; + let mapping_block: EthereumBlockType = match trigger.block_type { + BlockType::FullWithReceipts => match self + .host_exports + .ethereum_adapter + .load_full_block(logger, block.light_block().clone()) + .compat() + .await + { + Ok(block) => Ok(EthereumBlockType::FullWithReceipts(block)), + Err(e) => Err(anyhow::anyhow!( + "Failed to load full block: {}, error: {}", + &block.number().to_string(), + e + )), + }?, + BlockType::Full => EthereumBlockType::Full(block.light_block().clone()), + BlockType::Light => block.as_ref().clone(), + }; + self.send_mapping_request( logger, o! { - "hash" => block.hash.unwrap().to_string(), - "number" => &block.number.unwrap().to_string(), + "hash" => block.hash().to_string(), + "number" => &block.number().to_string(), }, state, &block_handler.handler, MappingTrigger::Block { handler: block_handler.clone(), }, - block, + &Arc::new(mapping_block), proof_of_indexing, ) .await @@ -596,7 +615,7 @@ impl RuntimeHostTrait for RuntimeHost { async fn process_log( &self, logger: &Logger, - block: &Arc, + block: &Arc, transaction: &Arc, log: &Arc, state: BlockState, diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index cea48e20320..6be9b25518c 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -34,7 +34,7 @@ pub(crate) struct HostExports { causality_region: String, templates: Arc>, abis: Vec, - ethereum_adapter: Arc, + pub(crate) ethereum_adapter: Arc, pub(crate) link_resolver: Arc, call_cache: Arc, store: Arc, diff --git a/runtime/wasm/src/mapping.rs b/runtime/wasm/src/mapping.rs index 50937843f84..b3985f0d313 100644 --- a/runtime/wasm/src/mapping.rs +++ b/runtime/wasm/src/mapping.rs @@ -143,7 +143,7 @@ pub struct MappingRequest { pub(crate) struct MappingContext { pub(crate) logger: Logger, pub(crate) host_exports: Arc, - pub(crate) block: Arc, + pub(crate) block: Arc, pub(crate) state: BlockState, pub(crate) proof_of_indexing: SharedProofOfIndexing, } diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index aba6f37668c..fdef3833c15 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -161,14 +161,26 @@ impl WasmInstance { mut self, handler_name: &str, ) -> Result { - let block = EthereumBlockData::from(self.instance_ctx().ctx.block.as_ref()); + let context = self.take_ctx(); - // Prepare an EthereumBlock for the WASM runtime - let arg = self.asc_new(&block); + // Prepare an Ethereum Block for the WASM runtime + let arg = match context.ctx.block.as_ref() { + EthereumBlockType::FullWithReceipts(block) => self + .asc_new::( + &FullEthereumBlockDataWithReceipts::try_from(block).unwrap(), + ) + .erase(), + EthereumBlockType::Full(block) => self + .asc_new::(&FullEthereumBlockData::from(block)) + .erase(), + EthereumBlockType::Light(light_block) => self + .asc_new::(&EthereumBlockData::from(light_block)) + .erase(), + }; self.invoke_handler(handler_name, arg)?; - Ok(self.take_ctx().ctx.state) + Ok(context.ctx.state) } pub(crate) fn take_ctx(&mut self) -> WasmInstanceContext { @@ -704,10 +716,11 @@ impl WasmInstanceContext { &mut self, call: UnresolvedContractCall, ) -> Result, Trap> { - let result = - self.ctx - .host_exports - .ethereum_call(&self.ctx.logger, &self.ctx.block, call)?; + let result = self.ctx.host_exports.ethereum_call( + &self.ctx.logger, + &LightEthereumBlock::from(self.ctx.block.as_ref()), + call, + )?; Ok(match result { Some(tokens) => self.asc_new(tokens.as_slice()), None => AscPtr::null(), diff --git a/runtime/wasm/src/to_from/external.rs b/runtime/wasm/src/to_from/external.rs index 4d0eab0d2eb..f501787b2f8 100644 --- a/runtime/wasm/src/to_from/external.rs +++ b/runtime/wasm/src/to_from/external.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use graph::components::ethereum::{ EthereumBlockData, EthereumCallData, EthereumEventData, EthereumTransactionData, + EthereumTransactionReceiptData, FullEthereumBlockData, FullEthereumBlockDataWithReceipts, }; use graph::data::store; use graph::prelude::anyhow::{ensure, Error}; @@ -319,7 +320,89 @@ impl ToAscObj for EthereumBlockData { size: self .size .map(|size| heap.asc_new(&BigInt::from_unsigned_u256(&size))) - .unwrap_or_else(|| AscPtr::null()), + .into(), + } + } +} + +impl ToAscObj for FullEthereumBlockData { + fn to_asc_obj(&self, heap: &mut H) -> AscFullEthereumBlock { + AscFullEthereumBlock { + hash: heap.asc_new(&self.hash), + parent_hash: heap.asc_new(&self.parent_hash), + uncles_hash: heap.asc_new(&self.uncles_hash), + author: heap.asc_new(&self.author), + state_root: heap.asc_new(&self.state_root), + transactions_root: heap.asc_new(&self.transactions_root), + receipts_root: heap.asc_new(&self.receipts_root), + number: heap.asc_new(&BigInt::from(self.number)), + gas_used: heap.asc_new(&BigInt::from_unsigned_u256(&self.gas_used)), + gas_limit: heap.asc_new(&BigInt::from_unsigned_u256(&self.gas_limit)), + timestamp: heap.asc_new(&BigInt::from_unsigned_u256(&self.timestamp)), + difficulty: heap.asc_new(&BigInt::from_unsigned_u256(&self.difficulty)), + total_difficulty: heap.asc_new(&BigInt::from_unsigned_u256(&self.total_difficulty)), + size: self + .size + .map(|size| heap.asc_new(&BigInt::from_unsigned_u256(&size))) + .into(), + transactions: heap.asc_new(self.transactions.as_slice()), + } + } +} + +impl ToAscObj for FullEthereumBlockDataWithReceipts { + fn to_asc_obj(&self, heap: &mut H) -> AscFullEthereumBlockWithReceipts { + AscFullEthereumBlockWithReceipts { + hash: heap.asc_new(&self.hash), + parent_hash: heap.asc_new(&self.parent_hash), + uncles_hash: heap.asc_new(&self.uncles_hash), + author: heap.asc_new(&self.author), + state_root: heap.asc_new(&self.state_root), + transactions_root: heap.asc_new(&self.transactions_root), + receipts_root: heap.asc_new(&self.receipts_root), + number: heap.asc_new(&BigInt::from(self.number)), + gas_used: heap.asc_new(&BigInt::from_unsigned_u256(&self.gas_used)), + gas_limit: heap.asc_new(&BigInt::from_unsigned_u256(&self.gas_limit)), + timestamp: heap.asc_new(&BigInt::from_unsigned_u256(&self.timestamp)), + difficulty: heap.asc_new(&BigInt::from_unsigned_u256(&self.difficulty)), + total_difficulty: heap.asc_new(&BigInt::from_unsigned_u256(&self.total_difficulty)), + size: self + .size + .map(|size| heap.asc_new(&BigInt::from_unsigned_u256(&size))) + .into(), + transaction_receipts: heap.asc_new(self.transaction_receipts.as_slice()), + } + } +} + +impl ToAscObj for EthereumTransactionReceiptData { + fn to_asc_obj(&self, heap: &mut H) -> AscEthereumTransactionReceipt { + AscEthereumTransactionReceipt { + //from receipts + hash: heap.asc_new(&self.hash), + index: heap.asc_new(&BigInt::from(self.index)), + cumulative_gas_used: heap + .asc_new(&BigInt::from_unsigned_u256(&self.cumulative_gas_used)), + gas_used: self + .gas_used + .map(|gas_used| heap.asc_new(&BigInt::from_unsigned_u256(&gas_used))) + .into(), + contract_address: self + .contract_address + .map(|contract_address| heap.asc_new(&contract_address)) + .into(), + status: self + .status + .map(|status| heap.asc_new(&BigInt::from(status))) + .into(), + + // // from txs + from: heap.asc_new(&self.from), + to: self.to.map(|to| heap.asc_new(&to)).into(), + value: heap.asc_new(&BigInt::from_unsigned_u256(&self.value)), + gas_price: heap.asc_new(&BigInt::from_unsigned_u256(&self.gas_price)), + gas: heap.asc_new(&BigInt::from_unsigned_u256(&self.gas)), + input: heap.asc_new(&*self.input.0), } } } @@ -330,10 +413,7 @@ impl ToAscObj for EthereumTransactionData { hash: heap.asc_new(&self.hash), index: heap.asc_new(&BigInt::from(self.index)), from: heap.asc_new(&self.from), - to: self - .to - .map(|to| heap.asc_new(&to)) - .unwrap_or_else(|| AscPtr::null()), + to: self.to.map(|to| heap.asc_new(&to)).into(), value: heap.asc_new(&BigInt::from_unsigned_u256(&self.value)), gas_used: heap.asc_new(&BigInt::from_unsigned_u256(&self.gas_used)), gas_price: heap.asc_new(&BigInt::from_unsigned_u256(&self.gas_price)), @@ -347,10 +427,7 @@ impl ToAscObj for EthereumTransactionData { hash: heap.asc_new(&self.hash), index: heap.asc_new(&BigInt::from(self.index)), from: heap.asc_new(&self.from), - to: self - .to - .map(|to| heap.asc_new(&to)) - .unwrap_or_else(|| AscPtr::null()), + to: self.to.map(|to| heap.asc_new(&to)).into(), value: heap.asc_new(&BigInt::from_unsigned_u256(&self.value)), gas_used: heap.asc_new(&BigInt::from_unsigned_u256(&self.gas_used)), gas_price: heap.asc_new(&BigInt::from_unsigned_u256(&self.gas_price)), @@ -373,7 +450,7 @@ where .log_type .clone() .map(|log_type| heap.asc_new(&log_type)) - .unwrap_or_else(|| AscPtr::null()), + .into(), block: heap.asc_new(&self.block), transaction: heap.asc_new::(&self.transaction), params: heap.asc_new(self.params.as_slice()),