Skip to content

Commit

Permalink
Indexing epochs data (#142)
Browse files Browse the repository at this point in the history
* add epoch indexer

* epoch indexer improvement

* add gas_price endpoint

* add validators endpoint

* add protocol_config endpoint

* fix epoch indexer bug

* improvement according pr comments

* composite primary key to epoch_id
  • Loading branch information
kobayurii authored Dec 14, 2023
1 parent 0154109 commit 05f88e0
Show file tree
Hide file tree
Showing 35 changed files with 1,530 additions and 129 deletions.
239 changes: 166 additions & 73 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ resolver = "2"

members = [
"database",
"epoch-indexer",
"perf-testing",
"readnode-primitives",
"rpc-server",
Expand Down
7 changes: 4 additions & 3 deletions database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ prettytable-rs = { version = "0.10", optional = true }
num-bigint = { version = "0.3", optional = true }
num-traits = { version = "0.2.15", optional = true }
scylla = { version = "0.9.0", optional = true }
serde = { version = "1.0.145", features = ["derive"], optional = true }
serde_json = { version = "1.0.85", optional = true }
serde = { version = "1.0.145", features = ["derive"]}
serde_json = "1.0.85"
tokio = { version = "1.19.2", features = [
"sync",
"time",
Expand All @@ -29,13 +29,14 @@ uuid = { version = "1.3.0", optional = true }

readnode-primitives = { path = "../readnode-primitives" }

near-chain-configs = "1.36.0"
near-primitives = "1.36.0"
near-crypto = "1.36.0"
near-indexer-primitives = "1.36.0"

[features]
default = ["scylla_db"]
postgres_db = ["dep:diesel", "dep:diesel-async", "dep:bigdecimal", "dep:serde", "dep:serde_json"]
postgres_db = ["dep:diesel", "dep:diesel-async", "dep:bigdecimal"]
scylla_db = ["dep:scylla", "dep:num-bigint", "dep:num-traits"]
scylla_db_tracing = ["dep:prettytable-rs", "dep:uuid", "scylla_db"]
account_access_keys = []
9 changes: 9 additions & 0 deletions database/src/base/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,13 @@ pub trait ReaderDbManager {
block_height: near_primitives::types::BlockHeight,
shard_id: near_primitives::types::ShardId,
) -> anyhow::Result<readnode_primitives::BlockHeightShardId>;
async fn get_validators_by_epoch_id(
&self,
epoch_id: near_primitives::hash::CryptoHash,
) -> anyhow::Result<readnode_primitives::EpochValidatorsInfo>;

async fn get_protocol_config_by_epoch_id(
&self,
epoch_id: near_primitives::hash::CryptoHash,
) -> anyhow::Result<near_chain_configs::ProtocolConfigView>;
}
15 changes: 15 additions & 0 deletions database/src/base/state_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,19 @@ pub trait StateIndexerDbManager {

async fn update_meta(&self, indexer_id: &str, block_height: u64) -> anyhow::Result<()>;
async fn get_last_processed_block_height(&self, indexer_id: &str) -> anyhow::Result<u64>;
async fn add_validators(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
epoch_height: u64,
epoch_start_height: u64,
validators_info: &near_primitives::views::EpochValidatorInfo,
) -> anyhow::Result<()>;

async fn add_protocol_config(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
epoch_height: u64,
epoch_start_height: u64,
protocol_config: &near_chain_configs::ProtocolConfigView,
) -> anyhow::Result<()>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS validators;
DROP TABLE IF EXISTS protocol_configs;
20 changes: 20 additions & 0 deletions database/src/postgres/migrations/2023-12-07-144820_epochs/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
CREATE TABLE IF NOT EXISTS validators (
epoch_id text NOT NULL,
epoch_height numeric(20,0) NOT NULL,
epoch_start_height numeric(20,0) NOT NULL,
validators_info jsonb NOT NULL
);

ALTER TABLE ONLY validators
ADD CONSTRAINT validators_pk PRIMARY KEY (epoch_id);


CREATE TABLE IF NOT EXISTS protocol_configs (
epoch_id text NOT NULL,
epoch_height numeric(20,0) NOT NULL,
epoch_start_height numeric(20,0) NOT NULL,
protocol_config jsonb NOT NULL
);

ALTER TABLE ONLY protocol_configs
ADD CONSTRAINT protocol_config_pk PRIMARY KEY (epoch_id);
72 changes: 72 additions & 0 deletions database/src/postgres/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,3 +603,75 @@ impl Meta {
Ok(response.last_processed_block_height)
}
}

#[derive(Insertable, Queryable, Selectable)]
#[diesel(table_name = validators)]
pub struct Validators {
pub epoch_id: String,
pub epoch_height: bigdecimal::BigDecimal,
pub epoch_start_height: bigdecimal::BigDecimal,
pub validators_info: serde_json::Value,
}

impl Validators {
pub async fn insert_or_ignore(
&self,
mut conn: crate::postgres::PgAsyncConn,
) -> anyhow::Result<()> {
diesel::insert_into(validators::table)
.values(self)
.on_conflict_do_nothing()
.execute(&mut conn)
.await?;
Ok(())
}

pub async fn get_validators(
mut conn: crate::postgres::PgAsyncConn,
epoch_id: near_indexer_primitives::CryptoHash,
) -> anyhow::Result<Self> {
let response = validators::table
.filter(validators::epoch_id.eq(epoch_id.to_string()))
.select(Self::as_select())
.first(&mut conn)
.await?;

Ok(response)
}
}

#[derive(Insertable, Queryable, Selectable)]
#[diesel(table_name = protocol_configs)]
pub struct ProtocolConfig {
pub epoch_id: String,
pub epoch_height: bigdecimal::BigDecimal,
pub epoch_start_height: bigdecimal::BigDecimal,
pub protocol_config: serde_json::Value,
}

impl ProtocolConfig {
pub async fn insert_or_ignore(
&self,
mut conn: crate::postgres::PgAsyncConn,
) -> anyhow::Result<()> {
diesel::insert_into(protocol_configs::table)
.values(self)
.on_conflict_do_nothing()
.execute(&mut conn)
.await?;
Ok(())
}

pub async fn get_protocol_config(
mut conn: crate::postgres::PgAsyncConn,
epoch_id: near_indexer_primitives::CryptoHash,
) -> anyhow::Result<Self> {
let response = protocol_configs::table
.filter(protocol_configs::epoch_id.eq(epoch_id.to_string()))
.select(Self::as_select())
.first(&mut conn)
.await?;

Ok(response)
}
}
43 changes: 43 additions & 0 deletions database/src/postgres/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,4 +294,47 @@ impl crate::ReaderDbManager for PostgresDBManager {
))
})
}

