Skip to content

Commit

Permalink
feat: refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Kai <[email protected]>
  • Loading branch information
GrapeBaBa committed Jul 24, 2024
1 parent 13c668d commit f7e8791
Show file tree
Hide file tree
Showing 10 changed files with 500 additions and 85 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Build
run: cargo build --verbose
run: cargo build --workspace --verbose
- name: Lint
run: cargo clippy --workspace --verbose
- name: Format
run: cargo fmt --all -- --check
- name: Run tests
run: cargo test --verbose
run: cargo test --workspace --verbose
29 changes: 22 additions & 7 deletions bin/archiver/src/archiver.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use eth2::{BeaconNodeHttpClient, Error};
use blob_archiver_storage::{BlobData, BlobSidecars, Header};
use eth2::types::{BlockId, MainnetEthSpec};
use eth2::{BeaconNodeHttpClient, Error};
use tracing::log::trace;
use blob_archiver_storage::{BlobData, BlobSidecars, Header};

pub struct Archiver {
pub beacon_client: BeaconNodeHttpClient,
Expand All @@ -13,13 +13,25 @@ impl Archiver {
}

pub async fn persist_blobs_for_block(&self, block_id: BlockId) -> Result<(), Error> {
let header_resp_opt = self.beacon_client.get_beacon_headers_block_id(block_id).await?;
let header_resp_opt = self
.beacon_client
.get_beacon_headers_block_id(block_id)
.await?;
if let Some(header) = header_resp_opt {
let beacon_client = self.beacon_client.clone();
let blobs_resp_opt = beacon_client.get_blobs::<MainnetEthSpec>(BlockId::Root(header.data.root), None).await?;
let blobs_resp_opt = beacon_client
.get_blobs::<MainnetEthSpec>(BlockId::Root(header.data.root), None)
.await?;
if let Some(blob_sidecars) = blobs_resp_opt {
let blob_sidecar_list = blob_sidecars.data;
let blob_data = BlobData::new(Header { beacon_block_hash: header.data.root }, BlobSidecars { data: blob_sidecar_list });
let blob_data = BlobData::new(
Header {
beacon_block_hash: header.data.root,
},
BlobSidecars {
data: blob_sidecar_list,
},
);
trace!("Persisting blobs for block: {:?}", blob_data);
return Ok(());
}
Expand All @@ -41,10 +53,13 @@ mod tests {

#[tokio::test]
async fn test_persist_blobs_for_block() {
let beacon_client = BeaconNodeHttpClient::new(SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(), Timeouts::set_all(Duration::from_secs(30)));
let beacon_client = BeaconNodeHttpClient::new(
SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(),
Timeouts::set_all(Duration::from_secs(30)),
);
let archiver = Archiver::new(beacon_client);

let block_id = BlockId::Head;
archiver.persist_blobs_for_block(block_id).await.unwrap();
}
}
}
12 changes: 9 additions & 3 deletions bin/archiver/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
use std::str::FromStr;
use std::time::Duration;

use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};
use eth2::types::BlockId;
use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};

use crate::archiver::Archiver;

mod archiver;

#[tokio::main]
async fn main() {
let beacon_client = BeaconNodeHttpClient::new(SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(), Timeouts::set_all(Duration::from_secs(30)));
let beacon_client = BeaconNodeHttpClient::new(
SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(),
Timeouts::set_all(Duration::from_secs(30)),
);
let archiver = Archiver::new(beacon_client);

let block_id = BlockId::Head;

archiver.persist_blobs_for_block(block_id).await.expect("TODO: panic message");
archiver
.persist_blobs_for_block(block_id)
.await
.expect("TODO: panic message");
}
5 changes: 2 additions & 3 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::str::FromStr;
use std::time::Duration;

use eth2::types::Hash256;
use anyhow::Result;

#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BeaconConfig {
Expand All @@ -16,4 +15,4 @@ pub struct ArchiverConfig {
pub beacon: BeaconConfig,
pub poll_interval: Duration,
pub origin_block: Hash256,
}
}
2 changes: 1 addition & 1 deletion crates/config/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod config;

