From 0a386dbb7a8f14ce5cf55108c2dbb2aec973206b Mon Sep 17 00:00:00 2001 From: canonbrother Date: Tue, 19 Nov 2024 23:25:46 +0800 Subject: [PATCH 01/17] fix get_feed_active api resp --- lib/ain-ocean/src/api/prices.rs | 36 ++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 11fc582d30..a0635ea185 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -24,7 +24,7 @@ use super::{ use crate::{ error::{ApiError, OtherSnafu}, model::{ - BlockContext, OracleIntervalSeconds, OraclePriceActive, OraclePriceActiveNext, + BlockContext, OracleIntervalSeconds, OraclePriceActive, OraclePriceActiveNextOracles, OraclePriceAggregatedIntervalAggregated, PriceTicker, }, storage::{RepositoryOps, SortOrder}, @@ -233,14 +233,22 @@ async fn get_feed( )) } +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OraclePriceActiveNextResponse { + pub amount: String, // convert to logical amount + pub weightage: Decimal, + pub oracles: OraclePriceActiveNextOraclesResponse, +} + #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct OraclePriceActiveResponse { pub id: String, // token-currency-height pub key: String, // token-currency pub sort: String, // height - pub active: Option, - pub next: Option, + pub active: Option, + pub next: Option, pub is_live: bool, pub block: BlockContext, } @@ -251,8 +259,26 @@ impl OraclePriceActiveResponse { id: format!("{}-{}-{}", token, currency, v.block.height), key: format!("{}-{}", token, currency), sort: hex::encode(v.block.height.to_be_bytes()).to_string(), - active: v.active, - next: v.next, + active: v.active.map(|active| { + OraclePriceActiveNextResponse { + amount: format!("{:.8}", active.amount / Decimal::from(COIN)), + weightage: active.weightage, + oracles: OraclePriceActiveNextOraclesResponse { + active: active.oracles.active.to_i32().unwrap_or_default(), + total: active.oracles.total, + } + } + }), + next: v.next.map(|next| { + OraclePriceActiveNextResponse { + amount: format!("{:.8}", next.amount / Decimal::from(COIN)), + weightage: next.weightage, + oracles: OraclePriceActiveNextOraclesResponse { + active: next.oracles.active.to_i32().unwrap_or_default(), + total: next.oracles.total, + } + } + }), is_live: v.is_live, block: v.block, } From 1d028042314962f902bedab1ad25cf98e9ebeaff Mon Sep 17 00:00:00 2001 From: canonbrother Date: Tue, 19 Nov 2024 23:30:41 +0800 Subject: [PATCH 02/17] fix OraclePriceAggregatedId & OraclePriceFeedId sort key --- lib/ain-ocean/src/api/oracle.rs | 6 +++--- lib/ain-ocean/src/api/prices.rs | 8 +++++--- lib/ain-ocean/src/indexer/loan_token.rs | 3 ++- lib/ain-ocean/src/indexer/oracle.rs | 9 ++++++--- lib/ain-ocean/src/model/oracle_price_aggregated.rs | 2 +- lib/ain-ocean/src/model/oracle_price_feed.rs | 2 +- 6 files changed, 18 insertions(+), 12 deletions(-) diff --git a/lib/ain-ocean/src/api/oracle.rs b/lib/ain-ocean/src/api/oracle.rs index f1f42dd06c..e9b7c2e6a1 100644 --- a/lib/ain-ocean/src/api/oracle.rs +++ b/lib/ain-ocean/src/api/oracle.rs @@ -103,15 +103,15 @@ async fn get_feed( .list(None, SortOrder::Descending)? .paginate(&query) .flatten() - .filter(|((token, currency, oracle_id, _), _)| { + .filter(|((token, currency, oracle_id, _, _), _)| { key.0.eq(token) && key.1.eq(currency) && key.2.eq(oracle_id) }) - .map(|((token, currency, oracle_id, txid), feed)| { + .map(|((token, currency, oracle_id, height, txid), feed)| { let amount = Decimal::from(feed.amount) / Decimal::from(COIN); OraclePriceFeedResponse { id: format!("{}-{}-{}-{}", token, currency, oracle_id, txid), key: format!("{}-{}-{}", token, currency, oracle_id), - sort: hex::encode(feed.block.height.to_string() + &txid.to_string()), + sort: hex::encode(height.to_string() + &txid.to_string()), token, currency, oracle_id, diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index a0635ea185..04952df6ef 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -191,7 +191,7 @@ async fn get_feed( let (token, currency) = parse_token_currency(&key)?; let repo = &ctx.services.oracle_price_aggregated; - let id = (token.to_string(), currency.to_string(), u32::MAX); + let id = (token.to_string(), currency.to_string(), i64::MAX, u32::MAX); let oracle_aggregated = repo .by_id .list(Some(id), SortOrder::Descending)? @@ -467,6 +467,7 @@ async fn list_price_oracles( token.clone(), currency.clone(), oracle_id, + u32::MAX, Txid::from_byte_array([0xffu8; 32]), )), SortOrder::Descending, @@ -490,11 +491,12 @@ async fn list_price_oracles( let token = id.0; let currency = id.1; let oracle_id = id.2; - let txid = id.3; + let height = id.3; + let txid = id.4; OraclePriceFeedResponse { id: format!("{}-{}-{}-{}", token, currency, oracle_id, txid), key: format!("{}-{}-{}", token, currency, oracle_id), - sort: hex::encode(f.block.height.to_string() + &txid.to_string()), + sort: hex::encode(height.to_string() + &txid.to_string()), token: token.clone(), currency: currency.clone(), oracle_id, diff --git a/lib/ain-ocean/src/indexer/loan_token.rs b/lib/ain-ocean/src/indexer/loan_token.rs index 3ae78b1e21..afbafd8db0 100644 --- a/lib/ain-ocean/src/indexer/loan_token.rs +++ b/lib/ain-ocean/src/indexer/loan_token.rs @@ -169,7 +169,7 @@ pub fn perform_active_price_tick( ticker_id: (Token, Currency), block: &BlockContext, ) -> Result<()> { - let id = (ticker_id.0, ticker_id.1, u32::MAX); + let id = (ticker_id.0.clone(), ticker_id.1.clone(), i64::MAX, u32::MAX); let prev = services .oracle_price_aggregated @@ -182,6 +182,7 @@ pub fn perform_active_price_tick( return Ok(()); }; + let id = (ticker_id.0, ticker_id.1, u32::MAX); let repo = &services.oracle_price_active; let prev = repo .by_id diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 461fc8c97e..447a58ca40 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -301,7 +301,7 @@ fn map_price_aggregated( let feed = services .oracle_price_feed .by_id - .list(Some((id.0, id.1, id.2, base_id)), SortOrder::Descending)? + .list(Some((id.0, id.1, id.2, u32::MAX, base_id)), SortOrder::Descending)? .next() .transpose()?; @@ -372,6 +372,7 @@ fn index_set_oracle_data( let id = ( token.clone(), currency.clone(), + price_aggregated.block.median_time, price_aggregated.block.height, ); oracle_repo.by_id.put(&id, &price_aggregated)?; @@ -402,6 +403,7 @@ fn index_set_oracle_data_interval( let aggregated = services.oracle_price_aggregated.by_id.get(&( token.clone(), currency.clone(), + context.block.median_time, context.block.height, ))?; @@ -447,8 +449,8 @@ impl Index for SetOracleData { let feeds = map_price_feeds(self, context); - for ((token, currency, _, _), _) in feeds.iter().rev() { - let id = (token.clone(), currency.clone(), context.block.height); + for ((token, currency, _, _, _), _) in feeds.iter().rev() { + let id = (token.clone(), currency.clone(), context.block.median_time, context.block.height); let aggregated = oracle_repo.by_id.get(&id)?; @@ -485,6 +487,7 @@ fn map_price_feeds( token_price.token.clone(), token_amount.currency.clone(), data.oracle_id, + ctx.block.height, ctx.tx.txid, ); diff --git a/lib/ain-ocean/src/model/oracle_price_aggregated.rs b/lib/ain-ocean/src/model/oracle_price_aggregated.rs index 0ee554ed8f..65832b95ba 100644 --- a/lib/ain-ocean/src/model/oracle_price_aggregated.rs +++ b/lib/ain-ocean/src/model/oracle_price_aggregated.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use super::{BlockContext, OraclePriceActiveNext}; -pub type OraclePriceAggregatedId = (String, String, u32); //token-currency-height +pub type OraclePriceAggregatedId = (String, String, i64, u32); //token-currency-mediantime-height #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/model/oracle_price_feed.rs b/lib/ain-ocean/src/model/oracle_price_feed.rs index 0afcc65614..4a2ff073a3 100644 --- a/lib/ain-ocean/src/model/oracle_price_feed.rs +++ b/lib/ain-ocean/src/model/oracle_price_feed.rs @@ -2,7 +2,7 @@ use bitcoin::Txid; use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type OraclePriceFeedId = (String, String, Txid, Txid); // token-currency-oracle_id-txid +pub type OraclePriceFeedId = (String, String, Txid, u32, Txid); // token-currency-oracle_id-height-txid #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] From 3caee4d64d056da6d9423362cedf3a766a188ddc Mon Sep 17 00:00:00 2001 From: canonbrother Date: Tue, 19 Nov 2024 23:33:32 +0800 Subject: [PATCH 03/17] fix index_block_start and index_block_end --- lib/ain-ocean/src/indexer/loan_token.rs | 10 --- lib/ain-ocean/src/indexer/mod.rs | 58 +++++--------- lib/ain-ocean/src/indexer/poolswap.rs | 102 ++++++++++++------------ 3 files changed, 72 insertions(+), 98 deletions(-) diff --git a/lib/ain-ocean/src/indexer/loan_token.rs b/lib/ain-ocean/src/indexer/loan_token.rs index afbafd8db0..85c7aa7462 100644 --- a/lib/ain-ocean/src/indexer/loan_token.rs +++ b/lib/ain-ocean/src/indexer/loan_token.rs @@ -32,16 +32,6 @@ impl Index for SetLoanToken { } } -impl IndexBlockEnd for SetLoanToken { - fn index_block_end(self, services: &Arc, block: &BlockContext) -> Result<()> { - index_active_price(services, block) - } - - fn invalidate_block_end(self, services: &Arc, block: &BlockContext) -> Result<()> { - invalidate_active_price(services, block) - } -} - fn is_aggregate_valid(aggregate: &OraclePriceAggregated, block: &BlockContext) -> bool { if (aggregate.block.time - block.time).abs() >= 3600 { return false; diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index eea99999f9..41523133fc 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -19,6 +19,8 @@ use defichain_rpc::json::blockchain::{Block, Transaction, Vin, VinStandard, Vout use helper::check_if_evm_tx; use log::trace; pub use poolswap::PoolSwapAggregatedInterval; +use loan_token::{index_active_price, invalidate_active_price}; +use poolswap::{index_pool_swap_aggregated, invalidate_pool_swap_aggregated}; use crate::{ error::{Error, IndexAction}, @@ -41,18 +43,6 @@ pub trait Index { fn invalidate(&self, services: &Arc, ctx: &Context) -> Result<()>; } -pub trait IndexBlockStart: Index { - fn index_block_start(self, services: &Arc, block: &BlockContext) -> Result<()>; - - fn invalidate_block_start(self, services: &Arc, block: &BlockContext) -> Result<()>; -} - -pub trait IndexBlockEnd: Index { - fn index_block_end(self, services: &Arc, block: &BlockContext) -> Result<()>; - - fn invalidate_block_end(self, services: &Arc, block: &BlockContext) -> Result<()>; -} - #[derive(Debug)] pub struct Context { block: BlockContext, @@ -538,6 +528,22 @@ pub fn get_block_height(services: &Arc) -> Result { .map_or(0, |block| block.height)) } +pub fn index_block_start(services: &Arc, block: &BlockContext) -> Result<()> { + index_pool_swap_aggregated(services, block) +} + +pub fn invalidate_block_start(services: &Arc, block: &BlockContext) -> Result<()> { + invalidate_pool_swap_aggregated(services, block) +} + +pub fn index_block_end(services: &Arc, block: &BlockContext) -> Result<()> { + index_active_price(services, block) +} + +pub fn invalidate_block_end(services: &Arc, block: &BlockContext) -> Result<()> { + invalidate_active_price(services, block) +} + pub fn index_block(services: &Arc, block: Block) -> Result<()> { trace!("[index_block] Indexing block..."); let start = Instant::now(); @@ -586,12 +592,7 @@ pub fn index_block(services: &Arc, block: Block) -> Resul } } - // index_block_start - for (dftx, _) in &dftxs { - if let DfTx::PoolSwap(data) = dftx.clone() { - data.index_block_start(services, &block_ctx)? - } - } + index_block_start(services, &block_ctx)?; // index_dftx for (dftx, ctx) in &dftxs { @@ -615,12 +616,7 @@ pub fn index_block(services: &Arc, block: Block) -> Resul log_elapsed(start, "Indexed dftx"); } - // index_block_end - for (dftx, _) in dftxs { - if let DfTx::SetLoanToken(data) = dftx { - data.index_block_end(services, &block_ctx)? - } - } + index_block_end(services, &block_ctx)?; let block_mapper = BlockMapper { hash: block_hash, @@ -699,12 +695,7 @@ pub fn invalidate_block(services: &Arc, block: Block) -> } } - // invalidate_block_end - for (dftx, _) in &dftxs { - if let DfTx::SetLoanToken(data) = dftx.clone() { - data.invalidate_block_end(services, &block_ctx)? - } - } + invalidate_block_end(services, &block_ctx)?; // invalidate_dftx for (dftx, ctx) in &dftxs { @@ -727,12 +718,7 @@ pub fn invalidate_block(services: &Arc, block: Block) -> log_elapsed(start, "Invalidate dftx"); } - // invalidate_block_start - for (dftx, _) in &dftxs { - if let DfTx::PoolSwap(data) = dftx.clone() { - data.invalidate_block_start(services, &block_ctx)? - } - } + invalidate_block_start(services, &block_ctx)?; // invalidate_block services.block.by_height.delete(&block.height)?; diff --git a/lib/ain-ocean/src/indexer/poolswap.rs b/lib/ain-ocean/src/indexer/poolswap.rs index bc6bc8ad05..bd78364116 100644 --- a/lib/ain-ocean/src/indexer/poolswap.rs +++ b/lib/ain-ocean/src/indexer/poolswap.rs @@ -182,67 +182,65 @@ fn create_new_bucket( Ok(()) } -impl IndexBlockStart for PoolSwap { - fn index_block_start(self, services: &Arc, block: &BlockContext) -> Result<()> { - let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); - pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); - - for interval in AGGREGATED_INTERVALS { - for pool_pair in &pool_pairs { - let repo = &services.pool_swap_aggregated; - - let prev = repo - .by_key - .list( - Some((pool_pair.id, interval, i64::MAX)), - SortOrder::Descending, - )? - .take_while(|item| match item { - Ok((k, _)) => k.0 == pool_pair.id && k.1 == interval, - _ => true, - }) - .next() - .transpose()?; - - let bucket = block.median_time - (block.median_time % interval as i64); - - let Some((_, prev_id)) = prev else { - create_new_bucket(repo, bucket, pool_pair.id, interval, block)?; - continue; - }; +pub fn index_pool_swap_aggregated(services: &Arc, block: &BlockContext) -> Result<()> { + let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); + pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); - let Some(prev) = repo.by_id.get(&prev_id)? else { - create_new_bucket(repo, bucket, pool_pair.id, interval, block)?; - continue; - }; - - if prev.bucket >= bucket { - break; - } + for interval in AGGREGATED_INTERVALS { + for pool_pair in &pool_pairs { + let repo = &services.pool_swap_aggregated; + + let prev = repo + .by_key + .list( + Some((pool_pair.id, interval, i64::MAX)), + SortOrder::Descending, + )? + .take_while(|item| match item { + Ok((k, _)) => k.0 == pool_pair.id && k.1 == interval, + _ => true, + }) + .next() + .transpose()?; + + let bucket = block.median_time - (block.median_time % interval as i64); + + let Some((_, prev_id)) = prev else { + create_new_bucket(repo, bucket, pool_pair.id, interval, block)?; + continue; + }; + let Some(prev) = repo.by_id.get(&prev_id)? else { create_new_bucket(repo, bucket, pool_pair.id, interval, block)?; + continue; + }; + + if prev.bucket >= bucket { + break; } - } - Ok(()) + create_new_bucket(repo, bucket, pool_pair.id, interval, block)?; + } } - fn invalidate_block_start(self, services: &Arc, block: &BlockContext) -> Result<()> { - let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); - pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); - - for interval in AGGREGATED_INTERVALS.into_iter().rev() { - for pool_pair in pool_pairs.iter().rev() { - let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash); - services - .pool_swap_aggregated - .by_id - .delete(&pool_swap_aggregated_id)?; - } - } + Ok(()) +} - Ok(()) +pub fn invalidate_pool_swap_aggregated(services: &Arc, block: &BlockContext) -> Result<()> { + let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); + pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); + + for interval in AGGREGATED_INTERVALS.into_iter().rev() { + for pool_pair in pool_pairs.iter().rev() { + let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash); + services + .pool_swap_aggregated + .by_id + .delete(&pool_swap_aggregated_id)?; + } } + + Ok(()) } impl Index for PoolSwap { From 642840485d19938ae1505ffea4fb5204761d588f Mon Sep 17 00:00:00 2001 From: canonbrother Date: Tue, 19 Nov 2024 23:40:40 +0800 Subject: [PATCH 04/17] clippy --- lib/ain-ocean/src/api/prices.rs | 2 +- lib/ain-ocean/src/indexer/loan_token.rs | 2 +- lib/ain-ocean/src/indexer/poolswap.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 04952df6ef..1879aa8700 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -24,7 +24,7 @@ use super::{ use crate::{ error::{ApiError, OtherSnafu}, model::{ - BlockContext, OracleIntervalSeconds, OraclePriceActive, OraclePriceActiveNextOracles, + BlockContext, OracleIntervalSeconds, OraclePriceActive, OraclePriceAggregatedIntervalAggregated, PriceTicker, }, storage::{RepositoryOps, SortOrder}, diff --git a/lib/ain-ocean/src/indexer/loan_token.rs b/lib/ain-ocean/src/indexer/loan_token.rs index 85c7aa7462..8fa47615d5 100644 --- a/lib/ain-ocean/src/indexer/loan_token.rs +++ b/lib/ain-ocean/src/indexer/loan_token.rs @@ -6,7 +6,7 @@ use rust_decimal::{prelude::Zero, Decimal}; use rust_decimal_macros::dec; use crate::{ - indexer::{Context, Index, IndexBlockEnd, Result}, + indexer::{Context, Index, Result}, model::{BlockContext, OraclePriceActive, OraclePriceActiveNext, OraclePriceAggregated}, network::Network, storage::{RepositoryOps, SortOrder}, diff --git a/lib/ain-ocean/src/indexer/poolswap.rs b/lib/ain-ocean/src/indexer/poolswap.rs index bd78364116..e01088cac3 100644 --- a/lib/ain-ocean/src/indexer/poolswap.rs +++ b/lib/ain-ocean/src/indexer/poolswap.rs @@ -9,7 +9,7 @@ use rust_decimal::Decimal; use rust_decimal_macros::dec; use snafu::OptionExt; -use super::{Context, IndexBlockStart}; +use super::Context; use crate::{ error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, NotFoundKind}, indexer::{tx_result, Index, Result}, From 8f27b973e3c2b6d4bf9fa4d4d4ebaa49fa244c75 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Tue, 19 Nov 2024 23:40:48 +0800 Subject: [PATCH 05/17] fmt --- lib/ain-ocean/src/api/prices.rs | 32 ++++++++++++--------------- lib/ain-ocean/src/indexer/mod.rs | 2 +- lib/ain-ocean/src/indexer/oracle.rs | 12 ++++++++-- lib/ain-ocean/src/indexer/poolswap.rs | 5 ++++- 4 files changed, 29 insertions(+), 22 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 1879aa8700..d4bd3ed909 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -259,25 +259,21 @@ impl OraclePriceActiveResponse { id: format!("{}-{}-{}", token, currency, v.block.height), key: format!("{}-{}", token, currency), sort: hex::encode(v.block.height.to_be_bytes()).to_string(), - active: v.active.map(|active| { - OraclePriceActiveNextResponse { - amount: format!("{:.8}", active.amount / Decimal::from(COIN)), - weightage: active.weightage, - oracles: OraclePriceActiveNextOraclesResponse { - active: active.oracles.active.to_i32().unwrap_or_default(), - total: active.oracles.total, - } - } + active: v.active.map(|active| OraclePriceActiveNextResponse { + amount: format!("{:.8}", active.amount / Decimal::from(COIN)), + weightage: active.weightage, + oracles: OraclePriceActiveNextOraclesResponse { + active: active.oracles.active.to_i32().unwrap_or_default(), + total: active.oracles.total, + }, }), - next: v.next.map(|next| { - OraclePriceActiveNextResponse { - amount: format!("{:.8}", next.amount / Decimal::from(COIN)), - weightage: next.weightage, - oracles: OraclePriceActiveNextOraclesResponse { - active: next.oracles.active.to_i32().unwrap_or_default(), - total: next.oracles.total, - } - } + next: v.next.map(|next| OraclePriceActiveNextResponse { + amount: format!("{:.8}", next.amount / Decimal::from(COIN)), + weightage: next.weightage, + oracles: OraclePriceActiveNextOraclesResponse { + active: next.oracles.active.to_i32().unwrap_or_default(), + total: next.oracles.total, + }, }), is_live: v.is_live, block: v.block, diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index 41523133fc..725ec86808 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -17,9 +17,9 @@ use std::{ use ain_dftx::{deserialize, is_skipped_tx, DfTx, Stack}; use defichain_rpc::json::blockchain::{Block, Transaction, Vin, VinStandard, Vout}; use helper::check_if_evm_tx; +use loan_token::{index_active_price, invalidate_active_price}; use log::trace; pub use poolswap::PoolSwapAggregatedInterval; -use loan_token::{index_active_price, invalidate_active_price}; use poolswap::{index_pool_swap_aggregated, invalidate_pool_swap_aggregated}; use crate::{ diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 447a58ca40..ab93c4b236 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -301,7 +301,10 @@ fn map_price_aggregated( let feed = services .oracle_price_feed .by_id - .list(Some((id.0, id.1, id.2, u32::MAX, base_id)), SortOrder::Descending)? + .list( + Some((id.0, id.1, id.2, u32::MAX, base_id)), + SortOrder::Descending, + )? .next() .transpose()?; @@ -450,7 +453,12 @@ impl Index for SetOracleData { let feeds = map_price_feeds(self, context); for ((token, currency, _, _, _), _) in feeds.iter().rev() { - let id = (token.clone(), currency.clone(), context.block.median_time, context.block.height); + let id = ( + token.clone(), + currency.clone(), + context.block.median_time, + context.block.height, + ); let aggregated = oracle_repo.by_id.get(&id)?; diff --git a/lib/ain-ocean/src/indexer/poolswap.rs b/lib/ain-ocean/src/indexer/poolswap.rs index e01088cac3..e9ed559528 100644 --- a/lib/ain-ocean/src/indexer/poolswap.rs +++ b/lib/ain-ocean/src/indexer/poolswap.rs @@ -226,7 +226,10 @@ pub fn index_pool_swap_aggregated(services: &Arc, block: &BlockContext Ok(()) } -pub fn invalidate_pool_swap_aggregated(services: &Arc, block: &BlockContext) -> Result<()> { +pub fn invalidate_pool_swap_aggregated( + services: &Arc, + block: &BlockContext, +) -> Result<()> { let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); From 50d407eb3583b90442cf6818cd25334ab50ebb78 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 21 Nov 2024 15:30:18 +0800 Subject: [PATCH 06/17] add missing pagination --- lib/ain-ocean/src/api/prices.rs | 54 +++++++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 6 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index d4bd3ed909..303a3a620f 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{str::FromStr, sync::Arc}; use ain_dftx::{Currency, Token, Weightage, COIN}; use ain_macros::ocean_endpoint; @@ -22,7 +22,7 @@ use super::{ AppContext, }; use crate::{ - error::{ApiError, OtherSnafu}, + error::{ApiError, Error, OtherSnafu}, model::{ BlockContext, OracleIntervalSeconds, OraclePriceActive, OraclePriceAggregatedIntervalAggregated, PriceTicker, @@ -189,9 +189,22 @@ async fn get_feed( Extension(ctx): Extension>, ) -> Result> { let (token, currency) = parse_token_currency(&key)?; + let next = query + .next + .map(|q| { + let median_time = &q[..16]; + let height = &q[16..]; + + let median_time = median_time.parse::()?; + let height = height.parse::()?; + Ok::<(i64, u32), Error>((median_time, height)) + }) + .transpose()? + .unwrap_or((i64::MAX, u32::MAX)); + let repo = &ctx.services.oracle_price_aggregated; - let id = (token.to_string(), currency.to_string(), i64::MAX, u32::MAX); + let id = (token.clone(), currency.clone(), next.0, next.1); let oracle_aggregated = repo .by_id .list(Some(id), SortOrder::Descending)? @@ -289,7 +302,16 @@ async fn get_feed_active( ) -> Result> { let (token, currency) = parse_token_currency(&key)?; - let id = (token.clone(), currency.clone(), u32::MAX); + let next = query + .next + .map(|q| { + let height = q.parse::()?; + Ok::(height) + }) + .transpose()? + .unwrap_or(u32::MAX); + + let id = (token.clone(), currency.clone(), next); let price_active = ctx .services .oracle_price_active @@ -355,11 +377,21 @@ async fn get_feed_with_interval( 86400 => OracleIntervalSeconds::OneDay, _ => return Err(From::from("Invalid oracle interval")), }; + + let next = query + .next + .map(|q| { + let height = q.parse::()?; + Ok::(height) + }) + .transpose()? + .unwrap_or(u32::MAX); + let id = ( token.clone(), currency.clone(), interval_type.clone(), - u32::MAX, + next, ); let items = ctx @@ -434,10 +466,20 @@ async fn list_price_oracles( ) -> Result> { let (token, currency) = parse_token_currency(&key)?; + let next = query + .next + .map(|q| { + let oracle_id = Txid::from_str(&q)?; + Ok::(oracle_id) + }) + .transpose()? + .unwrap_or(Txid::from_byte_array([0xffu8; 32])); + + let id = ( token.clone(), currency.clone(), - Txid::from_byte_array([0xffu8; 32]), + next, ); let token_currencies = ctx .services From 89e20b60aa62094cf19346ce3263d6fd3ccaf56a Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 21 Nov 2024 22:20:00 +0800 Subject: [PATCH 07/17] [oracle] convert u32,i64 to be bytes --- lib/ain-ocean/src/api/loan.rs | 12 ++++--- lib/ain-ocean/src/api/oracle.rs | 2 +- lib/ain-ocean/src/api/prices.rs | 29 +++++++-------- lib/ain-ocean/src/indexer/loan_token.rs | 10 +++--- lib/ain-ocean/src/indexer/oracle.rs | 36 +++++++++---------- lib/ain-ocean/src/model/oracle.rs | 2 +- .../src/model/oracle_price_active.rs | 2 +- .../src/model/oracle_price_aggregated.rs | 2 +- .../model/oracle_price_aggregated_interval.rs | 2 +- lib/ain-ocean/src/model/oracle_price_feed.rs | 2 +- lib/ain-ocean/src/model/price_ticker.rs | 2 +- 11 files changed, 53 insertions(+), 48 deletions(-) diff --git a/lib/ain-ocean/src/api/loan.rs b/lib/ain-ocean/src/api/loan.rs index 85731fc68b..7316561754 100644 --- a/lib/ain-ocean/src/api/loan.rs +++ b/lib/ain-ocean/src/api/loan.rs @@ -138,7 +138,11 @@ fn get_active_price( .services .oracle_price_active .by_id - .list(Some((token, currency, u32::MAX)), SortOrder::Descending)? + .list(Some((token.clone(), currency.clone(), [0xffu8; 4])), SortOrder::Descending)? + .take_while(|item| match item { + Ok((k, _)) => k.0 == token && k.1 == currency, + _ => true, + }) .next() .map(|item| { let (_, v) = item?; @@ -264,7 +268,7 @@ async fn list_loan_token( .services .oracle_price_active .by_id - .list(Some((token, currency, u32::MAX)), SortOrder::Descending)? + .list(Some((token, currency, [0xffu8; 4])), SortOrder::Descending)? .next() .map(|item| { let (_, v) = item?; @@ -671,7 +675,7 @@ async fn map_liquidation_batches( let id = ( Txid::from_str(vault_id)?, batch.index.to_be_bytes(), - [0xffu8, 0xffu8, 0xffu8, 0xffu8], + [0xffu8; 4], Txid::from_byte_array([0xffu8; 32]), ); let bids = repo @@ -733,7 +737,7 @@ async fn map_token_amounts( .oracle_price_active .by_id .list( - Some((token_info.symbol.clone(), "USD".to_string(), u32::MAX)), + Some((token_info.symbol.clone(), "USD".to_string(), [0xffu8; 4])), SortOrder::Descending, )? .take_while(|item| match item { diff --git a/lib/ain-ocean/src/api/oracle.rs b/lib/ain-ocean/src/api/oracle.rs index e9b7c2e6a1..de0bf825dc 100644 --- a/lib/ain-ocean/src/api/oracle.rs +++ b/lib/ain-ocean/src/api/oracle.rs @@ -111,7 +111,7 @@ async fn get_feed( OraclePriceFeedResponse { id: format!("{}-{}-{}-{}", token, currency, oracle_id, txid), key: format!("{}-{}-{}", token, currency, oracle_id), - sort: hex::encode(height.to_string() + &txid.to_string()), + sort: hex::encode(u32::from_be_bytes(height).to_string() + &txid.to_string()), token, currency, oracle_id, diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 303a3a620f..e780ec921f 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -195,12 +195,12 @@ async fn get_feed( let median_time = &q[..16]; let height = &q[16..]; - let median_time = median_time.parse::()?; - let height = height.parse::()?; - Ok::<(i64, u32), Error>((median_time, height)) + let median_time = median_time.parse::()?.to_be_bytes(); + let height = height.parse::()?.to_be_bytes(); + Ok::<([u8; 8], [u8; 4]), Error>((median_time, height)) }) .transpose()? - .unwrap_or((i64::MAX, u32::MAX)); + .unwrap_or(([0xffu8; 8], [0xffu8; 4])); let repo = &ctx.services.oracle_price_aggregated; @@ -216,7 +216,7 @@ async fn get_feed( .map(|item| { let (k, v) = item?; let res = OraclePriceAggregatedResponse { - id: format!("{}-{}-{}", k.0, k.1, k.2), + id: format!("{}-{}-{}", k.0, k.1, i64::from_be_bytes(k.2)), key: format!("{}-{}", k.0, k.1), sort: format!( "{}{}", @@ -305,11 +305,11 @@ async fn get_feed_active( let next = query .next .map(|q| { - let height = q.parse::()?; - Ok::(height) + let height = q.parse::()?.to_be_bytes(); + Ok::<[u8; 4], Error>(height) }) .transpose()? - .unwrap_or(u32::MAX); + .unwrap_or([0xffu8; 4]); let id = (token.clone(), currency.clone(), next); let price_active = ctx @@ -381,11 +381,11 @@ async fn get_feed_with_interval( let next = query .next .map(|q| { - let height = q.parse::()?; - Ok::(height) + let height = q.parse::()?.to_be_bytes(); + Ok::<[u8; 4], Error>(height) }) .transpose()? - .unwrap_or(u32::MAX); + .unwrap_or([0xffu8; 4]); let id = ( token.clone(), @@ -412,9 +412,10 @@ async fn get_feed_with_interval( let mut prices = Vec::new(); for (id, item) in items { let start = item.block.median_time - (item.block.median_time % interval); + let height = u32::from_be_bytes(id.3); let price = OraclePriceAggregatedIntervalResponse { - id: format!("{}-{}-{:?}-{}", id.0, id.1, id.2, id.3), + id: format!("{}-{}-{:?}-{}", id.0, id.1, id.2, height), key: format!("{}-{}-{:?}", id.0, id.1, id.2), sort: format!( "{}{}", @@ -505,7 +506,7 @@ async fn list_price_oracles( token.clone(), currency.clone(), oracle_id, - u32::MAX, + [0xffu8; 4], Txid::from_byte_array([0xffu8; 32]), )), SortOrder::Descending, @@ -529,7 +530,7 @@ async fn list_price_oracles( let token = id.0; let currency = id.1; let oracle_id = id.2; - let height = id.3; + let height = u32::from_be_bytes(id.3); let txid = id.4; OraclePriceFeedResponse { id: format!("{}-{}-{}-{}", token, currency, oracle_id, txid), diff --git a/lib/ain-ocean/src/indexer/loan_token.rs b/lib/ain-ocean/src/indexer/loan_token.rs index 8fa47615d5..964fea963b 100644 --- a/lib/ain-ocean/src/indexer/loan_token.rs +++ b/lib/ain-ocean/src/indexer/loan_token.rs @@ -25,7 +25,7 @@ impl Index for SetLoanToken { let ticker_id = ( self.currency_pair.token.clone(), self.currency_pair.currency.clone(), - context.block.height, + context.block.height.to_be_bytes(), ); services.oracle_price_active.by_id.delete(&ticker_id)?; Ok(()) @@ -147,7 +147,7 @@ pub fn invalidate_active_price(services: &Arc, block: &BlockContext) - services .oracle_price_active .by_id - .delete(&(token, currency, block.height))?; + .delete(&(token, currency, block.height.to_be_bytes()))?; } } @@ -159,7 +159,7 @@ pub fn perform_active_price_tick( ticker_id: (Token, Currency), block: &BlockContext, ) -> Result<()> { - let id = (ticker_id.0.clone(), ticker_id.1.clone(), i64::MAX, u32::MAX); + let id = (ticker_id.0.clone(), ticker_id.1.clone(), [0xffu8; 8], [0xffu8; 4]); let prev = services .oracle_price_aggregated @@ -172,7 +172,7 @@ pub fn perform_active_price_tick( return Ok(()); }; - let id = (ticker_id.0, ticker_id.1, u32::MAX); + let id = (ticker_id.0, ticker_id.1, [0xffu8; 4]); let repo = &services.oracle_price_active; let prev = repo .by_id @@ -188,7 +188,7 @@ pub fn perform_active_price_tick( let active_price = map_active_price(block, aggregated_price, prev_price); - repo.by_id.put(&(id.0, id.1, block.height), &active_price)?; + repo.by_id.put(&(id.0, id.1, block.height.to_be_bytes()), &active_price)?; Ok(()) } diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index ab93c4b236..343b4831ae 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -53,7 +53,7 @@ impl Index for AppointOracle { }; services.oracle.by_id.put(&oracle_id, &oracle)?; - let oracle_history_id = (oracle_id, ctx.block.height); + let oracle_history_id = (oracle_id, ctx.block.height.to_be_bytes()); services .oracle_history .by_id @@ -88,7 +88,7 @@ impl Index for AppointOracle { services .oracle_history .by_id - .delete(&(oracle_id, context.block.height))?; + .delete(&(oracle_id, context.block.height.to_be_bytes()))?; for currency_pair in self.price_feeds.iter().rev() { let token_currency_id = ( @@ -184,7 +184,7 @@ impl Index for UpdateOracle { services .oracle_history .by_id - .put(&(oracle_id, ctx.block.height), &oracle)?; + .put(&(oracle_id, ctx.block.height.to_be_bytes()), &oracle)?; let (_, previous) = get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu { @@ -220,7 +220,7 @@ impl Index for UpdateOracle { services .oracle_history .by_id - .delete(&(oracle_id, context.block.height))?; + .delete(&(oracle_id, context.block.height.to_be_bytes()))?; let price_feeds = self.price_feeds.as_ref(); for pair in price_feeds.iter().rev() { @@ -302,7 +302,7 @@ fn map_price_aggregated( .oracle_price_feed .by_id .list( - Some((id.0, id.1, id.2, u32::MAX, base_id)), + Some((id.0, id.1, id.2, [0xffu8; 4], base_id)), SortOrder::Descending, )? .next() @@ -375,14 +375,14 @@ fn index_set_oracle_data( let id = ( token.clone(), currency.clone(), - price_aggregated.block.median_time, - price_aggregated.block.height, + price_aggregated.block.median_time.to_be_bytes(), + price_aggregated.block.height.to_be_bytes(), ); oracle_repo.by_id.put(&id, &price_aggregated)?; let key = ( - price_aggregated.aggregated.oracles.total, - price_aggregated.block.height, + price_aggregated.aggregated.oracles.total.to_be_bytes(), + price_aggregated.block.height.to_be_bytes(), token, currency, ); @@ -406,8 +406,8 @@ fn index_set_oracle_data_interval( let aggregated = services.oracle_price_aggregated.by_id.get(&( token.clone(), currency.clone(), - context.block.median_time, - context.block.height, + context.block.median_time.to_be_bytes(), + context.block.height.to_be_bytes(), ))?; let Some(aggregated) = aggregated else { @@ -456,8 +456,8 @@ impl Index for SetOracleData { let id = ( token.clone(), currency.clone(), - context.block.median_time, - context.block.height, + context.block.median_time.to_be_bytes(), + context.block.height.to_be_bytes(), ); let aggregated = oracle_repo.by_id.get(&id)?; @@ -495,7 +495,7 @@ fn map_price_feeds( token_price.token.clone(), token_amount.currency.clone(), data.oracle_id, - ctx.block.height, + ctx.block.height.to_be_bytes(), ctx.tx.txid, ); @@ -518,7 +518,7 @@ fn start_new_bucket( aggregated: &OraclePriceAggregated, interval: OracleIntervalSeconds, ) -> Result<()> { - let id = (token, currency, interval, block.height); + let id = (token, currency, interval, block.height.to_be_bytes()); services.oracle_price_aggregated_interval.by_id.put( &id, &OraclePriceAggregatedInterval { @@ -550,7 +550,7 @@ pub fn index_interval_mapper( let previous = repo .by_id .list( - Some((token.clone(), currency.clone(), interval.clone(), u32::MAX)), + Some((token.clone(), currency.clone(), interval.clone(), [0xffu8; 4])), SortOrder::Descending, )? .take_while(|item| match item { @@ -591,7 +591,7 @@ pub fn invalidate_oracle_interval( token.to_string(), currency.to_string(), interval.clone(), - u32::MAX, + [0xffu8; 4], )), SortOrder::Descending, )? @@ -739,7 +739,7 @@ fn get_previous_oracle( let previous = services .oracle_history .by_id - .list(Some((oracle_id, u32::MAX)), SortOrder::Descending)? + .list(Some((oracle_id, [0xffu8; 4])), SortOrder::Descending)? .next() .transpose()?; diff --git a/lib/ain-ocean/src/model/oracle.rs b/lib/ain-ocean/src/model/oracle.rs index 6c4c0b9548..c15c3f7f69 100644 --- a/lib/ain-ocean/src/model/oracle.rs +++ b/lib/ain-ocean/src/model/oracle.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type OracleHistoryId = (Txid, u32); //oracleId-height +pub type OracleHistoryId = (Txid, [u8; 4]); //oracleId-height #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/model/oracle_price_active.rs b/lib/ain-ocean/src/model/oracle_price_active.rs index 3b9740f08e..4f53828420 100644 --- a/lib/ain-ocean/src/model/oracle_price_active.rs +++ b/lib/ain-ocean/src/model/oracle_price_active.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type OraclePriceActiveId = (String, String, u32); //token-currency-height +pub type OraclePriceActiveId = (String, String, [u8; 4]); //token-currency-height #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/model/oracle_price_aggregated.rs b/lib/ain-ocean/src/model/oracle_price_aggregated.rs index 65832b95ba..1fdcf30128 100644 --- a/lib/ain-ocean/src/model/oracle_price_aggregated.rs +++ b/lib/ain-ocean/src/model/oracle_price_aggregated.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use super::{BlockContext, OraclePriceActiveNext}; -pub type OraclePriceAggregatedId = (String, String, i64, u32); //token-currency-mediantime-height +pub type OraclePriceAggregatedId = (String, String, [u8; 8], [u8; 4]); //token-currency-mediantime-height #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs b/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs index 6b5bb65d5b..443e4e7e5a 100644 --- a/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs +++ b/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs @@ -3,7 +3,7 @@ use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type OraclePriceAggregatedIntervalId = (Token, Currency, OracleIntervalSeconds, u32); //token-currency-interval-height +pub type OraclePriceAggregatedIntervalId = (Token, Currency, OracleIntervalSeconds, [u8; 4]); //token-currency-interval-height pub const FIFTEEN_MINUTES: isize = 15 * 60; pub const ONE_HOUR: isize = 60 * 60; diff --git a/lib/ain-ocean/src/model/oracle_price_feed.rs b/lib/ain-ocean/src/model/oracle_price_feed.rs index 4a2ff073a3..40cb98b536 100644 --- a/lib/ain-ocean/src/model/oracle_price_feed.rs +++ b/lib/ain-ocean/src/model/oracle_price_feed.rs @@ -2,7 +2,7 @@ use bitcoin::Txid; use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type OraclePriceFeedId = (String, String, Txid, u32, Txid); // token-currency-oracle_id-height-txid +pub type OraclePriceFeedId = (String, String, Txid, [u8; 4], Txid); // token-currency-oracle_id-height-txid #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/model/price_ticker.rs b/lib/ain-ocean/src/model/price_ticker.rs index 54457622fd..777e693649 100644 --- a/lib/ain-ocean/src/model/price_ticker.rs +++ b/lib/ain-ocean/src/model/price_ticker.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use super::oracle_price_aggregated::OraclePriceAggregated; pub type PriceTickerId = (String, String); //token-currency -pub type PriceTickerKey = (i32, u32, String, String); // total-height-token-currency +pub type PriceTickerKey = ([u8; 4], [u8; 4], String, String); // total-height-token-currency #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] From a7597340a40eee0c1304bcec2babfaaa4a6ab1f5 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Fri, 22 Nov 2024 11:06:45 +0800 Subject: [PATCH 08/17] ScriptAggregationId height u32 -> bytes --- lib/ain-ocean/src/api/address.rs | 2 +- lib/ain-ocean/src/indexer/mod.rs | 6 +++--- lib/ain-ocean/src/model/script_aggregation.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/ain-ocean/src/api/address.rs b/lib/ain-ocean/src/api/address.rs index 6e98e80c46..0e4dcd35fa 100644 --- a/lib/ain-ocean/src/api/address.rs +++ b/lib/ain-ocean/src/api/address.rs @@ -173,7 +173,7 @@ fn get_latest_aggregation( .services .script_aggregation .by_id - .list(Some((hid, u32::MAX)), SortOrder::Descending)? + .list(Some((hid, [0xffu8; 4])), SortOrder::Descending)? .take(1) .take_while(|item| match item { Ok(((v, _), _)) => v == &hid, diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index 725ec86808..fbc140fc0f 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -317,7 +317,7 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> let repo = &services.script_aggregation; let latest = repo .by_id - .list(Some((aggregation.hid, u32::MAX)), SortOrder::Descending)? + .list(Some((aggregation.hid, [0xffu8; 4])), SortOrder::Descending)? .take(1) .take_while(|item| match item { Ok(((hid, _), _)) => &aggregation.hid == hid, @@ -342,7 +342,7 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> aggregation.amount.unspent = aggregation.amount.tx_in - aggregation.amount.tx_out; repo.by_id - .put(&(aggregation.hid, ctx.block.height), &aggregation)?; + .put(&(aggregation.hid, ctx.block.height.to_be_bytes()), &aggregation)?; record.insert(aggregation.hid, aggregation); } @@ -404,7 +404,7 @@ fn invalidate_script(services: &Arc, ctx: &Context, txs: &[Transaction services .script_aggregation .by_id - .delete(&(hid, block.height))? + .delete(&(hid, block.height.to_be_bytes()))? } Ok(()) diff --git a/lib/ain-ocean/src/model/script_aggregation.rs b/lib/ain-ocean/src/model/script_aggregation.rs index 7623c1b47b..8e1f882c7b 100644 --- a/lib/ain-ocean/src/model/script_aggregation.rs +++ b/lib/ain-ocean/src/model/script_aggregation.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type ScriptAggregationId = ([u8; 32], u32); // (hid, block.height) +pub type ScriptAggregationId = ([u8; 32], [u8; 4]); // (hid, block.height) #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ScriptAggregation { From 655c38f4f2c13a35de933c66e82a34dfd7f5517b Mon Sep 17 00:00:00 2001 From: canonbrother Date: Fri, 22 Nov 2024 14:05:50 +0800 Subject: [PATCH 09/17] storage: endianness - mn, poolswap --- lib/ain-ocean/src/storage/mod.rs | 87 ++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/lib/ain-ocean/src/storage/mod.rs b/lib/ain-ocean/src/storage/mod.rs index 054f25d599..0d6c449636 100644 --- a/lib/ain-ocean/src/storage/mod.rs +++ b/lib/ain-ocean/src/storage/mod.rs @@ -94,6 +94,28 @@ define_table! { pub struct MasternodeByHeight { key_type = (u32, Txid), value_type = u8, + custom_key = { + fn key(index: &Self::Index) -> DBResult> { + let (height, txid) = index; + let mut vec = height.to_be_bytes().to_vec(); + vec.extend_from_slice(txid.as_byte_array().to_vec().as_ref()); + Ok(vec) + } + + fn get_key(raw_key: Box<[u8]>) -> DBResult { + if raw_key.len() != 36 { + return Err(DBError::WrongKeyLength); + } + let mut height_array = [0u8; 4]; + height_array.copy_from_slice(&raw_key[..4]); + let mut txid_array = [0u8; 32]; + txid_array.copy_from_slice(&raw_key[4..]); + + let height = u32::from_be_bytes(height_array); + let txid = Txid::from_byte_array(txid_array); + Ok((height, txid)) + } + }, } } @@ -201,6 +223,40 @@ define_table! { pub struct PoolSwap { key_type = model::PoolSwapKey, value_type = model::PoolSwap, + custom_key = { + fn key(index: &Self::Index) -> DBResult> { + let (pool_id, height, txno) = index; // u32, u32, usize + let mut vec = Vec::with_capacity(16); + vec.extend_from_slice(&pool_id.to_be_bytes()); + vec.extend_from_slice(&height.to_be_bytes()); + vec.extend_from_slice(&txno.to_be_bytes()); + Ok(vec) + } + + fn get_key(raw_key: Box<[u8]>) -> DBResult { + if raw_key.len() != 16 { + return Err(DBError::WrongKeyLength); + } + let pool_id = u32::from_be_bytes( + raw_key[0..4] + .try_into() + .map_err(|_| DBError::WrongKeyLength)?, + ); + let height = u32::from_be_bytes( + raw_key[4..8] + .try_into() + .map_err(|_| DBError::WrongKeyLength)?, + ); + let txno = usize::from_be_bytes( + raw_key[8..] + .try_into() + .map_err(|_| DBError::WrongKeyLength)?, + ); + + Ok((pool_id, height, txno)) + } + + }, }, InitialKeyProvider = |pk: u32| (pk, u32::MAX, usize::MAX) } @@ -210,6 +266,37 @@ define_table! { pub struct PoolSwapAggregated { key_type = model::PoolSwapAggregatedId, value_type = model::PoolSwapAggregated, + custom_key = { + fn key(index: &Self::Index) -> DBResult> { + let (pool_id, interval, hash) = index; // u32, u32, hash + let mut vec = Vec::with_capacity(40); + vec.extend_from_slice(&pool_id.to_be_bytes()); + vec.extend_from_slice(&interval.to_be_bytes()); + vec.extend_from_slice(hash.as_byte_array().to_vec().as_ref()); + Ok(vec) + } + + fn get_key(raw_key: Box<[u8]>) -> DBResult { + if raw_key.len() != 40 { + return Err(DBError::WrongKeyLength); + } + let pool_id = u32::from_be_bytes( + raw_key[0..4] + .try_into() + .map_err(|_| DBError::WrongKeyLength)?, + ); + let interval = u32::from_be_bytes( + raw_key[4..8] + .try_into() + .map_err(|_| DBError::WrongKeyLength)?, + ); + let mut hash_array = [0u8; 32]; + hash_array.copy_from_slice(&raw_key[..32]); + let hash = BlockHash::from_byte_array(hash_array); + + Ok((pool_id, interval, hash)) + } + }, } } From fbfbf3ace1b5a0245db2c78b96761f34acfbe2f6 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Fri, 22 Nov 2024 14:14:40 +0800 Subject: [PATCH 10/17] storage: endianness - OracleHistory --- lib/ain-ocean/src/indexer/oracle.rs | 10 +++++----- lib/ain-ocean/src/model/oracle.rs | 2 +- lib/ain-ocean/src/storage/mod.rs | 22 ++++++++++++++++++++++ 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 343b4831ae..77ab02710f 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -53,7 +53,7 @@ impl Index for AppointOracle { }; services.oracle.by_id.put(&oracle_id, &oracle)?; - let oracle_history_id = (oracle_id, ctx.block.height.to_be_bytes()); + let oracle_history_id = (oracle_id, ctx.block.height); services .oracle_history .by_id @@ -88,7 +88,7 @@ impl Index for AppointOracle { services .oracle_history .by_id - .delete(&(oracle_id, context.block.height.to_be_bytes()))?; + .delete(&(oracle_id, context.block.height))?; for currency_pair in self.price_feeds.iter().rev() { let token_currency_id = ( @@ -184,7 +184,7 @@ impl Index for UpdateOracle { services .oracle_history .by_id - .put(&(oracle_id, ctx.block.height.to_be_bytes()), &oracle)?; + .put(&(oracle_id, ctx.block.height), &oracle)?; let (_, previous) = get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu { @@ -220,7 +220,7 @@ impl Index for UpdateOracle { services .oracle_history .by_id - .delete(&(oracle_id, context.block.height.to_be_bytes()))?; + .delete(&(oracle_id, context.block.height))?; let price_feeds = self.price_feeds.as_ref(); for pair in price_feeds.iter().rev() { @@ -739,7 +739,7 @@ fn get_previous_oracle( let previous = services .oracle_history .by_id - .list(Some((oracle_id, [0xffu8; 4])), SortOrder::Descending)? + .list(Some((oracle_id, u32::MAX)), SortOrder::Descending)? .next() .transpose()?; diff --git a/lib/ain-ocean/src/model/oracle.rs b/lib/ain-ocean/src/model/oracle.rs index c15c3f7f69..6c4c0b9548 100644 --- a/lib/ain-ocean/src/model/oracle.rs +++ b/lib/ain-ocean/src/model/oracle.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type OracleHistoryId = (Txid, [u8; 4]); //oracleId-height +pub type OracleHistoryId = (Txid, u32); //oracleId-height #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/storage/mod.rs b/lib/ain-ocean/src/storage/mod.rs index 0d6c449636..92ef6cfc9b 100644 --- a/lib/ain-ocean/src/storage/mod.rs +++ b/lib/ain-ocean/src/storage/mod.rs @@ -175,6 +175,28 @@ define_table! { pub struct OracleHistory { key_type = model::OracleHistoryId, value_type = model::Oracle, + custom_key = { + fn key(index: &Self::Index) -> DBResult> { + let (txid, height) = index; // txid, u32 + let mut vec = txid.as_byte_array().to_vec(); + vec.extend_from_slice(&height.to_be_bytes()); + Ok(vec) + } + + fn get_key(raw_key: Box<[u8]>) -> DBResult { + if raw_key.len() != 36 { + return Err(DBError::WrongKeyLength); + } + let mut txid_array = [0u8; 32]; + txid_array.copy_from_slice(&raw_key[..32]); + let mut height_array = [0u8; 4]; + height_array.copy_from_slice(&raw_key[32..]); + + let txid = Txid::from_byte_array(txid_array); + let height = u32::from_be_bytes(height_array); + Ok((txid, height)) + } + }, } } From 59795e960fcfdd127742bd27c49b066d778dfe65 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Fri, 22 Nov 2024 14:30:18 +0800 Subject: [PATCH 11/17] ScriptUnspentKey uzize to be --- lib/ain-ocean/src/api/address.rs | 11 +++++++---- lib/ain-ocean/src/indexer/mod.rs | 14 +++++++------- lib/ain-ocean/src/model/script_unspent.rs | 4 ++-- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/lib/ain-ocean/src/api/address.rs b/lib/ain-ocean/src/api/address.rs index 0e4dcd35fa..e41ff07373 100644 --- a/lib/ain-ocean/src/api/address.rs +++ b/lib/ain-ocean/src/api/address.rs @@ -458,14 +458,17 @@ async fn list_transaction_unspent( msg: format!("Invalid height: {}", height), })?; let txid = Txid::from_str(txid)?; - let n = n.parse::()?; - Ok::<([u8; 4], Txid, usize), Error>((height, txid, n)) + let decoded_n = hex::decode(n)?; + let n = decoded_n.try_into().map_err(|_| Error::Other { + msg: format!("Invalid txno: {}", n) + })?; + Ok::<([u8; 4], Txid, [u8; 8]), Error>((height, txid, n)) }) .transpose()? .unwrap_or(( - [0u8, 0u8, 0u8, 0u8], + [0u8; 4], Txid::from_byte_array([0x00u8; 32]), - usize::default(), + [0u8; 8], )); let res = ctx diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index fbc140fc0f..1824e285f4 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -160,7 +160,7 @@ fn index_script_unspent_vin( vin: &VinStandard, ctx: &Context, ) -> Result<()> { - let key = (ctx.block.height.to_be_bytes(), vin.txid, vin.vout); + let key = (ctx.block.height.to_be_bytes(), vin.txid, vin.vout.to_be_bytes()); let id = services.script_unspent.by_key.get(&key)?; if let Some(id) = id { services.script_unspent.by_id.delete(&id)?; @@ -255,8 +255,8 @@ fn index_script_unspent_vout(services: &Arc, vout: &Vout, ctx: &Contex }, }; - let id = (hid, block.height.to_be_bytes(), tx.txid, vout.n); - let key = (block.height.to_be_bytes(), tx.txid, vout.n); + let id = (hid, block.height.to_be_bytes(), tx.txid, vout.n.to_be_bytes()); + let key = (block.height.to_be_bytes(), tx.txid, vout.n.to_be_bytes()); services.script_unspent.by_key.put(&key, &id)?; services.script_unspent.by_id.put(&id, &script_unspent)?; Ok(()) @@ -458,12 +458,12 @@ fn invalidate_script_unspent_vin( hid, transaction.block.height.to_be_bytes(), transaction.txid, - vout.n, + vout.n.to_be_bytes(), ); let key = ( transaction.block.height.to_be_bytes(), transaction.txid, - vout.n, + vout.n.to_be_bytes(), ); services.script_unspent.by_key.put(&key, &id)?; @@ -496,8 +496,8 @@ fn invalidate_script_unspent_vout( vout: &Vout, ) -> Result<()> { let hid = as_sha256(&vout.script_pub_key.hex); - let id = (hid, ctx.block.height.to_be_bytes(), ctx.tx.txid, vout.n); - let key = (ctx.block.height.to_be_bytes(), ctx.tx.txid, vout.n); + let id = (hid, ctx.block.height.to_be_bytes(), ctx.tx.txid, vout.n.to_be_bytes()); + let key = (ctx.block.height.to_be_bytes(), ctx.tx.txid, vout.n.to_be_bytes()); services.script_unspent.by_id.delete(&id)?; services.script_unspent.by_key.delete(&key)?; diff --git a/lib/ain-ocean/src/model/script_unspent.rs b/lib/ain-ocean/src/model/script_unspent.rs index c088751976..0bf270cd8b 100644 --- a/lib/ain-ocean/src/model/script_unspent.rs +++ b/lib/ain-ocean/src/model/script_unspent.rs @@ -3,8 +3,8 @@ use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type ScriptUnspentId = ([u8; 32], [u8; 4], Txid, usize); // hid + block.height + txid + vout_index -pub type ScriptUnspentKey = ([u8; 4], Txid, usize); // block.height + txid + vout_index, ps: key is required in index_script_unspent_vin +pub type ScriptUnspentId = ([u8; 32], [u8; 4], Txid, [u8; 8]); // hid + block.height + txid + vout_index +pub type ScriptUnspentKey = ([u8; 4], Txid, [u8; 8]); // block.height + txid + vout_index, ps: key is required in index_script_unspent_vin #[derive(Debug, Serialize, Deserialize)] pub struct ScriptUnspent { From 1b58ee91ed265ac06c9e97f4c3269302fbc50691 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Fri, 22 Nov 2024 15:11:03 +0800 Subject: [PATCH 12/17] fmt_rs --- lib/ain-ocean/src/api/address.rs | 8 ++----- lib/ain-ocean/src/api/loan.rs | 5 +++- lib/ain-ocean/src/api/prices.rs | 15 ++---------- lib/ain-ocean/src/indexer/loan_token.rs | 19 ++++++++++----- lib/ain-ocean/src/indexer/mod.rs | 32 ++++++++++++++++++++----- lib/ain-ocean/src/indexer/oracle.rs | 7 +++++- 6 files changed, 53 insertions(+), 33 deletions(-) diff --git a/lib/ain-ocean/src/api/address.rs b/lib/ain-ocean/src/api/address.rs index e41ff07373..a685b42119 100644 --- a/lib/ain-ocean/src/api/address.rs +++ b/lib/ain-ocean/src/api/address.rs @@ -460,16 +460,12 @@ async fn list_transaction_unspent( let txid = Txid::from_str(txid)?; let decoded_n = hex::decode(n)?; let n = decoded_n.try_into().map_err(|_| Error::Other { - msg: format!("Invalid txno: {}", n) + msg: format!("Invalid txno: {}", n), })?; Ok::<([u8; 4], Txid, [u8; 8]), Error>((height, txid, n)) }) .transpose()? - .unwrap_or(( - [0u8; 4], - Txid::from_byte_array([0x00u8; 32]), - [0u8; 8], - )); + .unwrap_or(([0u8; 4], Txid::from_byte_array([0x00u8; 32]), [0u8; 8])); let res = ctx .services diff --git a/lib/ain-ocean/src/api/loan.rs b/lib/ain-ocean/src/api/loan.rs index 7316561754..fb67b4b63f 100644 --- a/lib/ain-ocean/src/api/loan.rs +++ b/lib/ain-ocean/src/api/loan.rs @@ -138,7 +138,10 @@ fn get_active_price( .services .oracle_price_active .by_id - .list(Some((token.clone(), currency.clone(), [0xffu8; 4])), SortOrder::Descending)? + .list( + Some((token.clone(), currency.clone(), [0xffu8; 4])), + SortOrder::Descending, + )? .take_while(|item| match item { Ok((k, _)) => k.0 == token && k.1 == currency, _ => true, diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index e780ec921f..47e0d44f32 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -202,7 +202,6 @@ async fn get_feed( .transpose()? .unwrap_or(([0xffu8; 8], [0xffu8; 4])); - let repo = &ctx.services.oracle_price_aggregated; let id = (token.clone(), currency.clone(), next.0, next.1); let oracle_aggregated = repo @@ -387,12 +386,7 @@ async fn get_feed_with_interval( .transpose()? .unwrap_or([0xffu8; 4]); - let id = ( - token.clone(), - currency.clone(), - interval_type.clone(), - next, - ); + let id = (token.clone(), currency.clone(), interval_type.clone(), next); let items = ctx .services @@ -476,12 +470,7 @@ async fn list_price_oracles( .transpose()? .unwrap_or(Txid::from_byte_array([0xffu8; 32])); - - let id = ( - token.clone(), - currency.clone(), - next, - ); + let id = (token.clone(), currency.clone(), next); let token_currencies = ctx .services .oracle_token_currency diff --git a/lib/ain-ocean/src/indexer/loan_token.rs b/lib/ain-ocean/src/indexer/loan_token.rs index 964fea963b..cc35ca42ef 100644 --- a/lib/ain-ocean/src/indexer/loan_token.rs +++ b/lib/ain-ocean/src/indexer/loan_token.rs @@ -144,10 +144,11 @@ pub fn invalidate_active_price(services: &Arc, block: &BlockContext) - .collect::>(); for ((token, currency), _) in price_tickers.into_iter().rev() { - services - .oracle_price_active - .by_id - .delete(&(token, currency, block.height.to_be_bytes()))?; + services.oracle_price_active.by_id.delete(&( + token, + currency, + block.height.to_be_bytes(), + ))?; } } @@ -159,7 +160,12 @@ pub fn perform_active_price_tick( ticker_id: (Token, Currency), block: &BlockContext, ) -> Result<()> { - let id = (ticker_id.0.clone(), ticker_id.1.clone(), [0xffu8; 8], [0xffu8; 4]); + let id = ( + ticker_id.0.clone(), + ticker_id.1.clone(), + [0xffu8; 8], + [0xffu8; 4], + ); let prev = services .oracle_price_aggregated @@ -188,7 +194,8 @@ pub fn perform_active_price_tick( let active_price = map_active_price(block, aggregated_price, prev_price); - repo.by_id.put(&(id.0, id.1, block.height.to_be_bytes()), &active_price)?; + repo.by_id + .put(&(id.0, id.1, block.height.to_be_bytes()), &active_price)?; Ok(()) } diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index 1824e285f4..a0e78fb3e2 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -160,7 +160,11 @@ fn index_script_unspent_vin( vin: &VinStandard, ctx: &Context, ) -> Result<()> { - let key = (ctx.block.height.to_be_bytes(), vin.txid, vin.vout.to_be_bytes()); + let key = ( + ctx.block.height.to_be_bytes(), + vin.txid, + vin.vout.to_be_bytes(), + ); let id = services.script_unspent.by_key.get(&key)?; if let Some(id) = id { services.script_unspent.by_id.delete(&id)?; @@ -255,7 +259,12 @@ fn index_script_unspent_vout(services: &Arc, vout: &Vout, ctx: &Contex }, }; - let id = (hid, block.height.to_be_bytes(), tx.txid, vout.n.to_be_bytes()); + let id = ( + hid, + block.height.to_be_bytes(), + tx.txid, + vout.n.to_be_bytes(), + ); let key = (block.height.to_be_bytes(), tx.txid, vout.n.to_be_bytes()); services.script_unspent.by_key.put(&key, &id)?; services.script_unspent.by_id.put(&id, &script_unspent)?; @@ -341,8 +350,10 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> aggregation.statistic.tx_in_count + aggregation.statistic.tx_out_count; aggregation.amount.unspent = aggregation.amount.tx_in - aggregation.amount.tx_out; - repo.by_id - .put(&(aggregation.hid, ctx.block.height.to_be_bytes()), &aggregation)?; + repo.by_id.put( + &(aggregation.hid, ctx.block.height.to_be_bytes()), + &aggregation, + )?; record.insert(aggregation.hid, aggregation); } @@ -496,8 +507,17 @@ fn invalidate_script_unspent_vout( vout: &Vout, ) -> Result<()> { let hid = as_sha256(&vout.script_pub_key.hex); - let id = (hid, ctx.block.height.to_be_bytes(), ctx.tx.txid, vout.n.to_be_bytes()); - let key = (ctx.block.height.to_be_bytes(), ctx.tx.txid, vout.n.to_be_bytes()); + let id = ( + hid, + ctx.block.height.to_be_bytes(), + ctx.tx.txid, + vout.n.to_be_bytes(), + ); + let key = ( + ctx.block.height.to_be_bytes(), + ctx.tx.txid, + vout.n.to_be_bytes(), + ); services.script_unspent.by_id.delete(&id)?; services.script_unspent.by_key.delete(&key)?; diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 77ab02710f..6ca9d02aeb 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -550,7 +550,12 @@ pub fn index_interval_mapper( let previous = repo .by_id .list( - Some((token.clone(), currency.clone(), interval.clone(), [0xffu8; 4])), + Some(( + token.clone(), + currency.clone(), + interval.clone(), + [0xffu8; 4], + )), SortOrder::Descending, )? .take_while(|item| match item { From cbf76d49f3bcc798ed7684e1067903a4a8f75416 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Mon, 25 Nov 2024 17:05:31 +0800 Subject: [PATCH 13/17] ApiRpcResponse --- lib/ain-ocean/src/api/response.rs | 14 ++++++++++++++ lib/ain-ocean/src/api/rpc.rs | 6 +++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/lib/ain-ocean/src/api/response.rs b/lib/ain-ocean/src/api/response.rs index ead721a278..a872621010 100644 --- a/lib/ain-ocean/src/api/response.rs +++ b/lib/ain-ocean/src/api/response.rs @@ -12,6 +12,20 @@ impl Response { } } +#[derive(Debug, Serialize)] +pub struct ApiRpcResponse { + result: T, + // TODO: map error and id from rpc + // error: T, + // id: T, +} + +impl ApiRpcResponse { + pub fn new(result: T) -> Self { + Self { result } + } +} + /// ApiPagedResponse indicates that this response of data array slice is part of a sorted list of items. /// Items are part of a larger sorted list and the slice indicates a window within the large sorted list. /// Each ApiPagedResponse holds the data array and the "token" for next part of the slice. diff --git a/lib/ain-ocean/src/api/rpc.rs b/lib/ain-ocean/src/api/rpc.rs index 809f0853ec..0bebb6836a 100644 --- a/lib/ain-ocean/src/api/rpc.rs +++ b/lib/ain-ocean/src/api/rpc.rs @@ -5,7 +5,7 @@ use axum::{routing::post, Extension, Json, Router}; use defichain_rpc::RpcApi; use serde::{Deserialize, Serialize}; -use super::{response::Response, AppContext}; +use super::{response::ApiRpcResponse, AppContext}; use crate::{ error::{ApiError, Error}, Result, @@ -54,12 +54,12 @@ fn method_whitelist(method: &str) -> Result<()> { async fn rpc( Extension(ctx): Extension>, Json(body): Json, -) -> Result> { +) -> Result> { method_whitelist(&body.method)?; let res: serde_json::Value = ctx.client.call(&body.method, &body.params).await?; - Ok(Response::new(res)) + Ok(ApiRpcResponse::new(res)) } pub fn router(ctx: Arc) -> Router { From 08c7f7d9808dd18434eacfd70ec0fa94732d2cb7 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Tue, 26 Nov 2024 12:13:25 +0800 Subject: [PATCH 14/17] optimize prices indexer & api --- lib/ain-ocean/src/api/prices.rs | 68 +++++++++++-------------- lib/ain-ocean/src/api/stats/cache.rs | 18 +++++-- lib/ain-ocean/src/indexer/loan_token.rs | 41 +++++++++++---- lib/ain-ocean/src/indexer/oracle.rs | 8 ++- lib/ain-ocean/src/lib.rs | 2 - lib/ain-ocean/src/model/price_ticker.rs | 3 +- lib/ain-ocean/src/storage/mod.rs | 12 +---- 7 files changed, 79 insertions(+), 73 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 47e0d44f32..c2e5c261f9 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -1,4 +1,4 @@ -use std::{str::FromStr, sync::Arc}; +use std::{collections::HashSet, str::FromStr, sync::Arc}; use ain_dftx::{Currency, Token, Weightage, COIN}; use ain_macros::ocean_endpoint; @@ -8,11 +8,9 @@ use axum::{ Extension, Router, }; use bitcoin::{hashes::Hash, Txid}; -use indexmap::IndexSet; use rust_decimal::{prelude::ToPrimitive, Decimal}; use serde::{Deserialize, Serialize}; use serde_with::skip_serializing_none; -use snafu::OptionExt; use super::{ common::parse_token_currency, @@ -22,7 +20,7 @@ use super::{ AppContext, }; use crate::{ - error::{ApiError, Error, OtherSnafu}, + error::{ApiError, Error}, model::{ BlockContext, OracleIntervalSeconds, OraclePriceActive, OraclePriceAggregatedIntervalAggregated, PriceTicker, @@ -121,39 +119,28 @@ async fn list_prices( Query(query): Query, Extension(ctx): Extension>, ) -> Result> { - let sorted_ids = ctx + let mut set: HashSet<(Token, Currency)> = HashSet::new(); + + let prices = ctx .services .price_ticker - .by_key + .by_id .list(None, SortOrder::Descending)? - .map(|item| { - let (_, id) = item?; - Ok(id) - }) - .collect::>>()?; - - // use IndexSet to rm dup without changing order - let mut sorted_ids_set = IndexSet::new(); - for id in sorted_ids { - sorted_ids_set.insert(id); - } - - let prices = sorted_ids_set - .into_iter() - .take(query.size) - .map(|id| { - let price_ticker = ctx - .services - .price_ticker - .by_id - .get(&id)? - .context(OtherSnafu { - msg: "Missing price ticker index", - })?; - - Ok(PriceTickerResponse::from((id, price_ticker))) + .flat_map(|item| { + let ((_, _, token, currency), v) = item?; + let has_key = set.contains(&(token.clone(), currency.clone())); + if !has_key { + set.insert((token.clone(), currency.clone())); + Ok::, Error>(Some(PriceTickerResponse::from(( + (token, currency), + v, + )))) + } else { + Ok(None) + } }) - .collect::>>()?; + .flatten() + .collect::>(); Ok(ApiPagedResponse::of(prices, query.size, |price| { price.sort.to_string() @@ -167,13 +154,16 @@ async fn get_price( ) -> Result>> { let (token, currency) = parse_token_currency(&key)?; - let price_ticker = ctx - .services - .price_ticker - .by_id - .get(&(token.clone(), currency.clone()))?; + let price_ticker = ctx.services.price_ticker.by_id.list(Some(( + [0xffu8; 4], + [0xffu8; 4], + token.clone(), + currency.clone(), + )), SortOrder::Descending)? + .next() + .transpose()?; - let Some(price_ticker) = price_ticker else { + let Some((_, price_ticker)) = price_ticker else { return Ok(Response::new(None)); }; diff --git a/lib/ain-ocean/src/api/stats/cache.rs b/lib/ain-ocean/src/api/stats/cache.rs index c66fdf9e28..de29ea27cf 100644 --- a/lib/ain-ocean/src/api/stats/cache.rs +++ b/lib/ain-ocean/src/api/stats/cache.rs @@ -1,5 +1,10 @@ -use std::{collections::HashMap, str::FromStr, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + str::FromStr, + sync::Arc, +}; +use ain_dftx::{Currency, Token}; use cached::proc_macro::cached; use defichain_rpc::{ defichain_rpc_json::token::TokenPagination, json::account::AccountAmount, AccountRPC, Client, @@ -22,7 +27,7 @@ use crate::{ stats::get_block_reward_distribution, AppContext, }, - error::{DecimalConversionSnafu, OtherSnafu}, + error::{DecimalConversionSnafu, Error, OtherSnafu}, model::MasternodeStatsData, storage::{RepositoryOps, SortOrder}, Result, @@ -100,12 +105,19 @@ pub async fn get_count(ctx: &Arc) -> Result { .get_latest()? .map_or(0, |mn| mn.stats.count); + let mut set: HashSet<(Token, Currency)> = HashSet::new(); let prices = ctx .services .price_ticker .by_id .list(None, SortOrder::Descending)? - .collect::>(); + .flat_map(|item| { + let ((_, _, token, currency), _) = item?; + set.insert((token, currency)); + Ok::, Error>(set.clone()) + }) + .next() + .unwrap_or(set); Ok(Count { blocks: 0, diff --git a/lib/ain-ocean/src/indexer/loan_token.rs b/lib/ain-ocean/src/indexer/loan_token.rs index cc35ca42ef..d05d2ecd9a 100644 --- a/lib/ain-ocean/src/indexer/loan_token.rs +++ b/lib/ain-ocean/src/indexer/loan_token.rs @@ -1,4 +1,4 @@ -use std::{str::FromStr, sync::Arc}; +use std::{collections::HashSet, str::FromStr, sync::Arc}; use ain_dftx::{loans::SetLoanToken, Currency, Token}; use log::trace; @@ -6,6 +6,7 @@ use rust_decimal::{prelude::Zero, Decimal}; use rust_decimal_macros::dec; use crate::{ + error::Error, indexer::{Context, Index, Result}, model::{BlockContext, OraclePriceActive, OraclePriceActiveNext, OraclePriceAggregated}, network::Network, @@ -86,15 +87,21 @@ pub fn index_active_price(services: &Arc, block: &BlockContext) -> Res _ => 120, }; if block.height % block_interval == 0 { - let price_tickers = services + let mut set: HashSet<(Token, Currency)> = HashSet::new(); + let pairs = services .price_ticker .by_id .list(None, SortOrder::Descending)? - .flatten() - .collect::>(); - - for (ticker_id, _) in price_tickers { - perform_active_price_tick(services, ticker_id, block)?; + .flat_map(|item| { + let ((_, _, token, currency), _) = item?; + set.insert((token, currency)); + Ok::, Error>(set.clone()) + }) + .next() + .unwrap_or(set); + + for (token, currency) in pairs { + perform_active_price_tick(services, (token, currency), block)?; } } Ok(()) @@ -136,14 +143,26 @@ pub fn invalidate_active_price(services: &Arc, block: &BlockContext) - _ => 120, }; if block.height % block_interval == 0 { - let price_tickers = services + let mut set: HashSet<(Token, Currency)> = HashSet::new(); + let pairs = services .price_ticker .by_id .list(None, SortOrder::Descending)? - .flatten() - .collect::>(); + .flat_map(|item| { + let ((_, _, token, currency), _) = item?; + set.insert((token, currency)); + Ok::, Error>(set.clone()) + }) + .next() + .unwrap_or(set); + + // convert to vector to reverse the hashset is required + let mut vec = Vec::new(); + for pair in pairs { + vec.insert(0, pair); + } - for ((token, currency), _) in price_tickers.into_iter().rev() { + for (token, currency) in vec { services.oracle_price_active.by_id.delete(&( token, currency, diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 6ca9d02aeb..c9fb315fd3 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -360,7 +360,6 @@ fn index_set_oracle_data( pairs: &HashSet<(Token, Currency)>, ) -> Result<()> { let oracle_repo = &services.oracle_price_aggregated; - let ticker_repo = &services.price_ticker; for pair in pairs { let price_aggregated = map_price_aggregated(services, context, pair)?; @@ -380,15 +379,14 @@ fn index_set_oracle_data( ); oracle_repo.by_id.put(&id, &price_aggregated)?; - let key = ( + let id = ( price_aggregated.aggregated.oracles.total.to_be_bytes(), price_aggregated.block.height.to_be_bytes(), token, currency, ); - ticker_repo.by_key.put(&key, pair)?; - ticker_repo.by_id.put( - &pair.clone(), + services.price_ticker.by_id.put( + &id, &PriceTicker { price: price_aggregated, }, diff --git a/lib/ain-ocean/src/lib.rs b/lib/ain-ocean/src/lib.rs index c72b6a9f67..bde84b9312 100644 --- a/lib/ain-ocean/src/lib.rs +++ b/lib/ain-ocean/src/lib.rs @@ -93,7 +93,6 @@ pub struct OracleHistoryService { pub struct PriceTickerService { by_id: PriceTicker, - by_key: PriceTickerKey, } pub struct ScriptActivityService { @@ -196,7 +195,6 @@ impl Services { }, price_ticker: PriceTickerService { by_id: PriceTicker::new(Arc::clone(&store)), - by_key: PriceTickerKey::new(Arc::clone(&store)), }, script_activity: ScriptActivityService { by_id: ScriptActivity::new(Arc::clone(&store)), diff --git a/lib/ain-ocean/src/model/price_ticker.rs b/lib/ain-ocean/src/model/price_ticker.rs index 777e693649..158fb96157 100644 --- a/lib/ain-ocean/src/model/price_ticker.rs +++ b/lib/ain-ocean/src/model/price_ticker.rs @@ -2,8 +2,7 @@ use serde::{Deserialize, Serialize}; use super::oracle_price_aggregated::OraclePriceAggregated; -pub type PriceTickerId = (String, String); //token-currency -pub type PriceTickerKey = ([u8; 4], [u8; 4], String, String); // total-height-token-currency +pub type PriceTickerId = ([u8; 4], [u8; 4], String, String); // total-height-token-currency #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/storage/mod.rs b/lib/ain-ocean/src/storage/mod.rs index 92ef6cfc9b..1f373022c4 100644 --- a/lib/ain-ocean/src/storage/mod.rs +++ b/lib/ain-ocean/src/storage/mod.rs @@ -372,15 +372,6 @@ define_table! { } } -define_table! { - #[derive(Debug)] - pub struct PriceTickerKey { - key_type = model::PriceTickerKey, - value_type = model::PriceTickerId, - }, - SecondaryIndex = PriceTicker -} - define_table! { #[derive(Debug)] pub struct RawBlock { @@ -517,7 +508,7 @@ define_table! { } } -pub const COLUMN_NAMES: [&str; 28] = [ +pub const COLUMN_NAMES: [&str; 27] = [ Block::NAME, BlockByHeight::NAME, MasternodeStats::NAME, @@ -534,7 +525,6 @@ pub const COLUMN_NAMES: [&str; 28] = [ PoolSwapAggregatedKey::NAME, PoolSwap::NAME, PriceTicker::NAME, - PriceTickerKey::NAME, RawBlock::NAME, ScriptActivity::NAME, ScriptAggregation::NAME, From e709239ef42fe5a4baf50f3ee16d055b7a019de4 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Tue, 26 Nov 2024 12:46:37 +0800 Subject: [PATCH 15/17] missing: script activity key usize to BE --- lib/ain-ocean/src/api/address.rs | 8 ++++---- lib/ain-ocean/src/indexer/mod.rs | 8 ++++---- lib/ain-ocean/src/model/script_activity.rs | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/ain-ocean/src/api/address.rs b/lib/ain-ocean/src/api/address.rs index a685b42119..be6ee17749 100644 --- a/lib/ain-ocean/src/api/address.rs +++ b/lib/ain-ocean/src/api/address.rs @@ -342,8 +342,8 @@ async fn list_transactions( _ => ScriptActivityTypeHex::Vout, }; let txid = Txid::from_str(txid)?; - let n = n.parse::()?; - Ok::<([u8; 4], ScriptActivityTypeHex, Txid, usize), Error>(( + let n = n.parse::()?.to_be_bytes(); + Ok::<([u8; 4], ScriptActivityTypeHex, Txid, [u8; 8]), Error>(( height, vin_vout_type, txid, @@ -352,10 +352,10 @@ async fn list_transactions( }) .transpose()? .unwrap_or(( - [u8::MAX, u8::MAX, u8::MAX, u8::MAX], + [0xffu8; 4], ScriptActivityTypeHex::Vout, Txid::from_byte_array([0xffu8; 32]), - usize::MAX, + [0xffu8; 8], )); let res = ctx diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index a0e78fb3e2..a572323dcf 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -128,7 +128,7 @@ fn index_script_activity_vin( block.height.to_be_bytes(), ScriptActivityTypeHex::Vin, vin.txid, - vin.vout, + vin.vout.to_be_bytes(), ); services.script_activity.by_id.put(&id, &script_activity)?; @@ -206,7 +206,7 @@ fn index_script_activity_vout(services: &Arc, vout: &Vout, ctx: &Conte block.height.to_be_bytes(), ScriptActivityTypeHex::Vout, tx.txid, - vout.n, + vout.n.to_be_bytes(), ); services.script_activity.by_id.put(&id, &script_activity)?; Ok(()) @@ -494,7 +494,7 @@ fn invalidate_script_activity_vin( height.to_be_bytes(), ScriptActivityTypeHex::Vin, vin.txid, - vin.vout, + vin.vout.to_be_bytes(), ); services.script_activity.by_id.delete(&id)?; @@ -534,7 +534,7 @@ fn invalidate_script_activity_vout( ctx.block.height.to_be_bytes(), ScriptActivityTypeHex::Vout, ctx.tx.txid, - vout.n, + vout.n.to_be_bytes(), ); services.script_activity.by_id.delete(&id)?; Ok(()) diff --git a/lib/ain-ocean/src/model/script_activity.rs b/lib/ain-ocean/src/model/script_activity.rs index daa40cf8f2..5e1fab5649 100644 --- a/lib/ain-ocean/src/model/script_activity.rs +++ b/lib/ain-ocean/src/model/script_activity.rs @@ -34,7 +34,7 @@ impl fmt::Display for ScriptActivityTypeHex { } } -pub type ScriptActivityId = ([u8; 32], [u8; 4], ScriptActivityTypeHex, Txid, usize); // (hid, block.height, type_hex, txid, index) +pub type ScriptActivityId = ([u8; 32], [u8; 4], ScriptActivityTypeHex, Txid, [u8; 8]); // (hid, block.height, type_hex, txid, index) #[derive(Debug, Serialize, Deserialize)] pub struct ScriptActivity { From e4dd6609101d035cb2e9ada20c95bc7a5c8be331 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Tue, 26 Nov 2024 12:52:02 +0800 Subject: [PATCH 16/17] fmt_rs --- lib/ain-ocean/src/api/prices.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index c2e5c261f9..d320e3b07d 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -154,14 +154,16 @@ async fn get_price( ) -> Result>> { let (token, currency) = parse_token_currency(&key)?; - let price_ticker = ctx.services.price_ticker.by_id.list(Some(( - [0xffu8; 4], - [0xffu8; 4], - token.clone(), - currency.clone(), - )), SortOrder::Descending)? - .next() - .transpose()?; + let price_ticker = ctx + .services + .price_ticker + .by_id + .list( + Some(([0xffu8; 4], [0xffu8; 4], token.clone(), currency.clone())), + SortOrder::Descending, + )? + .next() + .transpose()?; let Some((_, price_ticker)) = price_ticker else { return Ok(Response::new(None)); From 22c9c0684ad995023786894360a4cdbd05e5adcf Mon Sep 17 00:00:00 2001 From: canonbrother Date: Tue, 26 Nov 2024 22:34:07 +0800 Subject: [PATCH 17/17] use filter_map to perform set --- lib/ain-ocean/src/api/stats/cache.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/lib/ain-ocean/src/api/stats/cache.rs b/lib/ain-ocean/src/api/stats/cache.rs index de29ea27cf..cdc60a00b8 100644 --- a/lib/ain-ocean/src/api/stats/cache.rs +++ b/lib/ain-ocean/src/api/stats/cache.rs @@ -105,19 +105,15 @@ pub async fn get_count(ctx: &Arc) -> Result { .get_latest()? .map_or(0, |mn| mn.stats.count); - let mut set: HashSet<(Token, Currency)> = HashSet::new(); let prices = ctx .services .price_ticker .by_id .list(None, SortOrder::Descending)? - .flat_map(|item| { - let ((_, _, token, currency), _) = item?; - set.insert((token, currency)); - Ok::, Error>(set.clone()) + .filter_map(|item| { + item.ok().map(|((_, _, token, currency), _)| (token, currency)) }) - .next() - .unwrap_or(set); + .collect::>(); Ok(Count { blocks: 0,