Skip to content

Commit

Permalink
optimize. (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenduckgo authored Jul 3, 2024
1 parent 6a28f4b commit ee69e71
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 24 deletions.
15 changes: 15 additions & 0 deletions scanner/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ create table if not exists evm_last_height(
primary key(tip)
);

create table if not exists evm_blocks(
block_id varchar(66) not null,
block_num bigint not null,
tm timestamp not null,
primary key(block_id)
);

create table if not exists evm_txs(
tx_id varchar(66) not null,
block_id varchar(66) not null,
block_num bigint not null,
tm timestamp not null,
primary key(tx_id)
);

create table if not exists evm_stakes(
tx_id varchar(66) not null,
block_id varchar(66) not null,
Expand Down
32 changes: 32 additions & 0 deletions scanner/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,38 @@ impl Storage {
Ok(height as u64)
}

pub async fn upsert_block(
&self,
block_id: &str,
block_num: i64,
tm: NaiveDateTime,
) -> Result<()> {
sqlx::query("insert into evm_blocks values($1,$2,$3) on conflict(block_id) do update set block_num=$2,tm=$3")
.bind(block_id)
.bind(block_num)
.bind(tm)
.execute(&self.pool)
.await?;
Ok(())
}

pub async fn upsert_tx(
&self,
tx_id: &str,
block_id: &str,
block_num: i64,
tm: NaiveDateTime,
) -> Result<()> {
sqlx::query("insert into evm_txs values($1,$2,$3,$4) on conflict(tx_id) do update set block_id=$2,block_num=$3,tm=$4")
.bind(tx_id)
.bind(block_id)
.bind(block_num)
.bind(tm)
.execute(&self.pool)
.await?;
Ok(())
}

pub async fn upsert_tip(&self, height: i64) -> Result<()> {
sqlx::query(
"insert into evm_last_height values($1,$2) on conflict(tip) do update set height=$2",
Expand Down
70 changes: 46 additions & 24 deletions scanner/src/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use crate::error::Result;
use crate::error::ScannerError;
use crossbeam::channel::bounded;
use ethers::contract::{parse_log, EthEvent};
use ethers::prelude::Middleware;
use ethers::prelude::{Middleware, TransactionReceipt};
use ethers::providers::{Http, Provider};
use ethers::types::Bytes;
use ethers::types::U256;
use ethers::types::{Address, Block, Transaction};
use ethers::types::{Bytes, TxHash};
use ethers::utils::hex::encode_prefixed;
use log::{debug, error, info};
use reqwest::{Client, ClientBuilder, Url};
Expand Down Expand Up @@ -47,28 +47,29 @@ pub struct FindoraRPC {
}

#[derive(Debug, Serialize, Deserialize)]
struct GetBlockByNumerRequest {
struct EthRpcRequest<T> {
pub id: i32,
pub jsonrpc: String,
pub method: String,
pub params: (String, bool),
pub params: T,
}
impl GetBlockByNumerRequest {
pub fn new(block_num: u64, full: bool) -> Self {
GetBlockByNumerRequest {

impl<T> EthRpcRequest<T> {
pub fn body(method: &str, params: T) -> Self {
EthRpcRequest {
id: 1,
jsonrpc: "2.0".into(),
method: "eth_getBlockByNumber".into(),
params: (format!("0x{:x}", block_num), full),
method: method.to_string(),
params,
}
}
}

#[derive(Debug, Serialize, Deserialize)]
struct GetBlockByNumberResponse {
struct GetBlockByNumberResponse<T> {
pub jsonrpc: String,
pub id: i32,
pub result: Option<Block<Transaction>>,
pub result: Option<T>,
}

impl FindoraRPC {
Expand All @@ -77,13 +78,25 @@ impl FindoraRPC {
FindoraRPC { url, client }
}

pub async fn get_block_by_number(&self, block_num: u64) -> Result<Option<Block<Transaction>>> {
let req = GetBlockByNumerRequest::new(block_num, true);
pub async fn get_block_by_number(&self, block_num: u64) -> Result<Option<Block<TxHash>>> {
let params = (format!("0x{:x}", block_num), false);
let req = EthRpcRequest::body("eth_getBlockByNumber", params);
let resp = self.client.post(self.url.clone()).json(&req).send().await?;
let block = resp.json::<GetBlockByNumberResponse>().await?;

let block = resp
.json::<GetBlockByNumberResponse<Block<TxHash>>>()
.await?;
Ok(block.result)
}

pub async fn get_transaction_receipt(&self, tx: TxHash) -> Result<Option<TransactionReceipt>> {
let params = vec![tx];
let req = EthRpcRequest::body("eth_getTransactionReceipt", params);
let resp = self.client.post(self.url.clone()).json(&req).send().await?;
let receipt = resp
.json::<GetBlockByNumberResponse<TransactionReceipt>>()
.await?;
Ok(receipt.result)
}
}

pub struct RpcCaller {
Expand Down Expand Up @@ -453,7 +466,7 @@ impl Scanner {
info!("Fast syncing complete.");
loop {
if let Ok(h) = self.caller.storage.get_tip().await {
height = h as u64 + 1;
height = h + 1;
}

match self.caller.get_block_retried(height).await {
Expand All @@ -468,8 +481,7 @@ impl Scanner {
error!("Get block {} error: {:?}", height, e);
}
}

tokio::time::sleep(interval).await;
//tokio::time::sleep(interval).await;
}
}
}
Expand Down Expand Up @@ -504,12 +516,22 @@ mod tests {

#[tokio::test]
async fn test_rpc() -> Result<()> {
// let url: Url = Url::parse("https://prod-mainnet.prod.findora.org:8545/").unwrap();
// let rpc = FindoraRPC::new(Duration::from_secs(60), url);
// let block = rpc.get_block_by_number(5000000).await?;
// if let Some(b) = block {
// println!("{:?}", b);
// }
let url: Url = Url::parse("https://rpc-mainnet.findora.org/").unwrap();
//let url: Url = Url::parse("https://rpc-mainnet.findora.org/").unwrap();
let rpc = FindoraRPC::new(Duration::from_secs(120), url);
let block = rpc.get_block_by_number(4636000).await?;

if let Some(b) = block {
println!("txs_count: {}", b.transactions.len());
println!("{:?}", b);
// for tx in b.transactions {
// let receipt = rpc.get_transaction_receipt(tx).await?;
// println!("{:?}", tx);
// if let Some(r) = receipt {
// println!("{:?}", r);
// }
// }
}
Ok(())
}
}

0 comments on commit ee69e71

Please sign in to comment.