Skip to content

Commit

Permalink
reffactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Oct 10, 2024
1 parent b7291e0 commit a010067
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 173 deletions.
21 changes: 13 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,17 @@ extern crate derive_builder;
pub use near_indexer_primitives;

pub use aws_credential_types::Credentials;
pub use providers::fastnear::client as fastnear_client;

mod providers;

pub use providers::fastnear;
pub use providers::s3;

pub use providers::fastnear::client::FastNearClient;
pub use providers::fastnear::types::{FastNearConfig, FastNearConfigBuilder};
pub use providers::s3::client as s3_client;

pub use providers::s3::client::LakeS3Client;
pub use providers::s3::types::{LakeConfig, LakeConfigBuilder};
pub mod providers;

pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework";

Expand Down Expand Up @@ -317,11 +323,10 @@ pub fn streamer<T: Into<providers::NearLakeFrameworkConfig>>(
let (sender, receiver) = tokio::sync::mpsc::channel(config.blocks_preload_pool_size());
match config {
providers::NearLakeFrameworkConfig::Lake(config) => {
(tokio::spawn(providers::s3::start(sender, config)), receiver)
(tokio::spawn(s3::start(sender, config)), receiver)
}
providers::NearLakeFrameworkConfig::FastNear(config) => {
(tokio::spawn(fastnear::start(sender, config)), receiver)
}
providers::NearLakeFrameworkConfig::FastNear(config) => (
tokio::spawn(providers::fastnear::start(sender, config)),
receiver,
),
}
}
9 changes: 8 additions & 1 deletion src/providers/fastnear/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use super::types;
/// It is used to fetch the blocks from the FastNear
#[derive(Clone, Debug)]
pub struct FastNearClient {
client: reqwest::Client,
endpoint: String,
client: reqwest::Client,
}

impl FastNearClient {
Expand All @@ -16,6 +16,13 @@ impl FastNearClient {
}
}

pub fn from_conf(config: &types::FastNearConfig) -> Self {
Self {
endpoint: config.endpoint.clone(),
client: reqwest::Client::new(),
}
}

/// Fetches the block from the FastNear API
/// Returns the result in `Option<near_indexer_primitives::StreamerMessage>`
/// If the block does not exist, returns `None`
Expand Down
15 changes: 15 additions & 0 deletions src/providers/fastnear/fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@ pub async fn fetch_optimistic_block(
.expect("Failed to fetch optimistic block")
}

/// Fetches the optimistic block from the fastenar
/// This function is used to fetch the optimistic block by height
/// This function will be using endpoint `/v0/block_opt/:block_height`
/// This would be waiting some time until the optimistic block is available
/// Returns `near_indexer_primitives::StreamerMessage` if the block is available
/// Returns `None` if the block height is skipped
pub async fn fetch_optimistic_block_by_height(
client: &FastNearClient,
block_height: types::BlockHeight,
) -> Option<near_indexer_primitives::StreamerMessage> {
client
.fetch_until_success(&format!("/v0/block_opt/{}", block_height))
.await
}

