From 30410716650bf45f59d7910bf6cb529bcafab810 Mon Sep 17 00:00:00 2001 From: Chen Kai <281165273grape@gmail.com> Date: Tue, 6 Aug 2024 15:51:25 +0800 Subject: [PATCH] feat:rearchive range Signed-off-by: Chen Kai <281165273grape@gmail.com> --- bin/archiver/src/archiver.rs | 74 +++++++++++++++++++++++++++++++++--- 1 file changed, 68 insertions(+), 6 deletions(-) diff --git a/bin/archiver/src/archiver.rs b/bin/archiver/src/archiver.rs index 37818a7..e11281a 100644 --- a/bin/archiver/src/archiver.rs +++ b/bin/archiver/src/archiver.rs @@ -3,11 +3,14 @@ use blob_archiver_storage::{ BackfillProcess, BackfillProcesses, BlobData, BlobSidecars, Header, LockFile, Storage, }; use eth2::types::{BlockHeaderData, BlockId, Hash256}; -use eyre::Result; +use eyre::{eyre, Result}; use serde::{Deserialize, Serialize}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; +use eth2::{Error}; +use eth2::Error::ServerMessage; +use eth2::types::Slot; use tokio::sync::watch::Receiver; use tokio::time::{interval, sleep}; use tracing::log::{debug, error, info, trace}; @@ -16,9 +19,9 @@ use blob_archiver_beacon::beacon_client::BeaconClient; #[allow(dead_code)] const LIVE_FETCH_BLOB_MAXIMUM_RETRIES: usize = 10; #[allow(dead_code)] -const STARTUP_FETCH_BLOB_MAXIMUM_RETRIES: i32 = 3; +const STARTUP_FETCH_BLOB_MAXIMUM_RETRIES: usize = 3; #[allow(dead_code)] -const REARCHIVE_MAXIMUM_RETRIES: i32 = 3; +const REARCHIVE_MAXIMUM_RETRIES: usize = 3; #[allow(dead_code)] const BACKFILL_ERROR_RETRY_INTERVAL: Duration = Duration::from_secs(5); #[allow(dead_code)] @@ -30,6 +33,13 @@ const OBTAIN_LOCK_RETRY_INTERVAL_SECS: u64 = 10; #[allow(dead_code)] static OBTAIN_LOCK_RETRY_INTERVAL: AtomicU64 = AtomicU64::new(OBTAIN_LOCK_RETRY_INTERVAL_SECS); +#[derive(Debug, Serialize, Deserialize)] +pub struct RearchiveResp { + pub from: u64, + pub to: u64, + pub error: Option, +} + #[derive(Debug, PartialEq, Eq, Clone, Default, Serialize, Deserialize)] pub struct Config { pub poll_interval: Duration, @@ -43,9 +53,9 @@ pub struct Archiver { pub beacon_client: Arc, storage: Arc, - #[allow(dead_code)] + id: String, - #[allow(dead_code)] + pub config: Config, shutdown_rx: Receiver, @@ -322,7 +332,7 @@ impl Archiver { let mut current_block_id = BlockId::Head; loop { - let retry_policy = RetryPolicy::exponential(Duration::from_secs(1)) + let retry_policy = RetryPolicy::exponential(Duration::from_millis(250)) .with_jitter(true) .with_max_delay(Duration::from_secs(10)) .with_max_retries(LIVE_FETCH_BLOB_MAXIMUM_RETRIES); @@ -376,6 +386,58 @@ impl Archiver { #[allow(dead_code)] async fn start(&self) {} + + #[allow(dead_code)] + async fn rearchive_range(&self, from: u64, to: u64) -> RearchiveResp { + for i in from..=to { + info!("rearchiving block: {}",i); + let retry_policy = RetryPolicy::exponential(Duration::from_millis(250)) + .with_jitter(true) + .with_max_delay(Duration::from_secs(10)) + .with_max_retries(REARCHIVE_MAXIMUM_RETRIES); + let r = retry_policy + .retry(|| { + self.rearchive(i) + }) + .await; + + match r { + Err(e) => { + error!("Error fetching blobs for block: {:#?}", e); + return RearchiveResp { + from, + to, + error: Some(e.downcast::().unwrap().to_string()), + }; + } + Ok(false) => { + info!("block not found, skipping"); + } + Ok(true) => { + info!("block rearchived successfully") + } + } + } + RearchiveResp { from, to, error: None } + } + + async fn rearchive(&self, i: u64) -> Result { + let res = self.persist_blobs_for_block(BlockId::Slot(Slot::new(i)), true).await; + + match res { + Err(e) => { + if let Some(error) = e.downcast_ref::() { + match error { + ServerMessage(sm) if sm.code == 404 => Ok(false), + _ => Err(eyre::eyre!(e)), + } + } else { + Err(eyre!(e)) + } + } + Ok(_) => Ok(true) + } + } } #[cfg(test)]