Skip to content

Commit

Permalink
feat(rooch-da): enhance adapter with submit stat tracking (#3211)
Browse files Browse the repository at this point in the history
* chore(rooch-db): improve formatting for rollback logs

Update `best_rollback` to use `print!` for consistent multiline log formatting. This ensures cleaner output and better readability of rollback details.

* feat(rooch-da): enhance adapter with submit stat tracking

Add `AdapterSubmitStat` for tracking chunk/segment submission status.
Integrated tracking logic into various DA backends and their adapters.
Updated `DAServerActor` to report backend statuses with latest completed chunks.

* chore(rooch-da): handle latest done chunk ID retrieval

Added comments to clarify its association with block numbers in the current ChunkV0 version.

* feat(rooch-rpc-api): add `avail_backends` to DAInfoView

Extend `DAInfoView` with an `avail_backends` field to include backend availability details. Updated `From<DAServerStatus>` impl to map the new field as part of the conversion.

* feat(rooch-open-rpc-spec): add avail_backends to DAInfoView

Add a required `avail_backends` property to the `DAInfoView` schema. This property is an array of pairs (string and u128) and enforces a strict length of two items in each pair.

* chore(rooch-da): update doc comments in status struct

Clarified documentation for `avail_backends` to specify block number update behavior. Improved overall comment readability for better understanding.
  • Loading branch information
popcnt1 authored Jan 22, 2025
1 parent d5a765f commit e2079bb
Show file tree
Hide file tree
Showing 12 changed files with 293 additions and 64 deletions.
29 changes: 22 additions & 7 deletions crates/rooch-da/src/actor/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::actor::messages::{AppendTransactionMessage, GetServerStatusMessage};
use crate::backend::openda::AdapterSubmitStat;
use crate::backend::{DABackend, DABackends};
use crate::batcher::BatchMaker;
use anyhow::anyhow;
Expand All @@ -27,6 +28,7 @@ use tokio::sync::broadcast;
pub struct DAServerActor {
rooch_store: RoochStore,
backend_identifiers: Vec<String>,
adapter_stats: Vec<AdapterSubmitStat>,
last_block_number: Option<u128>,
last_block_update_time: u64,
background_last_block_update_time: Arc<AtomicU64>,
Expand Down Expand Up @@ -57,12 +59,17 @@ impl DAServerActor {
.iter()
.map(|backend| backend.get_identifier())
.collect();
let adapter_stats = backends
.iter()
.map(|backend| backend.get_adapter_stats())
.collect();

let last_block_number = rooch_store.get_last_block_number()?;
let background_last_block_update_time = Arc::new(AtomicU64::new(0));
let server = DAServerActor {
rooch_store: rooch_store.clone(),
backend_identifiers,
adapter_stats,
last_block_number,
last_block_update_time: 0,
background_last_block_update_time: background_last_block_update_time.clone(),
Expand All @@ -85,7 +92,7 @@ impl DAServerActor {
Ok(server)
}

pub fn get_status(&self) -> anyhow::Result<DAServerStatus> {
pub async fn get_status(&self) -> anyhow::Result<DAServerStatus> {
let last_tx_order = if let Some(last_block_number) = self.last_block_number {
let last_block_state = self.rooch_store.get_block_state(last_block_number)?;
Some(last_block_state.block_range.tx_order_end)
Expand Down Expand Up @@ -114,11 +121,19 @@ impl DAServerActor {
None
};

let avail_backends = if self.backend_identifiers.is_empty() {
vec!["nop".to_string()]
} else {
self.backend_identifiers.clone()
};
let mut avail_backends = Vec::new();
for (identifier, stat) in self
.backend_identifiers
.iter()
.zip(self.adapter_stats.iter())
{
let identifier = identifier.clone();
// Get the latest done chunk id
// (it's block number too for ChunkV0, which is the only version now)
let future = stat.get_latest_done_chunk_id();
let result = future.await; // Resolve the future
avail_backends.push((identifier, result));
}

Ok(DAServerStatus {
last_block_number: self.last_block_number,
Expand Down Expand Up @@ -224,7 +239,7 @@ impl Handler<GetServerStatusMessage> for DAServerActor {
_msg: GetServerStatusMessage,
_ctx: &mut ActorContext,
) -> anyhow::Result<DAServerStatus> {
self.get_status()
self.get_status().await
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/rooch-da/src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use crate::backend::openda::OpenDABackendManager;
use crate::backend::openda::{AdapterSubmitStat, OpenDABackendManager};
use anyhow::anyhow;
use async_trait::async_trait;
use rooch_config::da_config::{DABackendConfig, DABackendConfigType};
Expand All @@ -26,6 +26,7 @@ pub const UNKNOWN_BACKEND_PRIORITY: usize = usize::MAX;
pub trait DABackend: Sync + Send {
async fn submit_batch(&self, batch: Arc<DABatch>) -> anyhow::Result<()>;
fn get_identifier(&self) -> String;
fn get_adapter_stats(&self) -> AdapterSubmitStat;
}

pub struct DABackends {
Expand Down
123 changes: 108 additions & 15 deletions crates/rooch-da/src/backend/openda/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use crate::backend::openda::celestia::{
CelestiaAdapter, WrappedNamespace, DEFAULT_CELESTIA_MAX_RETRIES,
DEFAULT_CELESTIA_MAX_SEGMENT_SIZE,
};
use crate::backend::openda::opendal::BACK_OFF_MIN_DELAY;
use crate::backend::openda::opendal::OpenDalAdapter;
use anyhow::anyhow;
use async_trait::async_trait;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::Scheme;
use rooch_config::da_config::{DABackendOpenDAConfig, OpenDAScheme};
use rooch_config::retrieve_map_config_value;
use rooch_types::da::segment::SegmentID;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

const DEFAULT_MAX_SEGMENT_SIZE: u64 = 8 * 1024 * 1024;
pub(crate) const DEFAULT_MAX_RETRY_TIMES: usize = 3;
Expand All @@ -28,9 +28,64 @@ pub(crate) trait OpenDAAdapter: Sync + Send {
&self,
segment_id: SegmentID,
segment_bytes: &[u8],
is_last_segment: bool,
) -> anyhow::Result<()>;
}

#[derive(Clone)]
pub struct AdapterSubmitStat {
inner: Arc<RwLock<AdapterSubmitStatInner>>,
}

struct AdapterSubmitStatInner {
chunk_id: u128,
segment_number_sum: u64,
latest_done_chunk_id: u128,
}

impl Default for AdapterSubmitStat {
fn default() -> Self {
Self::new()
}
}

impl AdapterSubmitStat {
pub fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(AdapterSubmitStatInner {
chunk_id: 0,
segment_number_sum: 0,
latest_done_chunk_id: 0,
})),
}
}

pub async fn add_done_segment(&self, segment_id: SegmentID, is_last_segment: bool) {
let mut inner = self.inner.write().await;
if segment_id.chunk_id != inner.chunk_id {
// new chunk
inner.segment_number_sum = 0;
inner.chunk_id = segment_id.chunk_id;
}
inner.segment_number_sum += segment_id.segment_number;
if is_last_segment {
let mut exp_segment_number_sum = 0;
for i in 0..=segment_id.segment_number {
exp_segment_number_sum += i;
}
if exp_segment_number_sum == inner.segment_number_sum {
// only accept segments added in order
inner.latest_done_chunk_id = inner.chunk_id;
}
}
}

pub async fn get_latest_done_chunk_id(&self) -> u128 {
let inner = self.inner.read().await;
inner.latest_done_chunk_id
}
}

#[derive(Clone)]
pub(crate) struct OpenDAAdapterConfig {
pub(crate) namespace: String,
Expand Down Expand Up @@ -73,7 +128,10 @@ impl OpenDAAdapterConfig {
})
}

pub(crate) async fn build(&self) -> anyhow::Result<Box<dyn OpenDAAdapter>> {
pub(crate) async fn build(
&self,
stats: AdapterSubmitStat,
) -> anyhow::Result<Box<dyn OpenDAAdapter>> {
let max_retries = self.max_retries;
let scheme = self.scheme.clone();
let scheme_config = self.scheme_config.clone();
Expand All @@ -82,7 +140,7 @@ impl OpenDAAdapterConfig {
OpenDAScheme::Avail => {
let avail_fusion_config =
AvailFusionClientConfig::from_scheme_config(scheme_config, max_retries)?;
let avail_fusion_client = avail_fusion_config.build_client()?;
let avail_fusion_client = avail_fusion_config.build_client(stats)?;
Box::new(avail_fusion_client)
}
OpenDAScheme::Celestia => {
Expand All @@ -93,21 +151,15 @@ impl OpenDAAdapterConfig {
&scheme_config["endpoint"],
scheme_config.get("auth_token").map(|s| s.as_str()),
max_retries,
stats,
)
.await?,
)
}
_ => {
let mut op = opendal::Operator::via_iter(Scheme::from(scheme), scheme_config)?;
op = op
.layer(
RetryLayer::new()
.with_max_times(max_retries)
.with_min_delay(BACK_OFF_MIN_DELAY),
)
.layer(LoggingLayer::default());
op.check().await?;
Box::new(op)
let adapter =
OpenDalAdapter::new(scheme, scheme_config, max_retries, stats).await?;
Box::new(adapter)
}
};
Ok(operator)
Expand Down Expand Up @@ -279,4 +331,45 @@ mod tests {

assert_eq!(map_config.get("default_storage_class").unwrap(), "STANDARD");
}

#[tokio::test]
async fn test_adapter_submit_stats() {
let stats = AdapterSubmitStat::new();
assert_eq!(stats.get_latest_done_chunk_id().await, 0);

let segment_id1 = SegmentID {
chunk_id: 1,
segment_number: 0,
};
stats.add_done_segment(segment_id1, false).await;
assert_eq!(stats.get_latest_done_chunk_id().await, 0);

let segment_id2 = SegmentID {
chunk_id: 1,
segment_number: 1,
};
stats.add_done_segment(segment_id2, true).await;
assert_eq!(stats.get_latest_done_chunk_id().await, 1);

let segment_id3 = SegmentID {
chunk_id: 2,
segment_number: 0,
};
stats.add_done_segment(segment_id3, false).await;
assert_eq!(stats.get_latest_done_chunk_id().await, 1);

let segment_id4 = SegmentID {
chunk_id: 2,
segment_number: 1,
};
stats.add_done_segment(segment_id4, false).await;
assert_eq!(stats.get_latest_done_chunk_id().await, 1);

let segment_id5 = SegmentID {
chunk_id: 2,
segment_number: 2,
};
stats.add_done_segment(segment_id5, true).await;
assert_eq!(stats.get_latest_done_chunk_id().await, 2);
}
}
42 changes: 28 additions & 14 deletions crates/rooch-da/src/backend/openda/avail.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use crate::backend::openda::adapter::OpenDAAdapter;
use crate::backend::openda::adapter::{AdapterSubmitStat, OpenDAAdapter};
use anyhow::anyhow;
use async_trait::async_trait;
use base64::engine::general_purpose;
Expand All @@ -28,19 +28,14 @@ const DEFAULT_TURBO_PAYMENT_TOKEN: &str = "ethereum";

/// Avail client: A turbo and Light
/// Turbo client has higher priority, if not available, use the Light client
#[derive(Clone)]
pub struct AvailFusionAdapter {
stats: AdapterSubmitStat,
turbo_client: Option<AvailTurboClient>,
light_client: Option<AvailLightClient>,
}

#[async_trait]
impl OpenDAAdapter for AvailFusionAdapter {
async fn submit_segment(
&self,
segment_id: SegmentID,
segment_bytes: &[u8],
) -> anyhow::Result<()> {
impl AvailFusionAdapter {
async fn submit(&self, segment_id: SegmentID, segment_bytes: &[u8]) -> anyhow::Result<()> {
match &self.turbo_client {
Some(turbo_client) => {
match turbo_client.submit_segment(segment_id, segment_bytes).await {
Expand Down Expand Up @@ -69,6 +64,26 @@ impl OpenDAAdapter for AvailFusionAdapter {
}
}

#[async_trait]
impl OpenDAAdapter for AvailFusionAdapter {
async fn submit_segment(
&self,
segment_id: SegmentID,
segment_bytes: &[u8],
is_last_segment: bool,
) -> anyhow::Result<()> {
match self.submit(segment_id, segment_bytes).await {
Ok(_) => {
self.stats
.add_done_segment(segment_id, is_last_segment)
.await;
Ok(())
}
Err(error) => Err(error),
}
}
}

pub struct AvailFusionClientConfig {
pub turbo_endpoint: Option<String>,
pub turbo_auth_token: Option<String>,
Expand Down Expand Up @@ -103,7 +118,7 @@ impl AvailFusionClientConfig {
})
}

pub fn build_client(&self) -> anyhow::Result<AvailFusionAdapter> {
pub fn build_client(&self, stats: AdapterSubmitStat) -> anyhow::Result<AvailFusionAdapter> {
let turbo_client = if let Some(endpoint) = &self.turbo_endpoint {
Some(AvailTurboClient::new(
endpoint,
Expand All @@ -123,6 +138,7 @@ impl AvailFusionClientConfig {
};

Ok(AvailFusionAdapter {
stats,
turbo_client,
light_client,
})
Expand Down Expand Up @@ -183,8 +199,7 @@ pub struct AvailTurboClientSubmitResponse {
submission_id: String,
}

#[async_trait]
impl OpenDAAdapter for AvailTurboClient {
impl AvailTurboClient {
async fn submit_segment(
&self,
segment_id: SegmentID,
Expand Down Expand Up @@ -268,8 +283,7 @@ pub struct AvailLightClientSubmitResponse {
index: u32,
}

#[async_trait]
impl OpenDAAdapter for AvailLightClient {
impl AvailLightClient {
async fn submit_segment(
&self,
segment_id: SegmentID,
Expand Down
Loading

0 comments on commit e2079bb

Please sign in to comment.