Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrate to fastnear #358

Merged
merged 9 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/near/read-rpc/compare/main...develop)

### What's Changed
* Migrate from lake data to fastnear data
* Add metrics to calculate the number of blocks which fetched from the cache and fastnear

## [0.3.3](https://github.com/near/read-rpc/releases/tag/v0.3.3)

### Supported Nearcore Version
Expand Down
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ license.workspace = true

[dependencies]
anyhow = "1.0.70"
aws-credential-types = "1.1.4"
aws-sdk-s3 = { version = "1.14.0", features = ["behavior-version-latest"] }
aws-types = "1.1.4"
dotenv = "0.15.0"
google-cloud-storage = "0.23.0"
lazy_static = "1.4.0"
Expand Down
83 changes: 26 additions & 57 deletions configuration/src/configs/lake.rs
Original file line number Diff line number Diff line change
@@ -1,86 +1,55 @@
use aws_sdk_s3::config::StalledStreamProtectionConfig;
use crate::configs::deserialize_optional_data_or_env;
use near_lake_framework::near_indexer_primitives::near_primitives;
use serde_derive::Deserialize;

use crate::configs::{deserialize_optional_data_or_env, required_value_or_panic};

#[derive(Debug, Clone)]
pub struct LakeConfig {
pub aws_access_key_id: String,
pub aws_secret_access_key: String,
pub aws_default_region: String,
pub aws_bucket_name: String,
pub num_threads: Option<u64>,
}

impl LakeConfig {
pub async fn s3_config(&self) -> aws_sdk_s3::Config {
let credentials = aws_credential_types::Credentials::new(
&self.aws_access_key_id,
&self.aws_secret_access_key,
None,
None,
"",
);
aws_sdk_s3::Config::builder()
.stalled_stream_protection(StalledStreamProtectionConfig::disabled())
.credentials_provider(credentials)
.region(aws_types::region::Region::new(
self.aws_default_region.clone(),
))
.build()
}

pub async fn lake_config(
&self,
start_block_height: near_primitives::types::BlockHeight,
) -> anyhow::Result<near_lake_framework::LakeConfig> {
let config_builder = near_lake_framework::LakeConfigBuilder::default();
chain_id: crate::ChainId,
) -> anyhow::Result<near_lake_framework::FastNearConfig> {
let mut config_builder = near_lake_framework::FastNearConfigBuilder::default();
match chain_id {
crate::ChainId::Mainnet => config_builder = config_builder.mainnet(),
// Testnet is the default chain for other chain_id
_ => config_builder = config_builder.testnet(),
};
if let Some(num_threads) = self.num_threads {
config_builder = config_builder.num_threads(num_threads);
};
Ok(config_builder
.s3_config(self.s3_config().await)
.s3_region_name(&self.aws_default_region)
.s3_bucket_name(&self.aws_bucket_name)
.start_block_height(start_block_height)
.build()
.expect("Failed to build LakeConfig"))
.build()?)
}

pub async fn lake_s3_client(&self) -> near_lake_framework::LakeS3Client {
let s3_config = self.s3_config().await;
near_lake_framework::LakeS3Client::new(aws_sdk_s3::Client::from_conf(s3_config))
pub async fn lake_client(
&self,
chain_id: crate::ChainId,
) -> anyhow::Result<near_lake_framework::FastNearClient> {
let fast_near_endpoint = match chain_id {
crate::ChainId::Mainnet => String::from("https://mainnet.neardata.xyz"),
// Testnet is the default chain for other chain_id
_ => String::from("https://testnet.neardata.xyz"),
};
Ok(near_lake_framework::FastNearClient::new(fast_near_endpoint))
}
}

#[derive(Deserialize, Debug, Clone, Default)]
pub struct CommonLakeConfig {
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub aws_access_key_id: Option<String>,
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub aws_secret_access_key: Option<String>,
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub aws_default_region: Option<String>,
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub aws_bucket_name: Option<String>,
pub num_threads: Option<u64>,
}

impl From<CommonLakeConfig> for LakeConfig {
fn from(common_config: CommonLakeConfig) -> Self {
Self {
aws_access_key_id: required_value_or_panic(
"aws_access_key_id",
common_config.aws_access_key_id,
),
aws_secret_access_key: required_value_or_panic(
"aws_secret_access_key",
common_config.aws_secret_access_key,
),
aws_default_region: required_value_or_panic(
"aws_default_region",
common_config.aws_default_region,
),
aws_bucket_name: required_value_or_panic(
"aws_bucket_name",
common_config.aws_bucket_name,
),
num_threads: common_config.num_threads,
}
}
}
15 changes: 3 additions & 12 deletions configuration/src/default_env_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,9 @@ tracked_changes = "${TRACKED_CHANGES}"

### Lake framework configuration
[lake_config]

## Lake framework AWS access key id
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"

## Lake framework AWS secret access key
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"

## Lake framework AWS default region
aws_default_region = "${AWS_DEFAULT_REGION}"

## Lake framework bucket name
aws_bucket_name = "${AWS_BUCKET_NAME}"
# Number of threads to use for fetching data from fatnear
# Default: 2 * available threads
#num_threads = 8

## Transaction details are stored in the Google Cloud Storage
[tx_details_storage]
Expand Down
64 changes: 30 additions & 34 deletions rpc-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::executor::block_on;
use near_primitives::epoch_manager::{AllEpochConfig, EpochConfig};

