
Commit

feat: rearchive range
Signed-off-by: Chen Kai <[email protected]>
GrapeBaBa committed Aug 6, 2024
1 parent 82ea105 commit 3041071
Showing 1 changed file with 68 additions and 6 deletions.
74 changes: 68 additions & 6 deletions bin/archiver/src/archiver.rs
@@ -3,11 +3,14 @@ use blob_archiver_storage::{
BackfillProcess, BackfillProcesses, BlobData, BlobSidecars, Header, LockFile, Storage,
};
use eth2::types::{BlockHeaderData, BlockId, Hash256};
use eyre::Result;
use eyre::{eyre, Result};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use eth2::{Error};
use eth2::Error::ServerMessage;
use eth2::types::Slot;
use tokio::sync::watch::Receiver;
use tokio::time::{interval, sleep};
use tracing::log::{debug, error, info, trace};
@@ -16,9 +19,9 @@ use blob_archiver_beacon::beacon_client::BeaconClient;
#[allow(dead_code)]
const LIVE_FETCH_BLOB_MAXIMUM_RETRIES: usize = 10;
#[allow(dead_code)]
const STARTUP_FETCH_BLOB_MAXIMUM_RETRIES: i32 = 3;
const STARTUP_FETCH_BLOB_MAXIMUM_RETRIES: usize = 3;
#[allow(dead_code)]
const REARCHIVE_MAXIMUM_RETRIES: i32 = 3;
const REARCHIVE_MAXIMUM_RETRIES: usize = 3;
#[allow(dead_code)]
const BACKFILL_ERROR_RETRY_INTERVAL: Duration = Duration::from_secs(5);
#[allow(dead_code)]
@@ -30,6 +33,13 @@ const OBTAIN_LOCK_RETRY_INTERVAL_SECS: u64 = 10;
#[allow(dead_code)]
static OBTAIN_LOCK_RETRY_INTERVAL: AtomicU64 = AtomicU64::new(OBTAIN_LOCK_RETRY_INTERVAL_SECS);

#[derive(Debug, Serialize, Deserialize)]
pub struct RearchiveResp {
pub from: u64,
pub to: u64,
pub error: Option<String>,
}

#[derive(Debug, PartialEq, Eq, Clone, Default, Serialize, Deserialize)]
pub struct Config {
pub poll_interval: Duration,
@@ -43,9 +53,9 @@ pub struct Archiver {
pub beacon_client: Arc<dyn BeaconClient>,

storage: Arc<dyn Storage>,
#[allow(dead_code)]

id: String,
#[allow(dead_code)]

pub config: Config,

shutdown_rx: Receiver<bool>,
@@ -322,7 +332,7 @@ impl Archiver {
let mut current_block_id = BlockId::Head;

loop {
let retry_policy = RetryPolicy::exponential(Duration::from_secs(1))
let retry_policy = RetryPolicy::exponential(Duration::from_millis(250))
.with_jitter(true)
.with_max_delay(Duration::from_secs(10))
.with_max_retries(LIVE_FETCH_BLOB_MAXIMUM_RETRIES);
@@ -376,6 +386,58 @@ impl Archiver {

#[allow(dead_code)]
async fn start(&self) {}

#[allow(dead_code)]
async fn rearchive_range(&self, from: u64, to: u64) -> RearchiveResp {
for i in from..=to {
info!("rearchiving block: {}",i);
let retry_policy = RetryPolicy::exponential(Duration::from_millis(250))
.with_jitter(true)
.with_max_delay(Duration::from_secs(10))
.with_max_retries(REARCHIVE_MAXIMUM_RETRIES);
let r = retry_policy
.retry(|| {
self.rearchive(i)
})
.await;

match r {
Err(e) => {
error!("Error fetching blobs for block: {:#?}", e);
return RearchiveResp {
from,
to,
error: Some(e.downcast::<Error>().unwrap().to_string()),
};
}
Ok(false) => {
info!("block not found, skipping");
}
Ok(true) => {
info!("block rearchived successfully")
}
}
}
RearchiveResp { from, to, error: None }
}

async fn rearchive(&self, i: u64) -> Result<bool> {
let res = self.persist_blobs_for_block(BlockId::Slot(Slot::new(i)), true).await;

match res {
Err(e) => {
if let Some(error) = e.downcast_ref::<Error>() {
match error {
ServerMessage(sm) if sm.code == 404 => Ok(false),
_ => Err(eyre::eyre!(e)),
}
} else {
Err(eyre!(e))
}
}
Ok(_) => Ok(true)
}
}
}

#[cfg(test)]

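For readers of this change, the new rearchive_range/rearchive pair works as follows: for each slot in the inclusive range it retries the blob fetch with exponential backoff (250ms base, at most REARCHIVE_MAXIMUM_RETRIES attempts), treats a 404 ServerMessage from the beacon node as "no block at this slot" and skips it, and aborts the whole range on any other persistent error, surfacing it in RearchiveResp.error. Below is a minimal, self-contained sketch of that control flow; rearchive_slot is a hypothetical stand-in for persist_blobs_for_block, and the plain retry counter with std-only backoff replaces the RetryPolicy-with-jitter used in archiver.rs.

use std::{thread::sleep, time::Duration};

const REARCHIVE_MAXIMUM_RETRIES: usize = 3;

#[derive(Debug)]
struct RearchiveResp {
    from: u64,
    to: u64,
    error: Option<String>,
}

// Hypothetical stand-in for persist_blobs_for_block / rearchive:
// Ok(true)  = blobs persisted for this slot
// Ok(false) = beacon node reported 404 (no block at this slot)
// Err(_)    = any other failure, worth retrying
fn rearchive_slot(slot: u64) -> Result<bool, String> {
    if slot % 7 == 0 { Ok(false) } else { Ok(true) }
}

fn rearchive_range(from: u64, to: u64) -> RearchiveResp {
    for slot in from..=to {
        let mut attempt: usize = 0;
        let outcome = loop {
            match rearchive_slot(slot) {
                Ok(found) => break Ok(found),
                Err(_) if attempt < REARCHIVE_MAXIMUM_RETRIES => {
                    attempt += 1;
                    // Crude exponential backoff; the real code uses RetryPolicy with
                    // jitter and a 10s maximum delay.
                    sleep(Duration::from_millis(250u64 << attempt));
                }
                Err(e) => break Err(e),
            }
        };
        match outcome {
            Ok(true) => println!("slot {slot} rearchived"),
            Ok(false) => println!("slot {slot} has no block, skipping"),
            // Retries exhausted on a hard failure: abort the range and report it.
            Err(e) => return RearchiveResp { from, to, error: Some(e) },
        }
    }
    RearchiveResp { from, to, error: None }
}

fn main() {
    println!("{:?}", rearchive_range(100, 110));
}

Since RearchiveResp derives Serialize/Deserialize with default field names, a successful run over slots 100..=110 would presumably serialize via serde_json to {"from":100,"to":110,"error":null}.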