async fn get_validators_by_epoch_id(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
) -> anyhow::Result<readnode_primitives::EpochValidatorsInfo> {
let epoch = crate::models::Validators::get_validators(
Self::get_connection(&self.pg_pool).await?,
epoch_id,
)
.await?;
let epoch_height = epoch
.epoch_height
.to_u64()
.ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_height` to u64"))?;
let epoch_start_height = epoch
.epoch_start_height
.to_u64()
.ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_start_height` to u64"))?;
let (validators_info,) = serde_json::from_value::<(
near_indexer_primitives::views::EpochValidatorInfo,
)>(epoch.validators_info)?;
Ok(readnode_primitives::EpochValidatorsInfo {
epoch_id,
epoch_height,
epoch_start_height,
validators_info,
})
}

async fn get_protocol_config_by_epoch_id(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
) -> anyhow::Result<near_chain_configs::ProtocolConfigView> {
let protocol_config = crate::models::ProtocolConfig::get_protocol_config(
Self::get_connection(&self.pg_pool).await?,
epoch_id,
)
.await?;
let (protocol_config,) = serde_json::from_value::<(near_chain_configs::ProtocolConfigView,)>(
protocol_config.protocol_config,
)?;
Ok(protocol_config)
}
}
20 changes: 20 additions & 0 deletions database/src/postgres/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ diesel::table! {
}
}

diesel::table! {
protocol_configs (epoch_id) {
epoch_id -> Text,
epoch_height -> Numeric,
epoch_start_height -> Numeric,
protocol_config -> Jsonb,
}
}