pub use config::{ArchiverConfig};
pub use config::ArchiverConfig;
147 changes: 147 additions & 0 deletions crates/storage/src/fs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use std::path::PathBuf;

use async_trait::async_trait;
use eth2::types::Hash256;
use eyre::Result;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use crate::storage::{BackfillProcesses, BACKFILL_LOCK};
use crate::{BlobData, LockFile, StorageReader, StorageWriter};

/// Filesystem-backed blob storage.
///
/// All artifacts (blob files keyed by beacon block hash, the lock file, and
/// the backfill-process state) are stored as JSON files under `dir`.
#[derive(Debug)]
pub struct FSStorage {
    // Root directory for all stored files.
    pub(crate) dir: PathBuf,
}

impl FSStorage {
pub async fn new(dir: PathBuf) -> Result<Self> {
Ok(Self { dir })
}
}

#[async_trait]
impl StorageReader for FSStorage {
    /// Reads and deserializes the [`BlobData`] stored for `hash`.
    ///
    /// The blob is expected at `<dir>/<hash:x>` as JSON. Returns an error if
    /// the file is missing or its contents fail to deserialize.
    async fn read_blob_data(&self, hash: Hash256) -> Result<BlobData> {
        let path = self.dir.join(format!("{:x}", hash));
        // tokio::fs::read sizes the buffer from file metadata, replacing the
        // manual open + read_to_end sequence.
        let data = tokio::fs::read(path).await?;
        Ok(serde_json::from_slice(&data)?)
    }

    /// Returns true when a blob file for `hash` exists on disk.
    async fn exists(&self, hash: Hash256) -> bool {
        // NOTE(review): Path::exists is a blocking syscall inside an async fn.
        // It is a single cheap metadata lookup, but consider
        // tokio::fs::try_exists if this ever sits on a hot path.
        self.dir.join(format!("{:x}", hash)).exists()
    }

    /// Reads and deserializes the lock file at `<dir>/lockfile`.
    async fn read_lock_file(&self) -> Result<LockFile> {
        let path = self.dir.join("lockfile");
        let data = tokio::fs::read(path).await?;
        Ok(serde_json::from_slice(&data)?)
    }

    /// Reads the backfill-process state, serialized against concurrent
    /// writers via `BACKFILL_LOCK`.
    async fn read_backfill_processes(&self) -> Result<BackfillProcesses> {
        // Bug fix: the original bare statement `BACKFILL_LOCK.lock();` dropped
        // the guard at the end of the statement, so the lock protected
        // nothing. Bind the guard so it lives for the whole read.
        let _guard = BACKFILL_LOCK.lock();
        // Use blocking std::fs while the guard is held: holding a synchronous
        // mutex guard across an .await point would make the future !Send.
        // The state file is small, so a blocking read is acceptable here.
        // NOTE(review): assumes BACKFILL_LOCK is a sync mutex (guard returned
        // directly from lock()); if it is an async mutex this should be
        // `.lock().await` instead — confirm its declaration in storage.rs.
        let path = self.dir.join("backfill_processes");
        let data = std::fs::read(path)?;
        Ok(serde_json::from_slice(&data)?)
    }
}

#[async_trait]
impl StorageWriter for FSStorage {
    /// Serializes `blob_data` to JSON at `<dir>/<beacon_block_hash:x>`,
    /// creating the directory tree on first use.
    async fn write_blob_data(&self, blob_data: BlobData) -> Result<()> {
        let path = self
            .dir
            .join(format!("{:x}", blob_data.header.beacon_block_hash));
        // The joined path always has `self.dir` as its parent.
        tokio::fs::create_dir_all(path.parent().unwrap()).await?;
        // tokio::fs::write creates (or truncates) and writes the whole file,
        // replacing the manual File::create + write_all sequence.
        tokio::fs::write(&path, serde_json::to_vec(&blob_data)?).await?;
        Ok(())
    }

    /// Serializes `lock_file` to JSON at `<dir>/lockfile`.
    async fn write_lock_file(&self, lock_file: LockFile) -> Result<()> {
        let path = self.dir.join("lockfile");
        tokio::fs::create_dir_all(path.parent().unwrap()).await?;
        tokio::fs::write(&path, serde_json::to_vec(&lock_file)?).await?;
        Ok(())
    }