/// Fetches the genesis block from the fastenar
/// Returns `near_indexer_primitives::StreamerMessage`
pub async fn fetch_first_block(
Expand Down
4 changes: 3 additions & 1 deletion src/providers/fastnear/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::FastNearClient;

pub mod client;
pub mod fetchers;
pub mod types;
Expand All @@ -12,7 +14,7 @@ pub async fn start(
blocks_sink: tokio::sync::mpsc::Sender<near_indexer_primitives::StreamerMessage>,
config: types::FastNearConfig,
) -> anyhow::Result<()> {
let client = config.client();
let client = FastNearClient::from_conf(&config);
let max_num_threads = config.num_threads;
let next_sink_block =
std::sync::Arc::new(std::sync::atomic::AtomicU64::new(config.start_block_height));
Expand Down
6 changes: 0 additions & 6 deletions src/providers/fastnear/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ pub struct FastNearConfig {
pub(crate) blocks_preload_pool_size: usize,
}

impl FastNearConfig {
pub fn client(&self) -> crate::fastnear_client::FastNearClient {
crate::fastnear_client::FastNearClient::new(self.endpoint.clone())
}
}

impl FastNearConfigBuilder {
/// Shortcut to set up [FastNearConfigBuilder] for mainnet
/// ```
Expand Down
148 changes: 148 additions & 0 deletions src/providers/s3/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,151 @@ pub trait S3Client: Send + Sync {
start_after_prefix: &str,
) -> Result<Vec<String>, ListCommonPrefixesError>;
}

#[derive(Clone, Debug)]
pub struct LakeS3Client {
s3: aws_sdk_s3::Client,
}

impl LakeS3Client {
pub fn new(s3: aws_sdk_s3::Client) -> Self {
Self { s3 }
}

pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self {
let s3_client = aws_sdk_s3::Client::from_conf(config);

Self { s3: s3_client }
}
}

#[async_trait]
impl S3Client for LakeS3Client {
async fn get_object_bytes(
&self,
bucket: &str,
prefix: &str,
) -> Result<Vec<u8>, GetObjectBytesError> {
let object = self
.s3
.get_object()
.bucket(bucket)
.key(prefix)
.request_payer(aws_sdk_s3::types::RequestPayer::Requester)
.send()
.await?;

let bytes = object.body.collect().await?.into_bytes().to_vec();

Ok(bytes)
}

async fn list_common_prefixes(
&self,
bucket: &str,
start_after_prefix: &str,
) -> Result<Vec<String>, ListCommonPrefixesError> {
let response = self
.s3
.list_objects_v2()
.max_keys(1000) // 1000 is the default and max value for this parameter
.delimiter("/".to_string())
.start_after(start_after_prefix)
.request_payer(aws_sdk_s3::types::RequestPayer::Requester)
.bucket(bucket)
.send()
.await?;

let prefixes = match response.common_prefixes {
None => vec![],
Some(common_prefixes) => common_prefixes
.into_iter()
.filter_map(|common_prefix| common_prefix.prefix)
.collect::<Vec<String>>()
.into_iter()
.filter_map(|prefix_string| prefix_string.split('/').next().map(String::from))
.collect(),
};

Ok(prefixes)
}
}

#[cfg(test)]
mod test {
use super::*;

use std::sync::Arc;

use crate::providers::s3::fetchers::fetch_streamer_message;
use async_trait::async_trait;

#[derive(Clone, Debug)]
pub struct LakeS3Client {}

#[async_trait]
impl S3Client for LakeS3Client {
async fn get_object_bytes(
&self,
_bucket: &str,
prefix: &str,
) -> Result<Vec<u8>, GetObjectBytesError> {
let path = format!("{}/blocks/{}", env!("CARGO_MANIFEST_DIR"), prefix);
tokio::fs::read(path)
.await
.map_err(|e| GetObjectBytesError(Arc::new(e)))
}

async fn list_common_prefixes(
&self,
_bucket: &str,
_start_after: &str,
) -> Result<Vec<String>, ListCommonPrefixesError> {
Ok(Vec::new())
}
}

#[tokio::test]
async fn deserializes_meta_transactions() {
let lake_client = LakeS3Client {};
let streamer_message =
fetch_streamer_message(&lake_client, "near-lake-data-mainnet", 879765)
.await
.unwrap();

let delegate_action = &streamer_message.shards[0]
.chunk
.as_ref()
.unwrap()
.transactions[0]
.transaction
.actions[0];

assert_eq!(
serde_json::to_value(delegate_action).unwrap(),
serde_json::json!({
"Delegate": {
"delegate_action": {
"sender_id": "test.near",
"receiver_id": "test.near",
"actions": [
{
"AddKey": {
"public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg",
"access_key": {
"nonce": 0,
"permission": "FullAccess"
}
}
}
],
"nonce": 879546,
"max_block_height": 100,
"public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib"
},
"signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS"
}
})
);
}
}
Loading

0 comments on commit a010067

Please sign in to comment.