Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrate to fastnear #358

Merged
merged 9 commits into from
Dec 24, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
revert deeted changes for near state indexer
  • Loading branch information
kobayurii committed Dec 24, 2024
commit 4ebd3c20091c0f072cd37ced22f8860b3a85df67
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

87 changes: 87 additions & 0 deletions cache-storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures::FutureExt;
use near_indexer_primitives::near_primitives;

mod utils;

@@ -114,6 +115,92 @@ impl RedisCacheStorage {
}
}

#[derive(Clone)]
pub struct BlocksByFinalityCache {
cache_storage: RedisCacheStorage,
}

/// Sets the keys in Redis shared between the ReadRPC components about the most recent
/// blocks based on finalities (final or optimistic).
/// `final_height` of `optimistic_height` depending on `block_type` passed.
/// Additionally, sets the JSON serialized `StreamerMessage` into keys `final` or `optimistic`
/// accordingly.
impl BlocksByFinalityCache {
// Use redis database 0(default for redis) for handling the blocks by finality cache.
pub async fn new(redis_url: String) -> anyhow::Result<Self> {
Ok(Self {
cache_storage: RedisCacheStorage::new(redis_url, 0).await?,
})
}

pub async fn update_block_by_finality(
&self,
finality: near_indexer_primitives::near_primitives::types::Finality,
streamer_message: &near_indexer_primitives::StreamerMessage,
) -> anyhow::Result<()> {
let block_height = streamer_message.block.header.height;
let block_type = serde_json::to_string(&finality)?;

let last_height = self
.cache_storage
.get(format!("{}_height", block_type))
.await
.unwrap_or(0);

// If the block height is greater than the last height, update the block streamer message
// if we have a few indexers running, we need to make sure that we are not updating the same block
// or block which is already processed or block less than the last processed block
if block_height > last_height {
let json_streamer_message = serde_json::to_string(streamer_message)?;
// Update the last block height
// Create a clone of the redis client and redis cmd to avoid borrowing issues
let update_height_feature = self
.cache_storage
.set(format!("{}_height", block_type), block_height);

// Update the block streamer message
// Create a clone of the redis client and redis cmd to avoid borrowing issues
let update_stream_msg_feature =
self.cache_storage.set(block_type, json_streamer_message);

// Wait for both futures to complete
futures::future::join_all([
update_height_feature.boxed(),
update_stream_msg_feature.boxed(),
])
.await
.into_iter()
.collect::<anyhow::Result<()>>()?;
};

Ok(())
}

pub async fn get_block_by_finality(
&self,
finality: near_indexer_primitives::near_primitives::types::Finality,
) -> anyhow::Result<near_indexer_primitives::StreamerMessage> {
let block_type = serde_json::to_string(&finality)?;
let resp: String = self.cache_storage.get(block_type).await?;
Ok(serde_json::from_str(&resp)?)
}

pub async fn update_protocol_version(
&self,
protocol_version: near_primitives::types::ProtocolVersion,
) -> anyhow::Result<()> {
self.cache_storage
.set("protocol_version", protocol_version)
.await
}

pub async fn get_protocol_version(
&self,
) -> anyhow::Result<near_primitives::types::ProtocolVersion> {
self.cache_storage.get("protocol_version").await
}
}

