Implements indexer transaction RPC (#1154)
* Implement the indexer transaction RPC

* Verify and convert data types

* Support querying the transaction indexer by multichain original address

* Add an indexer RPC integration test
baichuan3 authored Nov 17, 2023
1 parent 8a7cbe0 commit 7f1d041
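
The headline addition is a query path from the RPC layer down to the SQLite-backed indexer. Below is a minimal sketch of how a caller might assemble the new query message; the struct fields and filter variant names come from the diff, while the module paths, the wrapper function, and the assumption that `OriginalAddress` carries a `String` are illustrative.

// Sketch only: building the query message added by this commit. Module paths
// are assumptions; the fields and the OriginalAddress variant appear in
// crates/rooch-indexer/src/actor/messages.rs and
// rooch_types::indexer::transaction_filter below.
use rooch_indexer::actor::messages::QueryIndexerTransactionsMessage;
use rooch_types::indexer::transaction_filter::TransactionFilter;

fn build_query(original_address: String, cursor: Option<u64>) -> QueryIndexerTransactionsMessage {
    QueryIndexerTransactionsMessage {
        // The new multichain-original-address lookup.
        filter: TransactionFilter::OriginalAddress(original_address),
        // Exclusive cursor; None starts from the beginning.
        cursor,
        limit: 50,
        descending_order: true,
    }
}
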
Showing 25 changed files with 561 additions and 93 deletions.
15 changes: 0 additions & 15 deletions Cargo.lock

7 changes: 0 additions & 7 deletions Cargo.toml
@@ -224,13 +224,6 @@ diesel = { version = "2.1.0", features = [
diesel-derive-enum = { version = "2.0.1", features = ["sqlite"] }
diesel_migrations = { version = "2.0.0" }
tap = "1.0.1"
backoff = { version = "0.4.0", features = [
"futures",
"futures-core",
"pin-project-lite",
"tokio",
"tokio_1",
] }
dotenvy = "0.15"
sized-chunks = { version = "0.6" }

1 change: 0 additions & 1 deletion crates/rooch-indexer/Cargo.toml
@@ -33,7 +33,6 @@ serde_json = { workspace = true }
regex = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
backoff = { workspace = true }
tokio = { workspace = true, features = ["full"] }
diesel = { workspace = true }
diesel-derive-enum = { workspace = true }
@@ -4,7 +4,8 @@ CREATE TABLE transactions (
transaction_type VARCHAR NOT NULL,
sequence_number BIGINT NOT NULL,
multichain_id BIGINT NOT NULL,
multichain_raw_address VARCHAR NOT NULL,
multichain_address VARCHAR NOT NULL,
multichain_original_address VARCHAR NOT NULL,
sender VARCHAR NOT NULL,
action VARCHAR NOT NULL,
action_type SMALLINT NOT NULL,
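
The migration fragment above replaces the single multichain_raw_address column with multichain_address and multichain_original_address; the latter is what the new OriginalAddress filter matches. A hedged sketch of the kind of raw SQLite lookup this enables (table and column names come from this commit; the address literal is a placeholder):

// Sketch only: a raw SQLite query against the new column. The address value
// is a placeholder, and string quoting mirrors the filter code in
// indexer_reader.rs below.
const LOOKUP_BY_ORIGINAL_ADDRESS: &str = "SELECT * FROM transactions \
     WHERE multichain_original_address = \"bc1q...placeholder\" \
     ORDER BY tx_order DESC LIMIT 50";

fn main() {
    println!("{}", LOOKUP_BY_ORIGINAL_ADDRESS);
}
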
26 changes: 23 additions & 3 deletions crates/rooch-indexer/src/actor/indexer.rs
@@ -3,6 +3,7 @@

use crate::actor::messages::{
IndexerEventsMessage, IndexerTransactionMessage, QueryIndexerEventsMessage,
QueryIndexerTransactionsMessage,
};
use crate::indexer_reader::IndexerReader;
use crate::store::traits::IndexerStoreTrait;
@@ -12,6 +13,7 @@ use anyhow::{anyhow, Result};
use async_trait::async_trait;
use coerce::actor::{context::ActorContext, message::Handler, Actor};
use rooch_types::indexer::event_filter::IndexerEvent;
use rooch_types::transaction::TransactionWithInfo;

pub struct IndexerActor {
indexer_store: IndexerStore,
@@ -45,9 +47,8 @@ impl Handler<IndexerTransactionMessage> for IndexerActor {

let indexed_transaction =
IndexedTransaction::new(transaction, sequence_info, execution_info, moveos_tx)?;
let _transactions = vec![indexed_transaction];
//TODO Open after supporting automatic creation of sqlite schema
// self.indexer_store.persist_transactions(transactions)?;
let transactions = vec![indexed_transaction];
self.indexer_store.persist_transactions(transactions)?;
Ok(())
}
}
@@ -78,6 +79,25 @@ impl Handler<IndexerEventsMessage> for IndexerActor {
}
}

#[async_trait]
impl Handler<QueryIndexerTransactionsMessage> for IndexerActor {
async fn handle(
&mut self,
msg: QueryIndexerTransactionsMessage,
_ctx: &mut ActorContext,
) -> Result<Vec<TransactionWithInfo>> {
let QueryIndexerTransactionsMessage {
filter,
cursor,
limit,
descending_order,
} = msg;
self.indexer_reader
.query_transactions_with_filter(filter, cursor, limit, descending_order)
.map_err(|e| anyhow!(format!("Failed to query indexer transactions: {:?}", e)))
}
}

#[async_trait]
impl Handler<QueryIndexerEventsMessage> for IndexerActor {
async fn handle(
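
With the QueryIndexerTransactionsMessage handler above registered on IndexerActor, a caller holding an actor reference can dispatch the query. The sketch below assumes coerce's LocalActorRef::send, public module paths, and u64 fields on TxOrderRange; none of that is confirmed by the diff beyond the type and variant names.

// Sketch only: dispatching the new message to the IndexerActor. The actor
// reference is assumed to exist already; the double `?` unwraps the actor-ref
// error and then the handler's anyhow::Result.
use anyhow::Result;
use coerce::actor::LocalActorRef;
use rooch_indexer::actor::indexer::IndexerActor;
use rooch_indexer::actor::messages::QueryIndexerTransactionsMessage;
use rooch_types::indexer::transaction_filter::TransactionFilter;
use rooch_types::transaction::TransactionWithInfo;

async fn latest_transactions(
    indexer_actor: LocalActorRef<IndexerActor>,
    limit: usize,
) -> Result<Vec<TransactionWithInfo>> {
    let msg = QueryIndexerTransactionsMessage {
        // Field types on TxOrderRange are assumed to be u64 here.
        filter: TransactionFilter::TxOrderRange { from_order: 0, to_order: 1_000 },
        cursor: None,
        limit,
        descending_order: true,
    };
    // send awaits the handler's Result<Vec<TransactionWithInfo>>.
    let page = indexer_actor.send(msg).await??;
    Ok(page)
}
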
17 changes: 16 additions & 1 deletion crates/rooch-indexer/src/actor/messages.rs
@@ -6,7 +6,8 @@ use coerce::actor::message::Message;
use moveos_types::moveos_std::event::Event;
use moveos_types::transaction::{TransactionExecutionInfo, VerifiedMoveOSTransaction};
use rooch_types::indexer::event_filter::{EventFilter, IndexerEvent, IndexerEventID};
use rooch_types::transaction::{TransactionSequenceInfo, TypedTransaction};
use rooch_types::indexer::transaction_filter::TransactionFilter;
use rooch_types::transaction::{TransactionSequenceInfo, TransactionWithInfo, TypedTransaction};
use serde::{Deserialize, Serialize};

/// Indexer Transaction write Message
@@ -35,6 +36,20 @@ impl Message for IndexerEventsMessage {
type Result = Result<()>;
}

/// Query Indexer Transactions Message
#[derive(Debug, Serialize, Deserialize)]
pub struct QueryIndexerTransactionsMessage {
pub filter: TransactionFilter,
// exclusive cursor if `Some`, otherwise start from the beginning
pub cursor: Option<u64>,
pub limit: usize,
pub descending_order: bool,
}

impl Message for QueryIndexerTransactionsMessage {
type Result = Result<Vec<TransactionWithInfo>>;
}

/// Query Indexer Events Message
#[derive(Debug, Serialize, Deserialize)]
pub struct QueryIndexerEventsMessage {
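
The cursor field on QueryIndexerTransactionsMessage above is exclusive, so paging works by feeding back the tx_order of the last row returned. A self-contained sketch of that loop; Tx and query_page are hypothetical stand-ins for TransactionWithInfo and the actual RPC or actor call, not part of this commit.

// Sketch only: exclusive-cursor pagination as implied by the comment on
// `cursor` above.
struct Tx {
    tx_order: u64,
}

fn query_page(cursor: Option<u64>, limit: usize, descending: bool) -> Vec<Tx> {
    // Placeholder: a real implementation would send QueryIndexerTransactionsMessage.
    let _ = (cursor, limit, descending);
    Vec::new()
}

fn collect_all_descending(page_size: usize) -> Vec<Tx> {
    let mut all = Vec::new();
    let mut cursor: Option<u64> = None; // None = start from the newest row
    loop {
        let page = query_page(cursor, page_size, true);
        let short_page = page.len() < page_size;
        // Feed the last tx_order back as the exclusive cursor for the next page.
        cursor = page.last().map(|tx| tx.tx_order);
        all.extend(page);
        if short_page {
            break;
        }
    }
    all
}

fn main() {
    let txs = collect_all_descending(50);
    println!("fetched {} transactions", txs.len());
}
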
115 changes: 101 additions & 14 deletions crates/rooch-indexer/src/indexer_reader.rs
@@ -13,18 +13,22 @@ use diesel::{
use std::ops::DerefMut;

use crate::models::events::StoredEvent;
use crate::schema::events;
use crate::schema::{events, transactions};
use rooch_types::indexer::event_filter::{EventFilter, IndexerEvent, IndexerEventID};
use rooch_types::indexer::transaction_filter::TransactionFilter;
use rooch_types::transaction::TransactionWithInfo;

pub const TX_ORDER_STR: &str = "tx_order";
pub const TX_HASH_STR: &str = "tx_hash";
pub const TX_SENDER_STR: &str = "sender";
pub const CREATED_AT_STR: &str = "created_at";

pub const TRANSACTION_ORIGINAL_ADDRESS_STR: &str = "multichain_original_address";

pub const EVENT_HANDLE_ID_STR: &str = "event_handle_id";
pub const EVENT_INDEX_STR: &str = "event_index";
pub const EVENT_SEQ_STR: &str = "event_seq";
pub const TX_ORDER_STR: &str = "tx_order";
pub const TX_HASH_STR: &str = "tx_hash";
pub const EVENT_SENDER_STR: &str = "sender";
pub const EVENT_TYPE_STR: &str = "event_type";
pub const EVENT_CREATED_TIME_STR: &str = "created_time";

#[derive(Clone)]
pub(crate) struct InnerIndexerReader {
@@ -100,14 +104,97 @@ impl IndexerReader {
})
}

pub fn stored_transaction_to_transaction_block(
pub fn query_transactions_with_filter(
&self,
stored_transactions: Vec<StoredTransaction>,
filter: TransactionFilter,
cursor: Option<u64>,
limit: usize,
descending_order: bool,
) -> IndexerResult<Vec<TransactionWithInfo>> {
stored_transactions
let tx_order = if let Some(cursor) = cursor {
cursor as i64
} else if descending_order {
let max_tx_order: i64 = self.inner_indexer_reader.run_query(|conn| {
transactions::dsl::transactions
.select(transactions::tx_order)
.order_by(transactions::tx_order.desc())
.first::<i64>(conn)
})?;
max_tx_order + 1
} else {
-1
};

let main_where_clause = match filter {
TransactionFilter::Sender(sender) => {
format!("{TX_SENDER_STR} = \"{}\"", sender.to_hex_literal())
}
TransactionFilter::OriginalAddress(address) => {
format!("{TRANSACTION_ORIGINAL_ADDRESS_STR} = \"{}\"", address)
}
TransactionFilter::TxHashes(tx_hashes) => {
let in_tx_hashes_str: String = tx_hashes
.iter()
.map(|tx_hash| format!("\"{:?}\"", tx_hash))
.collect::<Vec<String>>()
.join(",");
format!("{TX_HASH_STR} in ({})", in_tx_hashes_str)
}
TransactionFilter::TimeRange {
start_time,
end_time,
} => {
format!(
"({CREATED_AT_STR} >= {} AND {CREATED_AT_STR} < {})",
start_time, end_time
)
}
TransactionFilter::TxOrderRange {
from_order,
to_order,
} => {
format!(
"({TX_ORDER_STR} >= {} AND {TX_ORDER_STR} < {})",
from_order, to_order
)
}
};

let cursor_clause = if descending_order {
format!("AND ({TX_ORDER_STR} < {})", tx_order)
} else {
format!("AND ({TX_ORDER_STR} > {})", tx_order)
};
let order_clause = if descending_order {
format!("{TX_ORDER_STR} DESC")
} else {
format!("{TX_ORDER_STR} ASC")
};

let query = format!(
"
SELECT * FROM transactions \
WHERE {} {} \
ORDER BY {} \
LIMIT {}
",
main_where_clause, cursor_clause, order_clause, limit,
);

tracing::debug!("query transactions: {}", query);
let stored_transactions = self
.inner_indexer_reader
.run_query(|conn| diesel::sql_query(query).load::<StoredTransaction>(conn))?;

let result = stored_transactions
.into_iter()
.map(|stored_transaction| stored_transaction.try_into_transaction_with_info())
.collect::<IndexerResult<Vec<_>>>()
.map(|t| t.try_into_transaction_with_info())
.collect::<Result<Vec<_>>>()
.map_err(|e| {
IndexerError::SQLiteReadError(format!("Cast indexer transactions failed: {:?}", e))
})?;

Ok(result)
}

pub fn query_events_with_filter(
@@ -122,7 +209,7 @@ impl IndexerReader {
tx_order,
event_index,
} = cursor;
(tx_order as i128, event_index as i64)
(tx_order as i64, event_index as i64)
} else if descending_order {
let (max_tx_order, event_index): (i64, i64) =
self.inner_indexer_reader.run_query(|conn| {
@@ -131,7 +218,7 @@
.order_by((events::tx_order.desc(), events::event_index.desc()))
.first::<(i64, i64)>(conn)
})?;
((max_tx_order as i128) + 1, event_index)
(max_tx_order + 1, event_index)
} else {
(-1, 0)
};
@@ -142,7 +229,7 @@
format!("{EVENT_TYPE_STR} = \"{}\"", event_type_str)
}
EventFilter::Sender(sender) => {
format!("{EVENT_SENDER_STR} = \"{}\"", sender.to_hex_literal())
format!("{TX_SENDER_STR} = \"{}\"", sender.to_hex_literal())
}
EventFilter::TxHash(tx_hash) => {
let tx_hash_str = format!("{:?}", tx_hash);
@@ -153,7 +240,7 @@
end_time,
} => {
format!(
"({EVENT_CREATED_TIME_STR} >= {} AND {EVENT_CREATED_TIME_STR} < {})",
"({CREATED_AT_STR} >= {} AND {CREATED_AT_STR} < {})",
start_time, end_time
)
}
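
For reference, the query string assembled by query_transactions_with_filter for a Sender filter, descending order, and no cursor looks roughly like the output of this standalone mirror of the format! logic above. The max_tx_order constant and the sender literal are illustrative; the real code reads the max order from SQLite and hex-encodes the sender address.

// Sketch only: a self-contained mirror of the WHERE/cursor/ORDER BY assembly
// in query_transactions_with_filter, showing the SQL it produces.
fn main() {
    let tx_sender = "sender";
    let tx_order_col = "tx_order";

    // Descending query with no cursor: start just above the current max order.
    let max_tx_order: i64 = 1024;
    let tx_order = max_tx_order + 1;
    let descending_order = true;
    let limit = 50;

    let main_where_clause = format!("{} = \"{}\"", tx_sender, "0xabc...placeholder");
    let cursor_clause = if descending_order {
        format!("AND ({} < {})", tx_order_col, tx_order)
    } else {
        format!("AND ({} > {})", tx_order_col, tx_order)
    };
    let order_clause = if descending_order {
        format!("{} DESC", tx_order_col)
    } else {
        format!("{} ASC", tx_order_col)
    };

    let query = format!(
        "SELECT * FROM transactions WHERE {} {} ORDER BY {} LIMIT {}",
        main_where_clause, cursor_clause, order_clause, limit,
    );
    // Prints:
    // SELECT * FROM transactions WHERE sender = "0xabc...placeholder" AND (tx_order < 1025) ORDER BY tx_order DESC LIMIT 50
    println!("{}", query);
}
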