diesel::table! {
receipt_map (receipt_id) {
receipt_id -> Text,
Expand Down Expand Up @@ -113,11 +122,21 @@ diesel::table! {
}
}

diesel::table! {
validators (epoch_id) {
epoch_id -> Text,
epoch_height -> Numeric,
epoch_start_height -> Numeric,
validators_info -> Jsonb,
}
}

diesel::allow_tables_to_appear_in_same_query!(
account_state,
block,
chunk,
meta,
protocol_configs,
receipt_map,
receipt_outcome,
state_changes_access_key,
Expand All @@ -127,4 +146,5 @@ diesel::allow_tables_to_appear_in_same_query!(
state_changes_data,
transaction_cache,
transaction_detail,
validators,
);
35 changes: 35 additions & 0 deletions database/src/postgres/state_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,4 +298,39 @@ impl crate::StateIndexerDbManager for PostgresDBManager {
.to_u64()
.ok_or_else(|| anyhow::anyhow!("Failed to parse `block_height` to u64"))
}
async fn add_validators(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
epoch_height: u64,
epoch_start_height: u64,
validators_info: &near_primitives::views::EpochValidatorInfo,
) -> anyhow::Result<()> {
crate::models::Validators {
epoch_id: epoch_id.to_string(),
epoch_height: bigdecimal::BigDecimal::from(epoch_height),
epoch_start_height: bigdecimal::BigDecimal::from(epoch_start_height),
validators_info: serde_json::to_value(validators_info)?,
}
.insert_or_ignore(Self::get_connection(&self.pg_pool).await?)
.await?;
Ok(())
}

async fn add_protocol_config(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
epoch_height: u64,
epoch_start_height: u64,
protocol_config: &near_chain_configs::ProtocolConfigView,
) -> anyhow::Result<()> {
crate::models::ProtocolConfig {
epoch_id: epoch_id.to_string(),
epoch_height: bigdecimal::BigDecimal::from(epoch_height),
epoch_start_height: bigdecimal::BigDecimal::from(epoch_start_height),
protocol_config: serde_json::to_value(protocol_config)?,
}
.insert_or_ignore(Self::get_connection(&self.pg_pool).await?)
.await?;
Ok(())
}
}
55 changes: 55 additions & 0 deletions database/src/scylladb/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub struct ScyllaDBManager {
get_receipt: PreparedStatement,
get_transaction_by_hash: PreparedStatement,
get_stored_at_block_height_and_shard_id_by_block_height: PreparedStatement,
get_validators_by_epoch_id: PreparedStatement,
get_protocol_config_by_epoch_id: PreparedStatement,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -115,6 +117,14 @@ impl ScyllaStorageManager for ScyllaDBManager {
&scylla_db_session,
"SELECT stored_at_block_height, shard_id FROM state_indexer.chunks WHERE block_height = ?",
).await?,
get_validators_by_epoch_id: Self::prepare_read_query(
&scylla_db_session,
"SELECT epoch_height, validators_info FROM state_indexer.validators WHERE epoch_id = ?",
).await?,
get_protocol_config_by_epoch_id: Self::prepare_read_query(
&scylla_db_session,
"SELECT protocol_config FROM state_indexer.protocol_configs WHERE epoch_id = ?",
).await?,
}))
}
}
Expand Down Expand Up @@ -409,4 +419,49 @@ impl crate::ReaderDbManager for ScyllaDBManager {
))
})
}

async fn get_validators_by_epoch_id(
&self,
epoch_id: near_primitives::hash::CryptoHash,
) -> anyhow::Result<readnode_primitives::EpochValidatorsInfo> {
let (epoch_height, validators_info) = Self::execute_prepared_query(
&self.scylla_session,
&self.get_validators_by_epoch_id,
(epoch_id.to_string(),),
)
.await?
.single_row()?
.into_typed::<(num_bigint::BigInt, String)>()?;

let validators_info: near_primitives::views::EpochValidatorInfo =
serde_json::from_str(&validators_info)?;

Ok(readnode_primitives::EpochValidatorsInfo {
epoch_id,
epoch_height: epoch_height
.to_u64()
.ok_or_else(|| anyhow::anyhow!("Failed to parse `epoch_height` to u64"))?,
epoch_start_height: validators_info.epoch_start_height,
validators_info,
})
}

async fn get_protocol_config_by_epoch_id(
&self,
epoch_id: near_primitives::hash::CryptoHash,
) -> anyhow::Result<near_chain_configs::ProtocolConfigView> {
let (protocol_config,) = Self::execute_prepared_query(
&self.scylla_session,
&self.get_protocol_config_by_epoch_id,
(epoch_id.to_string(),),
)
.await?
.single_row()?
.into_typed::<(String,)>()?;

let protocol_config: near_chain_configs::ProtocolConfigView =
serde_json::from_str(&protocol_config)?;

Ok(protocol_config)
}
}
Loading

0 comments on commit 05f88e0

Please sign in to comment.