    /// Writes the backfill-process state, serialized against concurrent
    /// readers/writers via `BACKFILL_LOCK`.
    async fn write_backfill_process(&self, backfill_process: BackfillProcesses) -> Result<()> {
        // Bug fix: the original bare statement `BACKFILL_LOCK.lock();` dropped
        // the guard at the end of the statement, so the lock protected
        // nothing. Bind the guard so it lives for the whole write.
        let _guard = BACKFILL_LOCK.lock();
        // Blocking std::fs while the guard is held: a sync mutex guard must
        // not be held across an .await point (future would be !Send). The
        // state file is small, so blocking I/O is acceptable here.
        // NOTE(review): assumes BACKFILL_LOCK is a sync mutex — confirm its
        // declaration in storage.rs.
        let path = self.dir.join("backfill_processes");
        std::fs::create_dir_all(path.parent().unwrap())?;
        std::fs::write(&path, serde_json::to_vec(&backfill_process)?)?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use crate::storage::{
        create_test_blob_data, create_test_lock_file, create_test_test_backfill_processes,
    };
    use tokio::io;

    use super::*;

    // Round-trips blob data, the lock file, and backfill-process state
    // through FSStorage, checking the missing-file error first.
    #[tokio::test]
    async fn test_fs_storage() {
        // Each test owns a distinct directory: `cargo test` runs tests on
        // separate threads, and sharing one `test_dir` makes the two tests
        // in this module race on the same files.
        let storage = FSStorage::new(PathBuf::from("test_dir_fs_storage"))
            .await
            .unwrap();
        // Clear leftovers from a previous run that panicked before cleanup.
        clean_dir(&storage.dir);
        let blob_data = create_test_blob_data();
        // Before any write, reads must fail with an underlying io::Error.
        assert!(storage
            .read_blob_data(blob_data.header.beacon_block_hash)
            .await
            .is_err_and(|e| e.downcast_ref::<io::Error>().is_some()));
        storage.write_blob_data(blob_data.clone()).await.unwrap();
        assert_eq!(
            storage
                .read_blob_data(blob_data.header.beacon_block_hash)
                .await
                .unwrap(),
            blob_data
        );
        let lock_file = create_test_lock_file();
        assert!(storage
            .read_lock_file()
            .await
            .is_err_and(|e| e.downcast_ref::<io::Error>().is_some()));
        storage.write_lock_file(lock_file.clone()).await.unwrap();
        assert_eq!(storage.read_lock_file().await.unwrap(), lock_file);
        let test_backfill_processes = create_test_test_backfill_processes();
        assert!(storage
            .read_backfill_processes()
            .await
            .is_err_and(|e| e.downcast_ref::<io::Error>().is_some()));
        storage
            .write_backfill_process(test_backfill_processes.clone())
            .await
            .unwrap();
        assert_eq!(
            storage.read_backfill_processes().await.unwrap(),
            test_backfill_processes
        );
        clean_dir(&storage.dir);
    }

    // `exists` flips from false to true once the blob file is written.
    #[tokio::test]
    async fn test_fs_storage_exists() {
        // Separate directory from test_fs_storage — see comment above.
        let storage = FSStorage::new(PathBuf::from("test_dir_fs_exists"))
            .await
            .unwrap();
        clean_dir(&storage.dir);
        let blob_data = create_test_blob_data();
        assert!(!storage.exists(blob_data.header.beacon_block_hash).await);
        storage.write_blob_data(blob_data.clone()).await.unwrap();
        assert!(storage.exists(blob_data.header.beacon_block_hash).await);
        clean_dir(&storage.dir);
    }

    // Best-effort removal of a test directory and everything under it.
    fn clean_dir(dir: &PathBuf) {
        if dir.exists() {
            std::fs::remove_dir_all(dir).unwrap();
        }
    }
}
3 changes: 2 additions & 1 deletion crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
extern crate core;

pub mod storage;
mod fs;
mod s3;
pub mod storage;

pub use storage::*;

Expand Down
Loading

0 comments on commit f7e8791

Please sign in to comment.