Skip to content

Commit

Permalink
refactor(rooch-da): restructure backend architecture for OpenDA (#3140)
Browse files Browse the repository at this point in the history
## Summary

Refactored OpenDA backend by introducing an adapter-based architecture, consolidating repeated logic, and reorganizing code structure. Simplified configurations, removed deprecated files, and added backend prioritization for efficient batch submission.
  • Loading branch information
popcnt1 authored Jan 1, 2025
1 parent d2083a6 commit 15c0421
Show file tree
Hide file tree
Showing 14 changed files with 756 additions and 629 deletions.
243 changes: 169 additions & 74 deletions crates/rooch-config/src/da_config.rs

Large diffs are not rendered by default.

164 changes: 43 additions & 121 deletions crates/rooch-da/src/actor/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
// SPDX-License-Identifier: Apache-2.0

use crate::actor::messages::{AppendTransactionMessage, GetServerStatusMessage};
use crate::backend::openda::OpenDABackend;
use crate::backend::DABackend;
use crate::backend::{DABackend, DABackends};
use crate::batcher::BatchMaker;
use anyhow::anyhow;
use async_trait::async_trait;
use coerce::actor::context::ActorContext;
use coerce::actor::message::Handler;
use coerce::actor::Actor;
use moveos_types::h256::H256;
use rooch_config::da_config::{DABackendConfigType, DAConfig};
use rooch_config::da_config::{DAConfig, DEFAULT_DA_BACKGROUND_SUBMIT_INTERVAL};
use rooch_store::da_store::DAMetaStore;
use rooch_store::transaction_store::TransactionStore;
use rooch_store::RoochStore;
Expand All @@ -25,18 +24,9 @@ use std::time;
use std::time::{Duration, SystemTime};
use tokio::sync::broadcast;

// default background submit interval: 5 seconds
// smaller interval helps to reduce the delay of blocks making and submitting, get more accurate block number by status query
// the major duty of background submitter is to submit unsubmitted blocks made before server start,
// in most cases, backends work well enough to submit new blocks in time, which means after submitting old blocks,
// background submitter will have nothing to do.
// Only few database operations are needed to catch up with the latest block numbers,
// so it's okay to have a small interval.
const DEFAULT_BACKGROUND_SUBMIT_INTERVAL: u64 = 5;

pub struct DAServerActor {
rooch_store: RoochStore,
backend_names: Vec<String>,
backend_identifiers: Vec<String>,
last_block_number: Option<u128>,
last_block_update_time: u64,
background_last_block_update_time: Arc<AtomicU64>,
Expand All @@ -45,83 +35,6 @@ pub struct DAServerActor {

impl Actor for DAServerActor {}

struct ServerBackends {
backends: Vec<Arc<dyn DABackend>>,
backend_names: Vec<String>,
submit_threshold: usize,
is_nop_backend: bool,
background_submit_interval: u64,
}

impl ServerBackends {
const DEFAULT_SUBMIT_THRESHOLD: usize = 1;
const DEFAULT_IS_NOP_BACKEND: bool = false;
const DEFAULT_BACKGROUND_INTERVAL: u64 = DEFAULT_BACKGROUND_SUBMIT_INTERVAL;

async fn process_backend_configs(
backend_configs: &[DABackendConfigType],
genesis_namespace: String,
backends: &mut Vec<Arc<dyn DABackend>>,
backend_names: &mut Vec<String>,
) -> anyhow::Result<usize> {
let mut available_backends = 0;
for backend_type in backend_configs {
#[allow(irrefutable_let_patterns)]
if let DABackendConfigType::OpenDa(openda_config) = backend_type {
let backend = OpenDABackend::new(openda_config, genesis_namespace.clone()).await?;
backends.push(Arc::new(backend));
backend_names.push(format!("openda-{}", openda_config.scheme));
available_backends += 1;
}
}
Ok(available_backends)
}

async fn build(da_config: DAConfig, genesis_namespace: String) -> anyhow::Result<Self> {
let mut backends: Vec<Arc<dyn DABackend>> = Vec::new();
let mut backend_names: Vec<String> = Vec::new();
let mut submit_threshold = Self::DEFAULT_SUBMIT_THRESHOLD;
let mut is_nop_backend = Self::DEFAULT_IS_NOP_BACKEND;
let background_submit_interval = da_config
.da_backend
.as_ref()
.and_then(|backend_config| backend_config.background_submit_interval)
.unwrap_or(Self::DEFAULT_BACKGROUND_INTERVAL);

let mut available_backends_count = 1; // Nop is always available
if let Some(mut backend_config) = da_config.da_backend {
submit_threshold = backend_config.calculate_submit_threshold();
available_backends_count = Self::process_backend_configs(
&backend_config.backends,
genesis_namespace,
&mut backends,
&mut backend_names,
)
.await?;
} else {
is_nop_backend = true;
backends.push(Arc::new(crate::backend::DABackendNopProxy {}));
backend_names.push("nop".to_string());
}

if available_backends_count < submit_threshold {
return Err(anyhow!(
"failed to start da: not enough backends for future submissions. exp>= {} act: {}",
submit_threshold,
available_backends_count
));
}

Ok(Self {
backends,
backend_names,
submit_threshold,
is_nop_backend,
background_submit_interval,
})
}
}

impl DAServerActor {
pub async fn new(
da_config: DAConfig,
Expand All @@ -131,27 +44,33 @@ impl DAServerActor {
shutdown_rx: broadcast::Receiver<()>,
) -> anyhow::Result<Self> {
let min_block_to_submit = da_config.da_min_block_to_submit;
let ServerBackends {
let background_submit_interval = da_config
.background_submit_interval
.unwrap_or(DEFAULT_DA_BACKGROUND_SUBMIT_INTERVAL);

let DABackends {
backends,
backend_names,
submit_threshold,
is_nop_backend,
background_submit_interval,
} = ServerBackends::build(da_config, genesis_namespace).await?;
} = DABackends::initialize(da_config.da_backend, genesis_namespace).await?;

let backend_identifiers: Vec<String> = backends
.iter()
.map(|backend| backend.get_identifier())
.collect();

let last_block_number = rooch_store.get_last_block_number()?;
let background_last_block_update_time = Arc::new(AtomicU64::new(0));
let server = DAServerActor {
rooch_store: rooch_store.clone(),
backend_names,
backend_identifiers,
last_block_number,
last_block_update_time: 0,
background_last_block_update_time: background_last_block_update_time.clone(),
batch_maker: BatchMaker::new(),
};

if !is_nop_backend {
Self::create_background_submitter(
if submit_threshold != 0 {
Self::run_background_submitter(
rooch_store,
sequencer_key,
backends,
Expand Down Expand Up @@ -195,14 +114,20 @@ impl DAServerActor {
None
};

let avail_backends = if self.backend_identifiers.is_empty() {
vec!["nop".to_string()]
} else {
self.backend_identifiers.clone()
};

Ok(DAServerStatus {
last_block_number: self.last_block_number,
last_tx_order,
last_block_update_time,
last_avail_block_number,
last_avail_tx_order,
last_avail_block_update_time,
avail_backends: self.backend_names.clone(),
avail_backends,
})
}

Expand All @@ -227,7 +152,7 @@ impl DAServerActor {

// Spawns a background submitter to handle unsubmitted blocks off the main thread.
// This prevents blocking other actor handlers and maintains the actor's responsiveness.
fn create_background_submitter(
fn run_background_submitter(
rooch_store: RoochStore,
sequencer_key: RoochKeyPair,
backends: Vec<Arc<dyn DABackend>>,
Expand All @@ -243,7 +168,6 @@ impl DAServerActor {
submitter: Submitter {
sequencer_key: sequencer_key.copy(),
rooch_store: rooch_store.clone(),
nop_backend: false, // background submitter should not be nop-backend
backends: backends.clone(),
submit_threshold,
},
Expand Down Expand Up @@ -322,14 +246,11 @@ pub(crate) struct Submitter {
sequencer_key: RoochKeyPair,
rooch_store: RoochStore,

nop_backend: bool,
backends: Vec<Arc<dyn DABackend>>,
submit_threshold: usize,
}

impl Submitter {
// TODO check all backends are idempotent or not, if not, we need to add a check to avoid duplicated submission
// assume it's idempotent for now
async fn submit_batch_raw(
&self,
block_range: BlockRange,
Expand All @@ -354,21 +275,18 @@ impl Submitter {
// submit batch
self.submit_batch_to_backends(batch).await?;

// update block submitting state if it's not nop-backend
// if it's nop-backend, we don't need to update submitting state, we may need to submit batch to other backends later by fetch unsubmitted blocks
if !self.nop_backend {
match self.rooch_store.set_submitting_block_done(
block_number,
tx_order_start,
tx_order_end,
batch_hash,
) {
Ok(_) => {}
Err(e) => {
tracing::warn!("{:?}, fail to set submitting block done.", e);
}
};
match self.rooch_store.set_submitting_block_done(
block_number,
tx_order_start,
tx_order_end,
batch_hash,
) {
Ok(_) => {}
Err(e) => {
tracing::warn!("{:?}, fail to set submitting block done.", e);
}
};

Ok(SignedDABatchMeta {
meta: batch_meta,
signature: meta_signature,
Expand All @@ -378,16 +296,20 @@ impl Submitter {
async fn submit_batch_to_backends(&self, batch: DABatch) -> anyhow::Result<()> {
let backends = self.backends.clone();
let submit_threshold = self.submit_threshold;

let batch = Arc::new(batch);

// submit to backend in order until meet submit_threshold
let mut success_count = 0;
for backend in &backends {
let submit_fut = backend.submit_batch(batch.clone());
match submit_fut.await {
Ok(_) => {
success_count += 1;
if success_count >= submit_threshold {
break;
}
// TODO parallel submit
// if success_count >= submit_threshold {
// break;
// }
}
Err(e) => {
tracing::warn!("{:?}, fail to submit batch to backend.", e);
Expand Down
60 changes: 34 additions & 26 deletions crates/rooch-da/src/backend/README.md
Original file line number Diff line number Diff line change
@@ -1,29 +1,37 @@
# Backend

Implementations of DA backend.
## Overview

```
+-------------------------+
| DAServerActor |
| (Manages Backends) |
+-------------------------+
|
v
+-------------------------+
| DABackend | <- Trait for all backends
+-------------------------+
^
|
v
+---------------------------------------------------+
| OpenDABackendManager | <- Manages OpenDA-specific backends
| - Common OpenDA logic (batches, configs, etc.) |
| - Reduces redundancy among OpenDA backends |
+---------------------------------------------------+
|
v
+-------------------------------------+
| OpenDAAdapter | <- Trait for OpenDA-specific backend operations
| - submit_segment(), ... |
| - Backend-specific operations |
+-------------------------------------+
^
|
v
+-------------------+ +-------------------+
| CelestiaAdapter | | AvailAdapter | <- Actual backend-specific adapter implementations
+-------------------+ +-------------------+
```

## Open-DA

> - fs: local/remote file system
>- avail: Avail project DA
>- celestia: Celestia DA
## New Backend

For new added backend:

If it could satisfy open-da config, it should be added to `open-da` folder as a module. If not, it should be added to
`backend` folder directly.

## Backend Implementations & Verification

| Name | Description | Category | Implementation | Local | Testnet | Mainnet |
|----------|--------------------------------------------|----------|------------------------------|-------|---------|---------|
| fs | file I/O based on local/remote file system | open-da | [fs](open-da/fs) ||||
| avail | Avail project DA | open-da | [avail](open-da/avail) | 🔲 | 🔲 | 🔲 |
| celestia | Celestia DA | open-da | [celestia](open-da/celestia) | 🔲 | 🔲 | 🔲 |
| gcs | file I/O based on Google Cloud Storage | open-da | [gcs](open-da/fs) ||||

- [x] ✅ done
- [ ] 🔲 unfinished
- [ ] ❌ has issues
Loading

0 comments on commit 15c0421

Please sign in to comment.