From fffcf2c00dc159269573f4cf7393d63c7e86201d Mon Sep 17 00:00:00 2001 From: Chen Kai <281165273grape@gmail.com> Date: Tue, 13 Aug 2024 23:09:32 +0800 Subject: [PATCH] feat:rearchive api Signed-off-by: Chen Kai <281165273grape@gmail.com> --- Cargo.lock | 134 ++++++++++++++++++++ Cargo.toml | 3 +- bin/archiver/Cargo.toml | 1 + bin/archiver/src/api.rs | 189 +++++++++++++++++++++++++++++ bin/archiver/src/archiver.rs | 27 ++--- bin/archiver/src/main.rs | 57 +++++++-- crates/beacon/src/beacon_client.rs | 2 +- 7 files changed, 384 insertions(+), 29 deletions(-) create mode 100644 bin/archiver/src/api.rs diff --git a/Cargo.lock b/Cargo.lock index 27c0590..4174f60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -269,6 +269,7 @@ dependencies = [ "tracing", "tracing-subscriber", "uuid 1.10.0", + "warp", ] [[package]] @@ -2846,6 +2847,30 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "headers" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 0.2.12", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http 0.2.12", +] + [[package]] name = "heck" version = "0.4.1" @@ -4366,6 +4391,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -4398,6 +4433,24 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fafa6961cabd9c63bcd77a45d7e3b7f3b552b70417831fb0f56db717e72407e" +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 0.2.12", + "httparse", + "log", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "multiaddr" version = "0.18.1" @@ -6026,6 +6079,12 @@ dependencies = [ "parking_lot 0.12.3", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -7073,6 +7132,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -7274,6 +7345,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -7348,6 +7438,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccb97dac3243214f8d8507998906ca3e2e0b900bf9bf4870477f125b82e68f6e" +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -7455,6 +7554,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.2" @@ -7556,6 +7661,35 @@ dependencies = [ "try-lock", ] +[[package]] +name = "warp" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http 0.2.12", + "hyper 0.14.30", + "log", + "mime", + "mime_guess", + "multer", + "percent-encoding", + "pin-project", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-tungstenite", + "tokio-util", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 4a14c4f..57a6be2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,4 +33,5 @@ rand = "0.8.5" once_cell = "1.19.0" hex = "0.4.3" tracing-subscriber = "0.3.18" -uuid = { version = "1.10.0", features = ["v4", "fast-rng", "macro-diagnostics"] } \ No newline at end of file +uuid = { version = "1.10.0", features = ["v4", "fast-rng", "macro-diagnostics"] } +warp = "0.3.2" \ No newline at end of file diff --git a/bin/archiver/Cargo.toml b/bin/archiver/Cargo.toml index d833f93..2125e1e 100644 --- a/bin/archiver/Cargo.toml +++ b/bin/archiver/Cargo.toml @@ -23,6 +23,7 @@ again.workspace = true hex.workspace = true tracing-subscriber.workspace = true uuid.workspace = true +warp.workspace = true blob-archiver-storage = { path = "../../crates/storage" } blob-archiver-beacon = { path = "../../crates/beacon" } diff --git a/bin/archiver/src/api.rs b/bin/archiver/src/api.rs new file mode 100644 index 0000000..ebb5d63 --- /dev/null +++ b/bin/archiver/src/api.rs @@ -0,0 +1,189 @@ +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use crate::archiver::Archiver; +use serde::Serialize; +use warp::reject::Reject; +use warp::{Filter, Rejection, Reply}; + +pub struct Api { + archiver: Arc, +} + +#[derive(Serialize)] +struct RearchiveResponse { + success: bool, + message: String, + block_start: u64, + block_end: u64, +} + +impl Debug for RearchiveResponse { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RearchiveResponse") + .field("success", &self.success) + .field("message", &self.message) + .field("block_start", &self.block_start) + .field("block_end", &self.block_end) + .finish() + } +} + +impl Reject for RearchiveResponse {} + +impl Api { + pub fn new(archiver: Archiver) -> Self { + Self { + archiver: Arc::new(archiver), + } + } + + pub fn routes(&self) -> impl Filter + Clone { + let archiver = self.archiver.clone(); + warp::path!("rearchive") + .and(warp::get()) + .and(warp::query::()) + .and(warp::any().map(move || archiver.clone())) + .and_then(Self::rearchive_range) + .or(warp::path!("healthz") + .and(warp::get()) + .and_then(Self::healthz)) + } + + async fn healthz() -> Result { + Ok(warp::reply::json(&serde_json::json!({ + "status": "ok" + }))) + } + + async fn rearchive_range( + query: RearchiveQuery, + archiver: Arc, + ) -> Result { + if query.from.is_none() || query.to.is_none() { + return Err(warp::reject::custom(RearchiveResponse { + success: false, + message: "Invalid query parameters".to_string(), + block_start: 0, + block_end: 0, + })); + } + + if query.from > query.to { + return Err(warp::reject::custom(RearchiveResponse { + success: false, + message: "Invalid query parameters".to_string(), + block_start: 0, + block_end: 0, + })); + } + + let res = archiver + .rearchive_range(query.from.unwrap(), query.to.unwrap()) + .await; + if res.error.is_some() { + return Err(warp::reject::custom(RearchiveResponse { + success: false, + message: res.error.unwrap(), + block_start: res.from, + block_end: res.to, + })); + } + Ok(warp::reply::json(&res)) + } +} + +#[derive(serde::Deserialize)] +struct RearchiveQuery { + from: Option, + to: Option, +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + use std::sync::Arc; + use std::time::Duration; + + use eth2::types::MainnetEthSpec; + use tokio::sync::watch::Receiver; + use tokio::sync::Mutex; + use tracing_subscriber::fmt; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + + use blob_archiver_beacon::beacon_client::BeaconClientStub; + use blob_archiver_beacon::blob_test_helper; + use blob_archiver_storage::fs::{FSStorage, TestFSStorage}; + use blob_archiver_storage::Storage; + + use crate::api::Api; + use crate::archiver::{Archiver, Config}; + use crate::INIT; + + fn setup_tracing() { + INIT.call_once(|| { + tracing_subscriber::registry().with(fmt::layer()).init(); + }); + } + + pub async fn create_test_archiver( + storage: Arc>, + shutdown_rx: Receiver, + ) -> (Archiver, Arc>>) { + setup_tracing(); + let beacon_client = Arc::new(Mutex::new(BeaconClientStub::default())); + let config = Config { + poll_interval: Duration::from_secs(5), + listen_addr: "".to_string(), + origin_block: *blob_test_helper::ORIGIN_BLOCK, + }; + let archiver = Archiver::new(beacon_client.clone(), storage, config, shutdown_rx); + (archiver, beacon_client) + } + + #[tokio::test] + async fn test_healthz() { + let (_, rx) = tokio::sync::watch::channel(false); + let dir = &PathBuf::from("test_healthz"); + let storage = FSStorage::new(dir.clone()).await.unwrap(); + tokio::fs::create_dir_all(dir).await.unwrap(); + let test_storage = Arc::new(Mutex::new(TestFSStorage::new(storage).await.unwrap())); + let (archiver, _) = create_test_archiver(test_storage.clone(), rx).await; + let res = warp::test::request() + .method("GET") + .path("/healthz") + .reply(&Api::new(archiver).routes()) + .await; + + assert_eq!(res.status(), 200); + assert_eq!( + std::str::from_utf8(res.body()).unwrap(), + "{\"status\":\"ok\"}" + ); + clean_dir(dir); + } + + #[tokio::test] + async fn test_rearchive_range() { + let (_, rx) = tokio::sync::watch::channel(false); + let dir = &PathBuf::from("test_rearchive_range"); + let storage = FSStorage::new(dir.clone()).await.unwrap(); + tokio::fs::create_dir_all(dir).await.unwrap(); + let test_storage = Arc::new(Mutex::new(TestFSStorage::new(storage).await.unwrap())); + let (archiver, _) = create_test_archiver(test_storage.clone(), rx).await; + let res = warp::test::request() + .method("GET") + .path("/rearchive?from=2001&to=2000") + .reply(&Api::new(archiver).routes()) + .await; + assert_eq!(res.status(), 500); + clean_dir(dir); + } + + fn clean_dir(dir: &PathBuf) { + if dir.exists() { + std::fs::remove_dir_all(dir).unwrap(); + } + } +} diff --git a/bin/archiver/src/archiver.rs b/bin/archiver/src/archiver.rs index 9953f6f..12a2e8a 100644 --- a/bin/archiver/src/archiver.rs +++ b/bin/archiver/src/archiver.rs @@ -22,7 +22,7 @@ use uuid::Uuid; #[allow(dead_code)] const LIVE_FETCH_BLOB_MAXIMUM_RETRIES: usize = 10; #[allow(dead_code)] -const STARTUP_FETCH_BLOB_MAXIMUM_RETRIES: usize = 3; +pub const STARTUP_FETCH_BLOB_MAXIMUM_RETRIES: usize = 3; #[allow(dead_code)] const REARCHIVE_MAXIMUM_RETRIES: usize = 3; #[allow(dead_code)] @@ -32,7 +32,7 @@ const LOCK_UPDATE_INTERVAL: Duration = Duration::from_secs(10); #[allow(dead_code)] const LOCK_TIMEOUT: Duration = Duration::from_secs(20); #[allow(dead_code)] -const OBTAIN_LOCK_RETRY_INTERVAL_SECS: u64 = 2; +const OBTAIN_LOCK_RETRY_INTERVAL_SECS: u64 = 10; #[allow(dead_code)] static OBTAIN_LOCK_RETRY_INTERVAL: AtomicU64 = AtomicU64::new(OBTAIN_LOCK_RETRY_INTERVAL_SECS); @@ -129,7 +129,7 @@ impl Archiver { } #[allow(dead_code)] - async fn wait_obtain_storage_lock(&self) { + pub(crate) async fn wait_obtain_storage_lock(&self) { let mut lock_file_res = self.storage.lock().await.read_lock_file().await; let mut now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -225,7 +225,7 @@ impl Archiver { } #[allow(dead_code)] - async fn backfill_blobs(&self, latest: &BlockHeaderData) { + pub(crate) async fn backfill_blobs(&self, latest: BlockHeaderData) { let backfill_processes_res = self.storage.lock().await.read_backfill_processes().await; match backfill_processes_res { @@ -234,7 +234,7 @@ impl Archiver { start_block: latest.clone(), current_block: latest.clone(), }; - backfill_processes.insert(latest.root, backfill_process); + backfill_processes.insert(latest.clone().root, backfill_process); let _ = self .storage .lock() @@ -394,7 +394,7 @@ impl Archiver { } #[allow(dead_code)] - async fn track_latest_block(&self) { + pub(crate) async fn track_latest_block(&self) { let mut ticket = interval(self.config.poll_interval); let mut shutdown_rx = self.shutdown_rx.clone(); loop { @@ -411,10 +411,7 @@ impl Archiver { } #[allow(dead_code)] - async fn start(&self) {} - - #[allow(dead_code)] - async fn rearchive_range(&self, from: u64, to: u64) -> RearchiveResp { + pub async fn rearchive_range(&self, from: u64, to: u64) -> RearchiveResp { for i in from..=to { info!("rearchiving block: {}", i); let retry_policy = RetryPolicy::exponential(Duration::from_millis(250)) @@ -468,6 +465,7 @@ mod tests { use tracing_subscriber::fmt; use super::*; + use crate::INIT; use blob_archiver_beacon::beacon_client::{BeaconClientEth2, BeaconClientStub}; use blob_archiver_beacon::blob_test_helper; use blob_archiver_beacon::blob_test_helper::{new_blob_sidecars, START_SLOT}; @@ -478,14 +476,13 @@ mod tests { use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; - static INIT: std::sync::Once = std::sync::Once::new(); fn setup_tracing() { INIT.call_once(|| { tracing_subscriber::registry().with(fmt::layer()).init(); }); } - async fn create_test_archiver( + pub async fn create_test_archiver( storage: Arc>, shutdown_rx: Receiver, ) -> (Archiver, Arc>>) { @@ -717,7 +714,7 @@ mod tests { .unwrap() .clone(); - archiver.backfill_blobs(&head).await; + archiver.backfill_blobs(head).await; for blob in expected_blobs.iter() { assert!(archiver.storage.lock().await.exists(blob).await); @@ -825,7 +822,7 @@ mod tests { .unwrap() .clone(); - archiver.backfill_blobs(&head).await; + archiver.backfill_blobs(head).await; for blob in expected_blobs.iter() { assert!(archiver.storage.lock().await.exists(blob).await); @@ -1177,7 +1174,7 @@ mod tests { .unwrap() .clone(); - archiver.backfill_blobs(&head).await; + archiver.backfill_blobs(head).await; for expected_blob in expected_blobs.iter() { assert!(archiver.storage.lock().await.exists(expected_blob).await); diff --git a/bin/archiver/src/main.rs b/bin/archiver/src/main.rs index ef34233..af3b8d0 100644 --- a/bin/archiver/src/main.rs +++ b/bin/archiver/src/main.rs @@ -1,26 +1,34 @@ -use std::path::PathBuf; -use std::str::FromStr; -use std::sync::Arc; -use std::time::Duration; - -use crate::archiver::{Archiver, Config}; +use crate::archiver::{Archiver, Config, STARTUP_FETCH_BLOB_MAXIMUM_RETRIES}; +use again::RetryPolicy; use blob_archiver_beacon::beacon_client::BeaconClientEth2; use blob_archiver_beacon::blob_test_helper; use blob_archiver_storage::fs::FSStorage; use eth2::types::BlockId; use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts}; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; use tokio::sync::Mutex; +use tracing::log::error; +use tracing_subscriber::fmt; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +mod api; mod archiver; +static INIT: std::sync::Once = std::sync::Once::new(); + #[tokio::main] async fn main() { + setup_tracing(); let beacon_client = BeaconNodeHttpClient::new( SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(), Timeouts::set_all(Duration::from_secs(30)), ); let storage = FSStorage::new(PathBuf::from("test_dir")).await.unwrap(); - let (_, shutdown_rx) = tokio::sync::watch::channel(false); + let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); let beacon_client_eth2 = BeaconClientEth2 { beacon_client }; let config = Config { poll_interval: Duration::from_secs(5), @@ -34,10 +42,35 @@ async fn main() { shutdown_rx, ); - let block_id = BlockId::Head; + let retry_policy = RetryPolicy::exponential(Duration::from_millis(250)) + .with_jitter(true) + .with_max_delay(Duration::from_secs(10)) + .with_max_retries(STARTUP_FETCH_BLOB_MAXIMUM_RETRIES); + let res = retry_policy + .retry(|| archiver.persist_blobs_for_block(BlockId::Head, false)) + .await; - archiver - .persist_blobs_for_block(block_id, false) - .await - .expect("TODO: panic message"); + match res { + Err(e) => { + error!("failed to seed archiver with initial block: {:#?}", e); + std::process::exit(1); + } + Ok(Some((curr, _))) => { + archiver.wait_obtain_storage_lock().await; + archiver.track_latest_block().await; + tokio::spawn(async move { + archiver.backfill_blobs(curr).await; + }); + } + Ok(None) => { + error!("Error fetching blobs for block"); + std::process::exit(1); + } + }; } + +fn setup_tracing() { + INIT.call_once(|| { + tracing_subscriber::registry().with(fmt::layer()).init(); + }); +} \ No newline at end of file diff --git a/crates/beacon/src/beacon_client.rs b/crates/beacon/src/beacon_client.rs index 4625e47..89d6a40 100644 --- a/crates/beacon/src/beacon_client.rs +++ b/crates/beacon/src/beacon_client.rs @@ -13,7 +13,7 @@ use crate::blob_test_helper::{ }; #[async_trait] -pub trait BeaconClient { +pub trait BeaconClient: Send + Sync { async fn get_beacon_headers_block_id( &self, block_id: BlockId,