diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 9fd45e0..cf976b3 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -17,6 +17,10 @@ jobs:
     steps:
     - uses: actions/checkout@v4
     - name: Build
-      run: cargo build --verbose
+      run: cargo build --workspace --verbose
+    - name: Lint
+      run: cargo clippy --workspace --verbose
+    - name: Format
+      run: cargo fmt --all -- --check
     - name: Run tests
-      run: cargo test --verbose
+      run: cargo test --workspace --verbose
diff --git a/bin/archiver/src/archiver.rs b/bin/archiver/src/archiver.rs
index 28db2e0..17cfec5 100644
--- a/bin/archiver/src/archiver.rs
+++ b/bin/archiver/src/archiver.rs
@@ -1,7 +1,7 @@
-use eth2::{BeaconNodeHttpClient, Error};
+use blob_archiver_storage::{BlobData, BlobSidecars, Header};
 use eth2::types::{BlockId, MainnetEthSpec};
+use eth2::{BeaconNodeHttpClient, Error};
 use tracing::log::trace;
-use blob_archiver_storage::{BlobData, BlobSidecars, Header};
 
 pub struct Archiver {
     pub beacon_client: BeaconNodeHttpClient,
@@ -13,13 +13,25 @@ impl Archiver {
     }
 
     pub async fn persist_blobs_for_block(&self, block_id: BlockId) -> Result<(), Error> {
-        let header_resp_opt = self.beacon_client.get_beacon_headers_block_id(block_id).await?;
+        let header_resp_opt = self
+            .beacon_client
+            .get_beacon_headers_block_id(block_id)
+            .await?;
         if let Some(header) = header_resp_opt {
             let beacon_client = self.beacon_client.clone();
-            let blobs_resp_opt = beacon_client.get_blobs::<MainnetEthSpec>(BlockId::Root(header.data.root), None).await?;
+            let blobs_resp_opt = beacon_client
+                .get_blobs::<MainnetEthSpec>(BlockId::Root(header.data.root), None)
+                .await?;
             if let Some(blob_sidecars) = blobs_resp_opt {
                 let blob_sidecar_list = blob_sidecars.data;
-                let blob_data = BlobData::new(Header { beacon_block_hash: header.data.root }, BlobSidecars { data: blob_sidecar_list });
+                let blob_data = BlobData::new(
+                    Header {
+                        beacon_block_hash: header.data.root,
+                    },
+                    BlobSidecars {
+                        data: blob_sidecar_list,
+                    },
+                );
                 trace!("Persisting blobs for block: {:?}", blob_data);
                 return Ok(());
             }
@@ -41,10 +53,13 @@ mod tests {
 
     #[tokio::test]
     async fn test_persist_blobs_for_block() {
-        let beacon_client = BeaconNodeHttpClient::new(SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(), Timeouts::set_all(Duration::from_secs(30)));
+        let beacon_client = BeaconNodeHttpClient::new(
+            SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(),
+            Timeouts::set_all(Duration::from_secs(30)),
+        );
         let archiver = Archiver::new(beacon_client);
 
         let block_id = BlockId::Head;
         archiver.persist_blobs_for_block(block_id).await.unwrap();
     }
-}
\ No newline at end of file
+}
diff --git a/bin/archiver/src/main.rs b/bin/archiver/src/main.rs
index 0813064..60f611a 100644
--- a/bin/archiver/src/main.rs
+++ b/bin/archiver/src/main.rs
@@ -1,8 +1,8 @@
 use std::str::FromStr;
 use std::time::Duration;
 
-use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};
 use eth2::types::BlockId;
+use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};
 
 use crate::archiver::Archiver;
 
@@ -10,10 +10,16 @@ mod archiver;
 
 #[tokio::main]
 async fn main() {
-    let beacon_client = BeaconNodeHttpClient::new(SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(), Timeouts::set_all(Duration::from_secs(30)));
+    let beacon_client = BeaconNodeHttpClient::new(
+        SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(),
+        Timeouts::set_all(Duration::from_secs(30)),
+    );
     let archiver = Archiver::new(beacon_client);
 
     let block_id = BlockId::Head;
 
-    archiver.persist_blobs_for_block(block_id).await.expect("TODO: panic message");
+    archiver
+        .persist_blobs_for_block(block_id)
+        .await
+        .expect("TODO: panic message");
 }
diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs
index a5f856f..48f3672 100644
--- a/crates/config/src/config.rs
+++ b/crates/config/src/config.rs
@@ -1,7 +1,6 @@
-use std::str::FromStr;
 use std::time::Duration;
+
 use eth2::types::Hash256;
-use anyhow::Result;
 
 #[derive(Debug, Clone, Default, PartialEq, Eq)]
 pub struct BeaconConfig {
@@ -16,4 +15,4 @@ pub struct ArchiverConfig {
     pub beacon: BeaconConfig,
     pub poll_interval: Duration,
     pub origin_block: Hash256,
-}
\ No newline at end of file
+}
diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs
index f4de43c..f22c6f4 100644
--- a/crates/config/src/lib.rs
+++ b/crates/config/src/lib.rs
@@ -1,3 +1,3 @@
 pub mod config;
 
-pub use config::{ArchiverConfig};
+pub use config::ArchiverConfig;
diff --git a/crates/storage/src/fs.rs b/crates/storage/src/fs.rs
new file mode 100644
index 0000000..3216694
--- /dev/null
+++ b/crates/storage/src/fs.rs
@@ -0,0 +1,147 @@
+use std::path::PathBuf;
+
+use async_trait::async_trait;
+use eth2::types::Hash256;
+use eyre::Result;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+
+use crate::storage::{BackfillProcesses, BACKFILL_LOCK};
+use crate::{BlobData, LockFile, StorageReader, StorageWriter};
+
+pub struct FSStorage {
+    pub(crate) dir: PathBuf,
+}
+
+impl FSStorage {
+    pub async fn new(dir: PathBuf) -> Result<Self> {
+        Ok(Self { dir })
+    }
+}
+
+#[async_trait]
+impl StorageReader for FSStorage {
+    async fn read_blob_data(&self, hash: Hash256) -> Result<BlobData> {
+        let path = self.dir.join(format!("{:x}", hash));
+        let mut file = tokio::fs::File::open(path).await?;
+        let mut data = Vec::new();
+        file.read_to_end(&mut data).await?;
+        Ok(serde_json::from_slice(&data)?)
+    }
+
+    async fn exists(&self, hash: Hash256) -> bool {
+        self.dir.join(format!("{:x}", hash)).exists()
+    }
+
+    async fn read_lock_file(&self) -> Result<LockFile> {
+        let path = self.dir.join("lockfile");
+        let mut file = tokio::fs::File::open(path).await?;
+        let mut data = Vec::new();
+        file.read_to_end(&mut data).await?;
+        Ok(serde_json::from_slice(&data)?)
+    }
+
+    async fn read_backfill_processes(&self) -> Result<BackfillProcesses> {
+        BACKFILL_LOCK.lock();
+        let path = self.dir.join("backfill_processes");
+        let mut file = tokio::fs::File::open(path).await?;
+        let mut data = Vec::new();
+        file.read_to_end(&mut data).await?;
+        Ok(serde_json::from_slice(&data)?)
+    }
+}
+
+#[async_trait]
+impl StorageWriter for FSStorage {
+    async fn write_blob_data(&self, blob_data: BlobData) -> Result<()> {
+        let path = self
+            .dir
+            .join(format!("{:x}", blob_data.header.beacon_block_hash));
+        tokio::fs::create_dir_all(path.parent().unwrap()).await?;
+        let mut file = tokio::fs::File::create(path).await?;
+        file.write_all(&serde_json::to_vec(&blob_data)?).await?;
+        Ok(())
+    }
+
+    async fn write_lock_file(&self, lock_file: LockFile) -> Result<()> {
+        let path = self.dir.join("lockfile");
+        tokio::fs::create_dir_all(path.parent().unwrap()).await?;
+        let mut file = tokio::fs::File::create(path).await?;
+        file.write_all(&serde_json::to_vec(&lock_file)?).await?;
+        Ok(())
+    }
+
+    async fn write_backfill_process(&self, backfill_process: BackfillProcesses) -> Result<()> {
+        BACKFILL_LOCK.lock();
+        let path = self.dir.join("backfill_processes");
+        tokio::fs::create_dir_all(path.parent().unwrap()).await?;
+        let mut file = tokio::fs::File::create(path).await?;
+        file.write_all(&serde_json::to_vec(&backfill_process)?)
+            .await?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::storage::{
+        create_test_blob_data, create_test_lock_file, create_test_test_backfill_processes,
+    };
+    use tokio::io;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_fs_storage() {
+        let storage = FSStorage::new(PathBuf::from("test_dir")).await.unwrap();
+        let blob_data = create_test_blob_data();
+        assert!(storage
+            .read_blob_data(blob_data.header.beacon_block_hash)
+            .await
+            .is_err_and(|e| e.downcast_ref::<io::Error>().is_some()));
+        storage.write_blob_data(blob_data.clone()).await.unwrap();
+        assert_eq!(
+            storage
+                .read_blob_data(blob_data.header.beacon_block_hash)
+                .await
+                .unwrap(),
+            blob_data
+        );
+        let lock_file = create_test_lock_file();
+        assert!(storage
+            .read_lock_file()
+            .await
+            .is_err_and(|e| e.downcast_ref::<io::Error>().is_some()));
+        storage.write_lock_file(lock_file.clone()).await.unwrap();
+        assert_eq!(storage.read_lock_file().await.unwrap(), lock_file);
+        let test_backfill_processes = create_test_test_backfill_processes();
+        assert!(storage
+            .read_backfill_processes()
+            .await
+            .is_err_and(|e| e.downcast_ref::<io::Error>().is_some()));
+        storage
+            .write_backfill_process(test_backfill_processes.clone())
+            .await
+            .unwrap();
+        assert_eq!(
+            storage.read_backfill_processes().await.unwrap(),
+            test_backfill_processes
+        );
+        clean_dir(&storage.dir);
+    }
+
+    #[tokio::test]
+    async fn test_fs_storage_exists() {
+        let storage = FSStorage::new(PathBuf::from("test_dir")).await.unwrap();
+        let blob_data = create_test_blob_data();
+        assert!(!storage.exists(blob_data.header.beacon_block_hash).await);
+        storage.write_blob_data(blob_data.clone()).await.unwrap();
+        assert!(storage.exists(blob_data.header.beacon_block_hash).await);
+        clean_dir(&storage.dir);
+    }
+
+    fn clean_dir(dir: &PathBuf) {
+        if dir.exists() {
+            std::fs::remove_dir_all(dir).unwrap();
+        }
+    }
+}
diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs
index 2598cc9..7eab9f4 100644
--- a/crates/storage/src/lib.rs
+++ b/crates/storage/src/lib.rs
@@ -1,7 +1,8 @@
 extern crate core;
 
-pub mod storage;
+mod fs;
 mod s3;
+pub mod storage;
 
 pub use storage::*;
diff --git a/crates/storage/src/s3.rs b/crates/storage/src/s3.rs
index 7e00746..d6f959a 100644
--- a/crates/storage/src/s3.rs
+++ b/crates/storage/src/s3.rs
@@ -2,17 +2,19 @@ use std::path::Path;
 use std::time::Duration;
 
 use async_trait::async_trait;
-use aws_sdk_s3::Client;
 use aws_sdk_s3::config::retry::RetryConfig;
 use aws_sdk_s3::config::timeout::TimeoutConfig;
 use aws_sdk_s3::primitives::ByteStream;
+use aws_sdk_s3::Client;
 use eth2::types::Hash256;
 use eyre::Result;
 use tracing::info;
 use tracing::log::trace;
+
 use storage::BackfillProcesses;
-use crate::{BlobData, LockFile, storage, StorageReader, StorageWriter};
+
 use crate::storage::BACKFILL_LOCK;
+use crate::{storage, BlobData, LockFile, StorageReader, StorageWriter};
 
 #[derive(Debug, Clone, Default, PartialEq, Eq)]
 pub struct Config {
@@ -34,7 +36,12 @@ impl S3Storage {
     pub async fn new(config: Config) -> Result<Self> {
         let env_config = aws_config::from_env().load().await;
         let sdk_config = aws_sdk_s3::config::Builder::from(&env_config)
-            .timeout_config(TimeoutConfig::builder().connect_timeout(Duration::from_secs(15)).operation_timeout(Duration::from_secs(30)).build())
+            .timeout_config(
+                TimeoutConfig::builder()
+                    .connect_timeout(Duration::from_secs(15))
+                    .operation_timeout(Duration::from_secs(30))
+                    .build(),
+            )
             .retry_config(RetryConfig::standard())
             .endpoint_url(config.endpoint.as_str())
             .force_path_style(true)
@@ -54,7 +61,13 @@ impl StorageReader for S3Storage {
     async fn read_blob_data(&self, hash: Hash256) -> Result<BlobData> {
         let blob_path = Path::new(&self.path).join(format!("{:x}", hash));
-        let blob_res = self.client.get_object().bucket(self.bucket.as_str()).key(blob_path.to_str().ok_or(eyre::eyre!("Invalid blob path"))?).send().await?;
+        let blob_res = self
+            .client
+            .get_object()
+            .bucket(self.bucket.as_str())
+            .key(blob_path.to_str().ok_or(eyre::eyre!("Invalid blob path"))?)
+            .send()
+            .await?;
 
         let blob_data_bytes = blob_res.body.collect().await?.to_vec();
@@ -71,7 +84,13 @@ impl StorageReader for S3Storage {
     async fn exists(&self, _hash: Hash256) -> bool {
         let blob_path = Path::new(&self.path).join(format!("{:x}", _hash));
         if let Some(path) = blob_path.to_str() {
-            self.client.head_object().bucket(self.bucket.as_str()).key(path).send().await.is_ok()
+            self.client
+                .head_object()
+                .bucket(self.bucket.as_str())
+                .key(path)
+                .send()
+                .await
+                .is_ok()
         } else {
             false
         }
@@ -79,7 +98,17 @@ impl StorageReader for S3Storage {
 
     async fn read_lock_file(&self) -> Result<LockFile> {
         let lock_file_path = Path::new(&self.path).join("lockfile");
-        let lock_file_res = self.client.get_object().bucket(self.bucket.as_str()).key(lock_file_path.to_str().ok_or(eyre::eyre!("Invalid lock file path"))?).send().await?;
+        let lock_file_res = self
+            .client
+            .get_object()
+            .bucket(self.bucket.as_str())
+            .key(
+                lock_file_path
+                    .to_str()
+                    .ok_or(eyre::eyre!("Invalid lock file path"))?,
+            )
+            .send()
+            .await?;
 
         let lock_file_bytes = lock_file_res.body.collect().await?.to_vec();
         let lock_file: LockFile = serde_json::from_slice(lock_file_bytes.as_slice())?;
@@ -90,10 +119,21 @@ impl StorageReader for S3Storage {
     async fn read_backfill_processes(&self) -> Result<BackfillProcesses> {
         BACKFILL_LOCK.lock();
         let backfill_process_path = Path::new(&self.path).join("backfill_processes");
-        let backfill_process_res = self.client.get_object().bucket(self.bucket.as_str()).key(backfill_process_path.to_str().ok_or(eyre::eyre!("Invalid backfill processes path"))?).send().await?;
+        let backfill_process_res = self
+            .client
+            .get_object()
+            .bucket(self.bucket.as_str())
+            .key(
+                backfill_process_path
+                    .to_str()
+                    .ok_or(eyre::eyre!("Invalid backfill processes path"))?,
+            )
+            .send()
+            .await?;
 
         let backfill_process_bytes = backfill_process_res.body.collect().await?.to_vec();
-        let backfill_processes: BackfillProcesses = serde_json::from_slice(backfill_process_bytes.as_slice())?;
+        let backfill_processes: BackfillProcesses =
+            serde_json::from_slice(backfill_process_bytes.as_slice())?;
 
         Ok(backfill_processes)
     }
@@ -102,16 +142,20 @@ impl StorageReader for S3Storage {
 #[async_trait]
 impl StorageWriter for S3Storage {
     async fn write_blob_data(&self, blob_data: BlobData) -> Result<()> {
-        let blob_path = Path::new(&self.path).join(format!("{:x}", blob_data.header.beacon_block_hash));
+        let blob_path =
+            Path::new(&self.path).join(format!("{:x}", blob_data.header.beacon_block_hash));
         let blob_data_bytes = if self.compression {
-            let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
+            let mut encoder =
+                flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
             serde_json::to_writer(&mut encoder, &blob_data)?;
             encoder.finish()?
         } else {
             serde_json::to_vec(&blob_data)?
         };
 
-        let mut put_object_request = self.client.put_object()
+        let mut put_object_request = self
+            .client
+            .put_object()
             .bucket(self.bucket.as_str())
             .key(blob_path.to_str().ok_or(eyre::eyre!("Invalid blob path"))?)
             .content_type("application/json")
@@ -130,12 +174,19 @@ impl StorageWriter for S3Storage {
         let lock_file_bytes = serde_json::to_vec(&lock_file)?;
         let lock_file_path = Path::new(&self.path).join("lockfile");
 
-        let _ = self.client.put_object()
+        let _ = self
+            .client
+            .put_object()
             .bucket(self.bucket.as_str())
-            .key(lock_file_path.to_str().ok_or(eyre::eyre!("Invalid lock file path"))?)
+            .key(
+                lock_file_path
+                    .to_str()
+                    .ok_or(eyre::eyre!("Invalid lock file path"))?,
+            )
             .content_type("application/json")
             .body(ByteStream::from(lock_file_bytes))
-            .send().await?;
+            .send()
+            .await?;
 
         trace!("Wrote lock file to S3: {:?}", lock_file_path);
         Ok(())
@@ -146,14 +197,24 @@ impl StorageWriter for S3Storage {
         let backfill_process_bytes = serde_json::to_vec(&backfill_processes)?;
         let backfill_process_path = Path::new(&self.path).join("backfill_processes");
 
-        let _ = self.client.put_object()
+        let _ = self
+            .client
+            .put_object()
             .bucket(self.bucket.as_str())
-            .key(backfill_process_path.to_str().ok_or(eyre::eyre!("Invalid backfill processes path"))?)
+            .key(
+                backfill_process_path
+                    .to_str()
+                    .ok_or(eyre::eyre!("Invalid backfill processes path"))?,
+            )
             .content_type("application/json")
             .body(ByteStream::from(backfill_process_bytes))
-            .send().await?;
+            .send()
+            .await?;
 
-        info!("Wrote backfill processes to S3: {:?}", backfill_process_path);
+        info!(
+            "Wrote backfill processes to S3: {:?}",
+            backfill_process_path
+        );
 
         Ok(())
     }
 }
@@ -162,15 +223,192 @@ mod tests {
     use std::env;
 
-    use eth2::types::{BlobSidecarList, Hash256};
+    use crate::s3::{Config, S3Storage};
+    use crate::storage::create_test_blob_data;
+    use crate::{storage, StorageReader, StorageWriter};
+    use aws_sdk_s3::types::error::NoSuchKey;
+    use storage::{create_test_lock_file, create_test_test_backfill_processes};
     use testcontainers_modules::localstack::LocalStack;
-    use testcontainers_modules::testcontainers::ImageExt;
     use testcontainers_modules::testcontainers::runners::AsyncRunner;
-
-    use crate::{StorageReader, StorageWriter};
+    use testcontainers_modules::testcontainers::{ContainerAsync, ImageExt};
 
     #[tokio::test]
     async fn test_write_read_blob_data() {
+        let (storage, _container) = setup(false).await;
+        storage
+            .client
+            .create_bucket()
+            .bucket("test-bucket")
+            .send()
+            .await
+            .unwrap();
+
+        let blob_data = create_test_blob_data();
+        let hash = blob_data.header.beacon_block_hash;
+        assert!(storage
+            .read_blob_data(hash)
+            .await
+            .is_err_and(|e| e.root_cause().downcast_ref::<NoSuchKey>().is_some()));
+        storage.write_blob_data(blob_data).await.unwrap();
+
+        let actual_blob_data = storage.read_blob_data(hash).await.unwrap();
+        assert_eq!(actual_blob_data.header.beacon_block_hash, hash);
+        assert_eq!(actual_blob_data.blob_sidecars.data.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_write_read_lock_file() {
+        let (storage, _container) = setup(false).await;
+        storage
+            .client
+            .create_bucket()
+            .bucket("test-bucket")
+            .send()
+            .await
+            .unwrap();
+
+        let lock_file = create_test_lock_file();
+        assert!(storage
+            .read_lock_file()
+            .await
+            .is_err_and(|e| e.root_cause().downcast_ref::<NoSuchKey>().is_some()));
+        storage.write_lock_file(lock_file.clone()).await.unwrap();
+
+        let actual_lock_file = storage.read_lock_file().await.unwrap();
+        assert_eq!(actual_lock_file.archiver_id, lock_file.archiver_id);
+        assert_eq!(actual_lock_file.timestamp, lock_file.timestamp);
+    }
+
+    #[tokio::test]
+    async fn test_write_read_backfill_processes() {
+        let (storage, _container) = setup(false).await;
+        storage
+            .client
+            .create_bucket()
+            .bucket("test-bucket")
+            .send()
+            .await
+            .unwrap();
+
+        let backfill_processes = create_test_test_backfill_processes();
+        assert!(storage
+            .read_backfill_processes()
+            .await
+            .is_err_and(|e| e.root_cause().downcast_ref::<NoSuchKey>().is_some()));
+        storage
+            .write_backfill_process(backfill_processes.clone())
+            .await
+            .unwrap();
+
+        let actual_backfill_processes = storage.read_backfill_processes().await.unwrap();
+        assert_eq!(actual_backfill_processes.len(), 1);
+        assert_eq!(
+            actual_backfill_processes
+                .values()
+                .next()
+                .unwrap()
+                .start_block,
+            backfill_processes.values().next().unwrap().start_block
+        );
+        assert_eq!(
+            actual_backfill_processes
+                .values()
+                .next()
+                .unwrap()
+                .current_block,
+            backfill_processes.values().next().unwrap().current_block
+        );
+    }
+
+    #[tokio::test]
+    async fn test_write_read_blob_data_compressed() {
+        let (storage, _container) = setup(true).await;
+        storage
+            .client
+            .create_bucket()
+            .bucket("test-bucket")
+            .send()
+            .await
+            .unwrap();
+
+        let blob_data = create_test_blob_data();
+        let hash = blob_data.header.beacon_block_hash;
+        assert!(storage
+            .read_blob_data(hash)
+            .await
+            .is_err_and(|e| e.root_cause().downcast_ref::<NoSuchKey>().is_some()));
+        storage.write_blob_data(blob_data).await.unwrap();
+
+        let actual_blob_data = storage.read_blob_data(hash).await.unwrap();
+        assert_eq!(actual_blob_data.header.beacon_block_hash, hash);
+        assert_eq!(actual_blob_data.blob_sidecars.data.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_write_read_lock_file_compressed() {
+        let (storage, _container) = setup(true).await;
+        storage
+            .client
+            .create_bucket()
+            .bucket("test-bucket")
+            .send()
+            .await
+            .unwrap();
+
+        let lock_file = create_test_lock_file();
+        assert!(storage
+            .read_lock_file()
+            .await
+            .is_err_and(|e| e.root_cause().downcast_ref::<NoSuchKey>().is_some()));
+        storage.write_lock_file(lock_file.clone()).await.unwrap();
+
+        let actual_lock_file = storage.read_lock_file().await.unwrap();
+        assert_eq!(actual_lock_file.archiver_id, lock_file.archiver_id);
+        assert_eq!(actual_lock_file.timestamp, lock_file.timestamp);
+    }
+
+    #[tokio::test]
+    async fn test_write_read_backfill_processes_compressed() {
+        let (storage, _container) = setup(true).await;
+        storage
+            .client
+            .create_bucket()
+            .bucket("test-bucket")
+            .send()
+            .await
+            .unwrap();
+
+        let backfill_processes = create_test_test_backfill_processes();
+        assert!(storage
+            .read_backfill_processes()
+            .await
+            .is_err_and(|e| e.root_cause().downcast_ref::<NoSuchKey>().is_some()));
+        storage
+            .write_backfill_process(backfill_processes.clone())
+            .await
+            .unwrap();
+
+        let actual_backfill_processes = storage.read_backfill_processes().await.unwrap();
+        assert_eq!(actual_backfill_processes.len(), 1);
+        assert_eq!(
+            actual_backfill_processes
+                .values()
+                .next()
+                .unwrap()
+                .start_block,
+            backfill_processes.values().next().unwrap().start_block
+        );
+        assert_eq!(
+            actual_backfill_processes
+                .values()
+                .next()
+                .unwrap()
+                .current_block,
+            backfill_processes.values().next().unwrap().current_block
+        );
+    }
+
+    async fn setup(compression: bool) -> (S3Storage, ContainerAsync<LocalStack>) {
         let request = LocalStack::default().with_env_var("SERVICES", "s3");
         let container = request.start().await.unwrap();
 
@@ -181,52 +419,14 @@ mod tests {
         env::set_var("AWS_SECRET_ACCESS_KEY", "test");
         env::set_var("AWS_REGION", "us-east-1");
 
-        let mut config = crate::s3::Config {
+        let config = Config {
             endpoint: format!("http://{}:{}", host_ip, host_port),
             bucket: "test-bucket".to_string(),
             path: "blobs".to_string(),
-            compression: false,
+            compression,
         };
 
-        let storage = crate::s3::S3Storage::new(config.clone()).await.unwrap();
-        storage.client.create_bucket().bucket("test-bucket").send().await.unwrap();
-
-        let header_hash = Hash256::random();
-        let blob_data = crate::BlobData::new(
-            crate::Header {
-                beacon_block_hash: header_hash,
-            },
-            crate::BlobSidecars {
-                data: BlobSidecarList::default(),
-            },
-        );
-
-        storage.write_blob_data(blob_data).await.unwrap();
-
-        let actual_blob_data = storage.read_blob_data(header_hash).await.unwrap();
-        assert_eq!(actual_blob_data.header.beacon_block_hash, header_hash);
-        assert_eq!(actual_blob_data.blob_sidecars.data.len(), 0);
-
-        config.compression = true;
-        let storage = crate::s3::S3Storage::new(config.clone()).await.unwrap();
-
-        let header_hash = Hash256::random();
-        let blob_data = crate::BlobData::new(
-            crate::Header {
-                beacon_block_hash: header_hash,
-            },
-            crate::BlobSidecars {
-                data: BlobSidecarList::default(),
-            },
-        );
-
-        storage.write_blob_data(blob_data).await.unwrap();
-        let actual_blob_data = storage.read_blob_data(header_hash).await.unwrap();
-        assert_eq!(actual_blob_data.header.beacon_block_hash, header_hash);
-        assert_eq!(actual_blob_data.blob_sidecars.data.len(), 0);
+        let storage = S3Storage::new(config).await.unwrap();
+        (storage, container)
     }
 }
-
-
-
-
diff --git a/crates/storage/src/storage.rs b/crates/storage/src/storage.rs
index e409a32..f3cf709 100644
--- a/crates/storage/src/storage.rs
+++ b/crates/storage/src/storage.rs
@@ -6,8 +6,8 @@ use eyre::Result;
 use serde::{Deserialize, Serialize};
 use spin::Mutex;
 
-pub(crate) type BackfillProcesses = HashMap<Hash256, BackfillProcess>;
-pub(crate) static BACKFILL_LOCK: Mutex<()> = Mutex::new(());
+pub type BackfillProcesses = HashMap<Hash256, BackfillProcess>;
+pub static BACKFILL_LOCK: Mutex<()> = Mutex::new(());
 
 #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
 pub struct BackfillProcess {
@@ -40,7 +40,7 @@ impl BlobData {
     }
 }
 
-#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Eq, Clone, Default, Serialize, Deserialize)]
 pub struct LockFile {
     pub archiver_id: String,
     pub timestamp: u64,
@@ -68,3 +68,47 @@ pub trait StorageWriter {
 
 #[async_trait]
 pub trait Storage: StorageReader + StorageWriter {}
+
+pub fn create_test_lock_file() -> LockFile {
+    LockFile {
+        archiver_id: "test_archiver".to_string(),
+        timestamp: 0,
+    }
+}
+
+pub fn create_test_test_backfill_processes() -> BackfillProcesses {
+    let mut backfill_processes: BackfillProcesses = HashMap::new();
+    let header_hash = Hash256::random();
+    let backfill_process = BackfillProcess {
+        start_block: create_test_block_header(),
+        current_block: create_test_block_header(),
+    };
+    backfill_processes.insert(header_hash, backfill_process);
+    backfill_processes
+}
+
+pub fn create_test_blob_data() -> BlobData {
+    BlobData::new(create_test_header(), create_test_blob_sidecars())
+}
+
+fn create_test_header() -> Header {
+    Header {
+        beacon_block_hash: Hash256::random(),
+    }
+}
+
+fn create_test_blob_sidecars() -> BlobSidecars {
+    BlobSidecars {
+        data: BlobSidecarList::default(),
+    }
+}
+
+fn create_test_block_header() -> BeaconBlockHeader {
+    BeaconBlockHeader {
+        slot: Default::default(),
+        proposer_index: 0,
+        parent_root: Default::default(),
+        state_root: Default::default(),
+        body_root: Default::default(),
+    }
+}
diff --git a/crates/storage/tests/s3_tests.rs b/crates/storage/tests/s3_tests.rs
deleted file mode 100644
index 8b13789..0000000
--- a/crates/storage/tests/s3_tests.rs
+++ /dev/null
@@ -1 +0,0 @@
-