use crate::modules::blocks::{BlocksInfoByFinality, CacheBlock};
use crate::utils;

static NEARD_VERSION: &str = env!("CARGO_PKG_VERSION");
static NEARD_BUILD: &str = env!("BUILD_VERSION");
Expand All @@ -20,8 +21,7 @@ pub struct GenesisInfo {
impl GenesisInfo {
pub async fn get(
near_rpc_client: &crate::utils::JsonRpcClient,
s3_client: &near_lake_framework::LakeS3Client,
s3_bucket_name: &str,
fastnear_client: &near_lake_framework::FastNearClient,
) -> Self {
tracing::info!("Get genesis config...");
let genesis_config = near_rpc_client
Expand All @@ -32,25 +32,20 @@ impl GenesisInfo {
.await
.expect("Error to get genesis config");

let genesis_block = near_lake_framework::s3::fetchers::fetch_block(
s3_client,
s3_bucket_name,
genesis_config.genesis_height,
)
.await
.expect("Error to get genesis block");
let genesis_block =
near_lake_framework::fastnear::fetchers::fetch_first_block(fastnear_client).await;

Self {
genesis_config,
genesis_block_cache: CacheBlock::from(&genesis_block),
genesis_block_cache: CacheBlock::from(&genesis_block.block),
}
}
}

#[derive(Clone)]
pub struct ServerContext {
/// Lake s3 client
pub s3_client: near_lake_framework::LakeS3Client,
/// Fastnear client
pub fastnear_client: near_lake_framework::FastNearClient,
/// Database manager
pub db_manager: std::sync::Arc<Box<dyn database::ReaderDbManager + Sync + Send + 'static>>,
/// TransactionDetails storage
Expand All @@ -61,8 +56,6 @@ pub struct ServerContext {
pub genesis_info: GenesisInfo,
/// Near rpc client
pub near_rpc_client: crate::utils::JsonRpcClient,
/// AWS s3 lake bucket name
pub s3_bucket_name: String,
/// Blocks cache
pub blocks_cache: std::sync::Arc<crate::cache::RwLockLruMemoryCache<u64, CacheBlock>>,
/// Final block info include final_block_cache and current_validators_info
Expand All @@ -89,27 +82,36 @@ pub struct ServerContext {
}

impl ServerContext {
pub async fn init(
rpc_server_config: configuration::RpcServerConfig,
near_rpc_client: crate::utils::JsonRpcClient,
) -> anyhow::Result<Self> {
pub async fn init(rpc_server_config: configuration::RpcServerConfig) -> anyhow::Result<Self> {
let contract_code_cache_size_in_bytes =
crate::utils::gigabytes_to_bytes(rpc_server_config.general.contract_code_cache_size)
.await;
utils::gigabytes_to_bytes(rpc_server_config.general.contract_code_cache_size).await;
let contract_code_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new(
contract_code_cache_size_in_bytes,
));

let block_cache_size_in_bytes =
crate::utils::gigabytes_to_bytes(rpc_server_config.general.block_cache_size).await;
utils::gigabytes_to_bytes(rpc_server_config.general.block_cache_size).await;
let blocks_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new(
block_cache_size_in_bytes,
));

let blocks_info_by_finality =
std::sync::Arc::new(BlocksInfoByFinality::new(&near_rpc_client, &blocks_cache).await);

let s3_client = rpc_server_config.lake_config.lake_s3_client().await;
let near_rpc_client = utils::JsonRpcClient::new(
rpc_server_config.general.near_rpc_url.clone(),
rpc_server_config.general.near_archival_rpc_url.clone(),
);
// We want to set a custom referer to let NEAR JSON RPC nodes know that we are a read-rpc instance
let near_rpc_client = near_rpc_client.header(
"Referer".to_string(),
rpc_server_config.general.referer_header_value.clone(),
)?;

let fastnear_client = rpc_server_config
.lake_config
.lake_client(rpc_server_config.general.chain_id)
.await?;

let blocks_info_by_finality = std::sync::Arc::new(
BlocksInfoByFinality::new(&near_rpc_client, &fastnear_client).await,
);

let tx_details_storage = tx_details_storage::TxDetailsStorage::new(
rpc_server_config.tx_details_storage.storage_client().await,
Expand All @@ -124,12 +126,7 @@ impl ServerContext {
})
.ok();

let genesis_info = GenesisInfo::get(
&near_rpc_client,
&s3_client,
&rpc_server_config.lake_config.aws_bucket_name,
)
.await;
let genesis_info = GenesisInfo::get(&near_rpc_client, &fastnear_client).await;

let default_epoch_config = EpochConfig::from(&genesis_info.genesis_config);
let all_epoch_config = AllEpochConfig::new(
Expand Down Expand Up @@ -159,13 +156,12 @@ impl ServerContext {
.inc();

Ok(Self {
s3_client,
fastnear_client,
db_manager: std::sync::Arc::new(Box::new(db_manager)),
tx_details_storage: std::sync::Arc::new(tx_details_storage),
tx_cache_storage,
genesis_info,
near_rpc_client,
s3_bucket_name: rpc_server_config.lake_config.aws_bucket_name.clone(),
blocks_cache,
blocks_info_by_finality,
compiled_contract_code_cache,
Expand Down
Loading
Loading