#[derive(Clone)]
pub struct TxIndexerCache {
cache_storage: RedisCacheStorage,
1 change: 1 addition & 0 deletions near-state-indexer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ tokio = { version = "1.36.0", features = [
tokio-stream = "0.1"
tracing = "0.1.34"

cache-storage.workspace = true
configuration.workspace = true
database.workspace = true
logic-state-indexer.workspace = true
24 changes: 23 additions & 1 deletion near-state-indexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ use logic_state_indexer::{handle_streamer_message, NearClient, INDEXER};
mod configs;
mod metrics;
mod near_client;
mod utils;

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
@@ -58,6 +59,12 @@ async fn run(home_dir: std::path::PathBuf) -> anyhow::Result<()> {
let state_indexer_config =
configuration::read_configuration::<configuration::NearStateIndexerConfig>().await?;

tracing::info!(target: INDEXER, "Connecting to redis...");
let finality_blocks_storage = cache_storage::BlocksByFinalityCache::new(
state_indexer_config.general.redis_url.to_string(),
)
.await?;

tracing::info!(target: INDEXER, "Setup near_indexer...");
let indexer_config = near_indexer::IndexerConfig {
home_dir,
@@ -73,7 +80,7 @@ async fn run(home_dir: std::path::PathBuf) -> anyhow::Result<()> {
// Regular indexer process starts here
tracing::info!(target: INDEXER, "Instantiating the stream...");
let stream = indexer.streamer();
let (view_client, _) = indexer.client_actors();
let (view_client, client) = indexer.client_actors();

let genesis_config = indexer.near_config().genesis.config.clone();
let shard_layout = logic_state_indexer::configs::shard_layout(genesis_config).await?;
@@ -92,6 +99,21 @@ async fn run(home_dir: std::path::PathBuf) -> anyhow::Result<()> {
near_client.clone(),
));

// Initiate the job of updating the optimistic blocks to Redis
tokio::spawn(utils::update_block_in_redis_by_finality(
view_client.clone(),
client.clone(),
finality_blocks_storage.clone(),
near_indexer_primitives::near_primitives::types::Finality::None,
));
// And the same job for the final blocks
tokio::spawn(utils::update_block_in_redis_by_finality(
view_client.clone(),
client.clone(),
finality_blocks_storage.clone(),
near_indexer_primitives::near_primitives::types::Finality::Final,
));

// ! Note that the `handle_streamer_message` doesn't interact with the Redis
tracing::info!(target: INDEXER, "Starting near_state_indexer...");
let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
110 changes: 110 additions & 0 deletions near-state-indexer/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use near_indexer::near_primitives;
use near_o11y::WithSpanContextExt;

const INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);

/// The universal function that fetches the block by the given finality.
/// It is used in the `update_block_in_redis_by_finality` function.
/// ! The function does not support the DoomSlug finality.
pub(crate) async fn fetch_block_by_finality(
client: &actix::Addr<near_client::ViewClientActor>,
finality: &near_primitives::types::Finality,
) -> anyhow::Result<near_primitives::views::BlockView> {
let block_reference = near_primitives::types::BlockReference::Finality(finality.clone());
Ok(client
.send(near_client::GetBlock(block_reference).with_span_context())
.await??)
}

pub(crate) async fn fetch_status(
client: &actix::Addr<near_client::ClientActor>,
) -> anyhow::Result<near_primitives::views::StatusResponse> {
tracing::debug!(target: crate::INDEXER, "Fetching status");
Ok(client
.send(
near_client::Status {
is_health_check: false,
detailed: false,
}
.with_span_context(),
)
.await??)
}

/// This function starts a busy-loop that does the similar job to the near-indexer one.
/// However, this one deals with the blocks by provided finality, and instead of streaming them to
/// the client, it stores the block directly to the Redis instance shared between
/// ReadRPC components.
pub async fn update_block_in_redis_by_finality(
view_client: actix::Addr<near_client::ViewClientActor>,
client: actix::Addr<near_client::ClientActor>,
finality_blocks_storage: cache_storage::BlocksByFinalityCache,
finality: near_primitives::types::Finality,
) {
let block_type = serde_json::to_string(&finality).unwrap();
tracing::info!(target: crate::INDEXER, "Starting [{}] block update job...", block_type);

let mut last_stored_block_height: Option<near_primitives::types::BlockHeight> = None;
loop {
tokio::time::sleep(INTERVAL).await;

if let Ok(status) = fetch_status(&client).await {
// Update protocol version in Redis
// This is need for read-rpc to know the current protocol version
if let Err(err) = finality_blocks_storage
.update_protocol_version(status.protocol_version)
.await
{
tracing::error!(
target: crate::INDEXER,
"Failed to update protocol version in Redis: {:?}", err
);
};

// If the node is not fully synced the optimistic blocks are outdated
// and are useless for our case. To avoid any misleading in our Redis
// we don't update blocks until the node is fully synced.
if status.sync_info.syncing {
continue;
}
}

if let Ok(block) = fetch_block_by_finality(&view_client, &finality).await {
let height = block.header.height;
if let Some(block_height) = last_stored_block_height {
if height <= block_height {
continue;
} else {
last_stored_block_height = Some(height);
}
} else {
last_stored_block_height = Some(height);
};
let response = near_indexer::build_streamer_message(&view_client, block).await;
match response {
Ok(streamer_message) => {
tracing::debug!(target: crate::INDEXER, "[{}] block {:?}", block_type, last_stored_block_height);
if let Err(err) = finality_blocks_storage
.update_block_by_finality(
near_primitives::types::Finality::None,
&streamer_message,
)
.await
{
tracing::error!(
target: crate::INDEXER,
"Failed to publish [{}] block streamer message: {:?}", block_type, err
);
};
}
Err(err) => {
tracing::error!(
target: crate::INDEXER,
"Missing data, skipping block #{}...", height
);
tracing::error!(target: crate::INDEXER, "{:#?}", err);
}
}
};
}
}
2 changes: 1 addition & 1 deletion rpc-server/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -123,7 +123,7 @@ lazy_static! {
pub(crate) static ref REQUESTS_BLOCKS_COUNTERS: IntCounterVec = register_int_counter_vec(
"requests_blocks_counters",
"Total number of requests blocks from Lake and Cache",
&["method_name", "source"] // // This declares a label named `method_name` and `source`(lake or cache)
&["method_name", "source"] // This declares a label named `method_name` and `source`(lake or cache)
).unwrap();

// Error metrics
Loading