diff --git a/crates/rooch-config/src/da_config.rs b/crates/rooch-config/src/da_config.rs
index 763ea8cd30..3c592b17d4 100644
--- a/crates/rooch-config/src/da_config.rs
+++ b/crates/rooch-config/src/da_config.rs
@@ -11,74 +11,103 @@ use std::path::{Path, PathBuf};
 use std::str::FromStr;
 use std::sync::Arc;
 
-const NAMESPACE_FROM_GENESIS_LENGTH: usize = 8;
+const DA_NAMESPACE_FROM_GENESIS_LENGTH: usize = 8;
 
 const DEFAULT_OPENDA_FS_DIR: &str = "openda-fs";
-
+// Default background submit interval: 5 seconds.
+// A smaller interval helps to reduce the delay between making blocks and submitting them.
+//
+// After the first background submit job completes, the cursor is updated to the last submitted block number.
+// Only a few database operations are needed to catch up with the latest block numbers after a restart,
+// so it's okay to have a small interval.
+pub const DEFAULT_DA_BACKGROUND_SUBMIT_INTERVAL: u64 = 5;
+
+/// This enum specifies the strategy for submitting DA data.
+///
+/// `All` means all backends must submit.
+/// `Quorum` means a majority (>= n/2+1) must submit.
+/// `Number(n)` means at least `n` backends must submit.
+///
+/// No matter what the strategy is, an independent process will sync all the data to all backends.
+/// Eventual consistency is guaranteed.
 #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
 #[serde(rename_all = "lowercase")]
-pub enum DAServerSubmitStrategy {
-    // = n
+pub enum DASubmitStrategy {
     All,
-    // >= n/2+1
     Quorum,
-    // >= number, at least 1
     Number(usize),
 }
 
-impl FromStr for DAServerSubmitStrategy {
+impl FromStr for DASubmitStrategy {
     type Err = String;
 
     fn from_str(s: &str) -> Result<Self, Self::Err> {
         match s.to_lowercase().as_str() {
-            "all" => Ok(DAServerSubmitStrategy::All),
-            "quorum" => Ok(DAServerSubmitStrategy::Quorum),
+            "all" => Ok(DASubmitStrategy::All),
+            "quorum" => Ok(DASubmitStrategy::Quorum),
             _ => {
                 if let Ok(n) = s.parse::<usize>() {
-                    Ok(DAServerSubmitStrategy::Number(n))
+                    Ok(DASubmitStrategy::Number(n))
                 } else {
-                    Err(format!("invalid da server submit strategy: {}", s))
+                    Err(format!("invalid da submit strategy: {}", s))
                 }
             }
         }
     }
 }
 
-impl Display for DAServerSubmitStrategy {
+impl Display for DASubmitStrategy {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            DAServerSubmitStrategy::All => write!(f, "all"),
-            DAServerSubmitStrategy::Quorum => write!(f, "quorum"),
-            DAServerSubmitStrategy::Number(n) => write!(f, "{}", n),
+            DASubmitStrategy::All => write!(f, "all"),
+            DASubmitStrategy::Quorum => write!(f, "quorum"),
+            DASubmitStrategy::Number(n) => write!(f, "{}", n),
         }
     }
 }
 
+/// Represents the available Open-DA schemes supported by the backend.
+///
+/// Each enum variant corresponds to a specific backend type and its respective configuration.
 #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
 #[serde(rename_all = "lowercase")]
 pub enum OpenDAScheme {
-    // local filesystem, main config:
-    // root: file path
+    /// Local file system backend.
+    ///
+    /// Main configuration:
+    /// - `root`: The root file path for storing data files.
     #[default]
     Fs,
-    // gcs(Google Could Service), main config:
-    // bucket
-    // credential/credential_path (using path instead)
+
+    /// Google Cloud Storage (GCS) backend.
+    ///
+    /// Main configuration:
+    /// - `bucket`: The storage bucket.
+    /// - `credential`: The authentication credential (or `credential_path`, using a file path).
    Gcs,
-    // s3, main config:
-    // bucket
-    // region
-    // endpoint
-    // access_key_id
-    // secret_access_key
+
+    /// Amazon S3-compatible backend.
+    ///
+    /// Main configuration:
+    /// - `bucket`: The storage bucket.
+    /// - `region`: The AWS region.
+    /// - `endpoint`: The S3 endpoint URL.
+    /// - `access_key_id`: The AWS access key ID.
+    /// - `secret_access_key`: The AWS secret access key.
     S3,
-    // Avail Fusion(TurboDA & Light Client),
-    // turbo_endpoint
-    // turbo_auth_token
-    // light_endpoint
+
+    /// Avail Fusion backend, supporting TurboDA and Light Client.
+    ///
+    /// Main configuration:
+    /// - `turbo_endpoint`: The TurboDA service endpoint.
+    /// - `turbo_auth_token`: The authentication token for TurboDA.
+    /// - `light_endpoint`: The Light Client service endpoint.
     Avail,
-    // Celestia, main config:
-    // endpoint
-    // Option
+
+    /// Celestia backend.
+    ///
+    /// Main configuration:
+    /// - `endpoint`: The Celestia service endpoint.
+    /// - `auth_token` (optional): The authentication token for accessing the Celestia backend.
     Celestia,
 }
 
@@ -122,16 +151,39 @@ impl From<OpenDAScheme> for opendal::Scheme {
     }
 }
 
+/// Configuration for Data Availability (DA).
+///
+/// This struct controls how the node interacts with DA backends and specifies the starting point
+/// for submitting blocks to DA.
 #[derive(Clone, Default, Debug, PartialEq, Deserialize, Serialize)]
 #[serde(deny_unknown_fields)]
 #[serde(rename_all = "kebab-case")]
 pub struct DAConfig {
+    /// Specifies the configuration for the DA backends.
+    ///
+    /// This contains details about the backends used to ensure data availability,
+    /// such as their types and additional configuration options. If not set, no DA
+    /// backends will be used.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub da_backend: Option<DABackendConfig>,
+
+    /// The first block to be submitted to the DA.
+    ///
+    /// If left unset, all blocks will be submitted starting from the genesis block.
+    /// This allows flexibility in choosing whether to submit old blocks or just newly
+    /// created ones.
     #[serde(skip_serializing_if = "Option::is_none")]
-    /// The first block to be submitted.
-    /// If not set, all blocks will be submitted.
     pub da_min_block_to_submit: Option<u128>,
 
+    /// Specifies the interval for background submission in seconds.
+    /// If not set, the default value is `DEFAULT_DA_BACKGROUND_SUBMIT_INTERVAL`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub background_submit_interval: Option<u64>,
+
+    /// Internal reference to the base configuration.
+    ///
+    /// This is used internally by the node to access basic configuration details
+    /// (e.g., data directories) and is initialized when the configuration is loaded.
     #[serde(skip)]
     base: Option<Arc<BaseConfig>>,
 }
@@ -162,6 +214,9 @@ impl DAConfig {
     pub(crate) fn init(&mut self, base: Arc<BaseConfig>) -> anyhow::Result<()> {
         self.base = Some(base);
 
+        self.background_submit_interval
+            .get_or_insert(DEFAULT_DA_BACKGROUND_SUBMIT_INTERVAL);
+
         let default_fs_root = self.get_openda_fs_dir();
 
         if let Some(da_backend_cfg) = &mut self.da_backend {
@@ -216,76 +271,117 @@ impl DAConfig {
     }
 }
 
+/// Configuration for DA (Data Availability) backends.
+///
+/// This struct defines how the node interacts with different DA backends,
+/// including their types and the strategy used for submitting data to them.
 #[derive(Clone, Default, Debug, PartialEq, Deserialize, Serialize)]
 #[serde(rename_all = "kebab-case")]
 #[serde(deny_unknown_fields)]
 pub struct DABackendConfig {
+    /// Configures the submission strategy for DA operations.
+    ///
+    /// This option defines how many backends are required to successfully process data submissions:
+    /// - `All`: All backends must successfully submit the data.
+    /// - `Quorum`: A majority (>= n/2 + 1) of backends must submit.
+    /// - `Number(n)`: At least `n` backends must submit.
+    ///
+    /// If not set, the default is `Number(1)`: at least one backend must submit.
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub submit_strategy: Option<DAServerSubmitStrategy>, // specifies the submission strategy of DA. 'all' with all backends, 'quorum' with quorum backends, 'n' with n backends, etc.
-    pub backends: Vec<DABackendConfigType>, // specifies the type of DA backends to be used. 'celestia' with corresponding Celestia backend configuration, 'foo' with corresponding foo backend configuration, etc.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub background_submit_interval: Option<u64>, // specifies the interval of background submit in seconds. If not set, the default value is 600s.
+    pub submit_strategy: Option<DASubmitStrategy>,
+
+    /// Specifies the types of DA backends to be used.
+    ///
+    /// Each backend entry corresponds to a specific configuration.
+    /// For example,
+    /// - `OpenDA`: Configured for access to storage solutions like S3, GCS, etc.
+    /// - Additional backend types can extend this field as the system grows.
+    pub backends: Vec<DABackendConfigType>,
 }
 
 impl DABackendConfig {
+    const DEFAULT_SUBMIT_STRATEGY: DASubmitStrategy = DASubmitStrategy::Number(1);
+
     pub fn calculate_submit_threshold(&mut self) -> usize {
         self.adjust_submit_strategy(); // Make sure submit_strategy is adjusted before calling this function.
 
         let backends_count = self.backends.len();
         match self.submit_strategy {
-            Some(DAServerSubmitStrategy::All) => backends_count,
-            Some(DAServerSubmitStrategy::Quorum) => backends_count / 2 + 1,
-            Some(DAServerSubmitStrategy::Number(number)) => number,
-            None => backends_count, // Default to 'All' if submit_strategy is None
+            Some(DASubmitStrategy::All) => backends_count,
+            Some(DASubmitStrategy::Quorum) => backends_count / 2 + 1,
+            Some(DASubmitStrategy::Number(number)) => number,
+            None => 1, // Default to 1
         }
     }
 
     fn adjust_submit_strategy(&mut self) {
-        // Set default strategy to All if it's None.
         let strategy = self
             .submit_strategy
-            .get_or_insert(DAServerSubmitStrategy::All);
+            .get_or_insert(Self::DEFAULT_SUBMIT_STRATEGY);
 
         let backends_count = self.backends.len();
 
         // If it's Number, adjust the value to be within [1, n].
-        if let DAServerSubmitStrategy::Number(ref mut num) = strategy {
+        if let DASubmitStrategy::Number(ref mut num) = strategy {
             *num = std::cmp::max(1, std::cmp::min(*num, backends_count));
         }
     }
 }
 
+/// Represents the type of DA (Data Availability) backend configuration.
+///
+/// Each variant corresponds to a specific backend type and its associated configuration.
 #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
 #[serde(rename_all = "kebab-case")]
 pub enum DABackendConfigType {
+    /// OpenDA backend configuration.
+    ///
+    /// This variant contains the configuration specific to OpenDA, enabling access
+    /// to various storage backends (e.g., Avail, Celestia, S3, GCS, etc.).
     OpenDa(DABackendOpenDAConfig),
 }
 
+/// Configuration for the Open DA backend.
+///
+/// Open DA provides the ability to interact with various backend implementations.
+/// Each backend is defined by its unique configuration options.
 #[derive(Clone, Default, Debug, PartialEq, Deserialize, Serialize)]
 #[serde(rename_all = "kebab-case")]
 #[serde(deny_unknown_fields)]
-/// Open DA provides ability to access various backends
 pub struct DABackendOpenDAConfig {
-    /// specifies the type of backend to be used. 'gcs' with corresponding GCS server configuration, 's3' with corresponding S3 server configuration, etc
+    /// Specifies the type of backend to be used.
+    /// The `scheme` informs the backend logic on how to handle the associated configuration.
     #[serde(default)]
     pub scheme: OpenDAScheme,
-    /// specifies the configuration of the backend. 'gcs' with corresponding GCS server configuration, 's3' with corresponding S3 server configuration, etc.
+
+    /// Specifies the detailed configuration for the selected backend.
     pub config: HashMap<String, String>,
+
+    /// Specifies the namespace for data storage, depending on the backend.
+    ///
+    /// - **Filesystem-like backends** (e.g., S3, GCS, local filesystem):
+    ///   - The path is structured as `<namespace>/<segment_id>` to store the segment.
+    ///   - If not set:
+    ///     - `<genesis_namespace>/<segment_id>` is used as the full path.
+    ///   - If the `root` field is set in the `config`, the full path becomes `<root>/<namespace>/<segment_id>`.
+    /// - **Celestia**:
+    ///   - The namespace must already exist and is specified directly in hexadecimal format.
     #[serde(skip_serializing_if = "Option::is_none")]
-    /// for fs backend:
-    /// <namespace>/<segment_id> is the path to store the segment.
-    /// If not set, the <genesis_namespace>/<segment_id> is the full path
-    /// If root is set in config, the <root>/<namespace>/<segment_id> is the full path
-    /// for celestia:
-    /// must be existed, it's Namespace in hex
     pub namespace: Option<String>,
+
+    /// Specifies the maximum segment size (in bytes).
+    ///
+    /// - If not set, the backend implementation will use its default value.
+    /// - This helps determine the maximum allowed size for data segments.
     #[serde(skip_serializing_if = "Option::is_none")]
-    /// max segment size.
-    /// Set at crates/rooch-da/src/backend/openda if None.
     pub max_segment_size: Option<u64>,
+
+    /// Specifies the maximum number of retry attempts for failed segment submissions.
+    ///
+    /// - If not set, the backend implementation will determine the default number of retries.
+    /// - This configuration can help fine-tune the reliability of segment submission in case of transient errors.
     #[serde(skip_serializing_if = "Option::is_none")]
-    /// maximum number of attempts to retransmit a failed segment submission.
-    pub max_retires: Option<usize>,
+    pub max_retries: Option<usize>,
 }
 
 /// Derives a namespace from the genesis hash for the DA backend.
@@ -295,7 +391,7 @@ pub fn derive_namespace_from_genesis(genesis_hash: H256) -> String {
     let encoded_hash = hex::encode(genesis_hash.0);
     encoded_hash
         .chars()
-        .take(NAMESPACE_FROM_GENESIS_LENGTH)
+        .take(DA_NAMESPACE_FROM_GENESIS_LENGTH)
         .collect()
 }
 
@@ -306,38 +402,37 @@ mod tests {
     #[test]
     fn calculate_submit_threshold() {
         let mut da_backend_config = DABackendConfig {
-            submit_strategy: Some(DAServerSubmitStrategy::All),
+            submit_strategy: Some(DASubmitStrategy::All),
             backends: vec![
                 DABackendConfigType::OpenDa(DABackendOpenDAConfig {
                     scheme: OpenDAScheme::Fs,
                     config: HashMap::new(),
                     namespace: None,
                     max_segment_size: None,
-                    max_retires: None,
+                    max_retries: None,
                 }),
                 DABackendConfigType::OpenDa(DABackendOpenDAConfig {
                     scheme: OpenDAScheme::Fs,
                     config: HashMap::new(),
                     namespace: None,
                     max_segment_size: None,
-                    max_retires: None,
+                    max_retries: None,
                 }),
             ],
-            background_submit_interval: None,
         };
         assert_eq!(da_backend_config.calculate_submit_threshold(), 2);
 
-        da_backend_config.submit_strategy = Some(DAServerSubmitStrategy::Quorum);
+        da_backend_config.submit_strategy = Some(DASubmitStrategy::Quorum);
         assert_eq!(da_backend_config.calculate_submit_threshold(), 2);
 
-        da_backend_config.submit_strategy = Some(DAServerSubmitStrategy::Number(1));
+        da_backend_config.submit_strategy = Some(DASubmitStrategy::Number(1));
         assert_eq!(da_backend_config.calculate_submit_threshold(), 1);
 
-        da_backend_config.submit_strategy = Some(DAServerSubmitStrategy::Number(3));
+        da_backend_config.submit_strategy = Some(DASubmitStrategy::Number(3));
         assert_eq!(da_backend_config.calculate_submit_threshold(), 2);
 
         da_backend_config.submit_strategy = None;
-        assert_eq!(da_backend_config.calculate_submit_threshold(), 2);
+        assert_eq!(da_backend_config.calculate_submit_threshold(), 1);
     }
 
     #[test]
@@ -357,7 +452,7 @@ mod tests {
                 .collect(),
             namespace: None,
             max_segment_size: None,
-            max_retires: None,
+            max_retries: None,
         };
         let exp_celestia_config = DABackendOpenDAConfig {
             scheme: OpenDAScheme::Celestia,
@@ -371,26 +466,26 @@ mod tests {
                 "000000000000000000000000000000000000000102030405060708090a".to_string(),
             ),
             max_segment_size: None,
-            max_retires: None,
+            max_retries: None,
         };
         let exp_fs_config = DABackendOpenDAConfig {
             scheme: OpenDAScheme::Fs,
             config: HashMap::new(),
             namespace: None,
            max_segment_size: None,
-            max_retires: None,
+            max_retries: None,
         };
         let exp_da_config = DAConfig {
             da_backend: Some(DABackendConfig {
-                submit_strategy: Some(DAServerSubmitStrategy::All),
+                submit_strategy: Some(DASubmitStrategy::All),
                 backends: vec![
                     DABackendConfigType::OpenDa(exp_gcs_config.clone()),
                     DABackendConfigType::OpenDa(exp_celestia_config.clone()),
                     DABackendConfigType::OpenDa(exp_fs_config.clone()),
                 ],
-                background_submit_interval: None,
             }),
             da_min_block_to_submit: Some(340282366920938463463374607431768211455),
+            background_submit_interval: None,
             base: None,
         };
         match DAConfig::from_str(da_config_str) {
@@ -411,9 +506,9 @@ mod tests {
             da_backend: Some(DABackendConfig {
                 submit_strategy: None,
                 backends: vec![DABackendConfigType::OpenDa(exp_fs_config.clone())],
-                background_submit_interval: None,
             }),
             da_min_block_to_submit: None,
+            background_submit_interval: None,
             base: None,
         };
         match DAConfig::from_str(da_config_str) {
diff --git a/crates/rooch-da/src/actor/server.rs b/crates/rooch-da/src/actor/server.rs
index eca3e0a6cc..d7bd045807 100644
--- a/crates/rooch-da/src/actor/server.rs
+++ b/crates/rooch-da/src/actor/server.rs
@@ -2,8 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use crate::actor::messages::{AppendTransactionMessage, GetServerStatusMessage};
-use crate::backend::openda::OpenDABackend;
-use crate::backend::DABackend;
+use crate::backend::{DABackend, DABackends};
 use crate::batcher::BatchMaker;
 use anyhow::anyhow;
 use async_trait::async_trait;
@@ -11,7 +10,7 @@ use coerce::actor::context::ActorContext;
 use coerce::actor::message::Handler;
 use coerce::actor::Actor;
 use moveos_types::h256::H256;
-use rooch_config::da_config::{DABackendConfigType, DAConfig};
+use rooch_config::da_config::{DAConfig, DEFAULT_DA_BACKGROUND_SUBMIT_INTERVAL};
 use rooch_store::da_store::DAMetaStore;
 use rooch_store::transaction_store::TransactionStore;
 use rooch_store::RoochStore;
@@ -25,18 +24,9 @@ use std::time;
 use std::time::{Duration, SystemTime};
 use tokio::sync::broadcast;
 
-// default background submit interval: 5 seconds
-// smaller interval helps to reduce the delay of blocks making and submitting, get more accurate block number by status query
-// the major duty of background submitter is to submit unsubmitted blocks made before server start,
-// in most cases, backends work well enough to submit new blocks in time, which means after submitting old blocks,
-// background submitter will have nothing to do.
-// Only few database operations are needed to catch up with the latest block numbers,
-// so it's okay to have a small interval.
-const DEFAULT_BACKGROUND_SUBMIT_INTERVAL: u64 = 5;
-
 pub struct DAServerActor {
     rooch_store: RoochStore,
-    backend_names: Vec<String>,
+    backend_identifiers: Vec<String>,
     last_block_number: Option<u128>,
     last_block_update_time: u64,
     background_last_block_update_time: Arc<AtomicU64>,
@@ -45,83 +35,6 @@ pub struct DAServerActor {
 
 impl Actor for DAServerActor {}
 
-struct ServerBackends {
-    backends: Vec<Arc<dyn DABackend>>,
-    backend_names: Vec<String>,
-    submit_threshold: usize,
-    is_nop_backend: bool,
-    background_submit_interval: u64,
-}
-
-impl ServerBackends {
-    const DEFAULT_SUBMIT_THRESHOLD: usize = 1;
-    const DEFAULT_IS_NOP_BACKEND: bool = false;
-    const DEFAULT_BACKGROUND_INTERVAL: u64 = DEFAULT_BACKGROUND_SUBMIT_INTERVAL;
-
-    async fn process_backend_configs(
-        backend_configs: &[DABackendConfigType],
-        genesis_namespace: String,
-        backends: &mut Vec<Arc<dyn DABackend>>,
-        backend_names: &mut Vec<String>,
-    ) -> anyhow::Result<usize> {
-        let mut available_backends = 0;
-        for backend_type in backend_configs {
-            #[allow(irrefutable_let_patterns)]
-            if let DABackendConfigType::OpenDa(openda_config) = backend_type {
-                let backend = OpenDABackend::new(openda_config, genesis_namespace.clone()).await?;
-                backends.push(Arc::new(backend));
-                backend_names.push(format!("openda-{}", openda_config.scheme));
-                available_backends += 1;
-            }
-        }
-        Ok(available_backends)
-    }
-
-    async fn build(da_config: DAConfig, genesis_namespace: String) -> anyhow::Result<Self> {
-        let mut backends: Vec<Arc<dyn DABackend>> = Vec::new();
-        let mut backend_names: Vec<String> = Vec::new();
-        let mut submit_threshold = Self::DEFAULT_SUBMIT_THRESHOLD;
-        let mut is_nop_backend = Self::DEFAULT_IS_NOP_BACKEND;
-        let background_submit_interval = da_config
-            .da_backend
-            .as_ref()
-            .and_then(|backend_config| backend_config.background_submit_interval)
-            .unwrap_or(Self::DEFAULT_BACKGROUND_INTERVAL);
-
-        let mut available_backends_count = 1; // Nop is always available
-        if let Some(mut backend_config) = da_config.da_backend {
-            submit_threshold = backend_config.calculate_submit_threshold();
-            available_backends_count = Self::process_backend_configs(
-                &backend_config.backends,
-                genesis_namespace,
-                &mut backends,
-                &mut backend_names,
-            )
-            .await?;
-        } else {
-            is_nop_backend = true;
-            backends.push(Arc::new(crate::backend::DABackendNopProxy {}));
-            backend_names.push("nop".to_string());
-        }
-
-        if available_backends_count < submit_threshold {
-            return Err(anyhow!(
-                "failed to start da: not enough backends for future submissions. exp>= {} act: {}",
-                submit_threshold,
-                available_backends_count
-            ));
-        }
-
-        Ok(Self {
-            backends,
-            backend_names,
-            submit_threshold,
-            is_nop_backend,
-            background_submit_interval,
-        })
-    }
-}
-
 impl DAServerActor {
     pub async fn new(
         da_config: DAConfig,
@@ -131,27 +44,33 @@ impl DAServerActor {
         shutdown_rx: broadcast::Receiver<()>,
     ) -> anyhow::Result<Self> {
         let min_block_to_submit = da_config.da_min_block_to_submit;
-        let ServerBackends {
+        let background_submit_interval = da_config
+            .background_submit_interval
+            .unwrap_or(DEFAULT_DA_BACKGROUND_SUBMIT_INTERVAL);
+
+        let DABackends {
             backends,
-            backend_names,
             submit_threshold,
-            is_nop_backend,
-            background_submit_interval,
-        } = ServerBackends::build(da_config, genesis_namespace).await?;
+        } = DABackends::initialize(da_config.da_backend, genesis_namespace).await?;
+
+        let backend_identifiers: Vec<String> = backends
+            .iter()
+            .map(|backend| backend.get_identifier())
+            .collect();
 
         let last_block_number = rooch_store.get_last_block_number()?;
         let background_last_block_update_time = Arc::new(AtomicU64::new(0));
         let server = DAServerActor {
             rooch_store: rooch_store.clone(),
-            backend_names,
+            backend_identifiers,
             last_block_number,
             last_block_update_time: 0,
             background_last_block_update_time: background_last_block_update_time.clone(),
             batch_maker: BatchMaker::new(),
         };
 
-        if !is_nop_backend {
-            Self::create_background_submitter(
+        if submit_threshold != 0 {
+            Self::run_background_submitter(
                 rooch_store,
                 sequencer_key,
                 backends,
@@ -195,6 +114,12 @@ impl DAServerActor {
             None
         };
 
+        let avail_backends = if self.backend_identifiers.is_empty() {
+            vec!["nop".to_string()]
+        } else {
+            self.backend_identifiers.clone()
+        };
+
         Ok(DAServerStatus {
             last_block_number: self.last_block_number,
             last_tx_order,
@@ -202,7 +127,7 @@ impl DAServerActor {
             last_avail_block_number,
             last_avail_tx_order,
             last_avail_block_update_time,
-            avail_backends: self.backend_names.clone(),
+            avail_backends,
         })
     }
 
@@ -227,7 +152,7 @@ impl DAServerActor {
 
     // Spawns a background submitter to handle unsubmitted blocks off the main thread.
     // This prevents blocking other actor handlers and maintains the actor's responsiveness.
-    fn create_background_submitter(
+    fn run_background_submitter(
         rooch_store: RoochStore,
         sequencer_key: RoochKeyPair,
         backends: Vec<Arc<dyn DABackend>>,
@@ -243,7 +168,6 @@ impl DAServerActor {
                 submitter: Submitter {
                     sequencer_key: sequencer_key.copy(),
                     rooch_store: rooch_store.clone(),
-                    nop_backend: false, // background submitter should not be nop-backend
                     backends: backends.clone(),
                     submit_threshold,
                 },
@@ -322,14 +246,11 @@ pub(crate) struct Submitter {
     sequencer_key: RoochKeyPair,
     rooch_store: RoochStore,
 
-    nop_backend: bool,
     backends: Vec<Arc<dyn DABackend>>,
     submit_threshold: usize,
 }
 
 impl Submitter {
-    // TODO check all backends are idempotent or not, if not, we need to add a check to avoid duplicated submission
-    // assume it's idempotent for now
     async fn submit_batch_raw(
         &self,
         block_range: BlockRange,
@@ -354,21 +275,18 @@ impl Submitter {
         // submit batch
         self.submit_batch_to_backends(batch).await?;
 
-        // update block submitting state if it's not nop-backend
-        // if it's nop-backend, we don't need to update submitting state, we may need to submit batch to other backends later by fetch unsubmitted blocks
-        if !self.nop_backend {
-            match self.rooch_store.set_submitting_block_done(
-                block_number,
-                tx_order_start,
-                tx_order_end,
-                batch_hash,
-            ) {
-                Ok(_) => {}
-                Err(e) => {
-                    tracing::warn!("{:?}, fail to set submitting block done.", e);
-                }
-            };
+        match self.rooch_store.set_submitting_block_done(
+            block_number,
+            tx_order_start,
+            tx_order_end,
+            batch_hash,
+        ) {
+            Ok(_) => {}
+            Err(e) => {
+                tracing::warn!("{:?}, fail to set submitting block done.", e);
+            }
         };
+
         Ok(SignedDABatchMeta {
             meta: batch_meta,
             signature: meta_signature,
@@ -378,6 +296,9 @@ impl Submitter {
     async fn submit_batch_to_backends(&self, batch: DABatch) -> anyhow::Result<()> {
         let backends = self.backends.clone();
         let submit_threshold = self.submit_threshold;
+
+        let batch = Arc::new(batch);
+
         // submit to backend in order until meet submit_threshold
         let mut success_count = 0;
         for backend in &backends {
@@ -385,9 +306,10 @@ impl Submitter {
             match submit_fut.await {
                 Ok(_) => {
                     success_count += 1;
-                    if success_count >= submit_threshold {
-                        break;
-                    }
+                    // TODO parallel submit
+                    // if success_count >= submit_threshold {
+                    //     break;
+                    // }
                 }
                 Err(e) => {
                     tracing::warn!("{:?}, fail to submit batch to backend.", e);
diff --git a/crates/rooch-da/src/backend/README.md b/crates/rooch-da/src/backend/README.md
index d88173c8c2..376cebb379 100644
--- a/crates/rooch-da/src/backend/README.md
+++ b/crates/rooch-da/src/backend/README.md
@@ -1,29 +1,37 @@
 # Backend
 
-Implementations of DA backend.
+## Overview
+
+```
++-------------------------+
+|      DAServerActor      |
+|   (Manages Backends)    |
++-------------------------+
+             |
+             v
++-------------------------+
+|        DABackend        |  <- Trait for all backends
++-------------------------+
+             ^
+             |
+             v
++---------------------------------------------------+
+|               OpenDABackendManager                |  <- Manages OpenDA-specific backends
+|  - Common OpenDA logic (batches, configs, etc.)   |
+|  - Reduces redundancy among OpenDA backends       |
++---------------------------------------------------+
+             |
+             v
++-------------------------------------+
+|            OpenDAAdapter            |  <- Trait for OpenDA-specific backend operations
+|  - submit_segment(), ...            |
+|  - Backend-specific operations      |
++-------------------------------------+
+             ^
+             |
+             v
++-------------------+  +-------------------+
+|  CelestiaAdapter  |  |   AvailAdapter    |  <- Actual backend-specific adapter implementations
++-------------------+  +-------------------+
+```
 
-## Open-DA
-
-> - fs: local/remote file system
->- avail: Avail project DA
->- celestia: Celestia DA
-
-## New Backend
-
-For new added backend:
-
-If it could satisfy open-da config, it should be added to `open-da` folder as a module. If not, it should be added to
-`backend` folder directly.
-
-## Backend Implementations & Verification
-
-| Name     | Description                                | Category | Implementation               | Local | Testnet | Mainnet |
-|----------|--------------------------------------------|----------|------------------------------|-------|---------|---------|
-| fs       | file I/O based on local/remote file system | open-da  | [fs](open-da/fs)             | ✅     | ✅       | ✅       |
-| avail    | Avail project DA                           | open-da  | [avail](open-da/avail)       | 🔲    | 🔲      | 🔲      |
-| celestia | Celestia DA                                | open-da  | [celestia](open-da/celestia) | 🔲    | 🔲      | 🔲      |
-| gcs      | file I/O based on Google Cloud Storage     | open-da  | [gcs](open-da/fs)            | ✅     | ✅       | ✅       |
-
-- [x] ✅ done
-- [ ] 🔲 unfinished
-- [ ] ❌ has issues
\ No newline at end of file
diff --git a/crates/rooch-da/src/backend/mod.rs b/crates/rooch-da/src/backend/mod.rs
index 71cfc4bad8..4fba395f88 100644
--- a/crates/rooch-da/src/backend/mod.rs
+++ b/crates/rooch-da/src/backend/mod.rs
@@ -1,22 +1,117 @@
 // Copyright (c) RoochNetwork
 // SPDX-License-Identifier: Apache-2.0
 
+use crate::backend::openda::OpenDABackendManager;
+use anyhow::anyhow;
 use async_trait::async_trait;
+use rooch_config::da_config::{DABackendConfig, DABackendConfigType};
 use rooch_types::da::batch::DABatch;
+use std::collections::HashMap;
+use std::sync::Arc;
 
 pub mod openda;
 
+// manually set backend priority
+// lower index means higher priority
+pub const BACKENDS_PRIORITY: [&str; 5] = [
+    "openda-fs",
+    "openda-gcs",
+    "openda-s3",
+    "openda-avail",
+    "openda-celestia",
+];
+pub const UNKNOWN_BACKEND_PRIORITY: usize = usize::MAX;
+
 #[async_trait]
 pub trait DABackend: Sync + Send {
-    async fn submit_batch(&self, batch: DABatch) -> anyhow::Result<()>;
+    async fn submit_batch(&self, batch: Arc<DABatch>) -> anyhow::Result<()>;
+    fn get_identifier(&self) -> String;
+}
+
+pub struct DABackends {
+    pub backends: Vec<Arc<dyn DABackend>>,
+    pub submit_threshold: usize,
 }
 
-// DABackendNopProxy is a no-op implementation of DABackendProxy
-pub struct DABackendNopProxy;
+impl DABackends {
+    /// Initializes the DA backends based on the given configuration and genesis namespace.
+    pub async fn initialize(
+        config: Option<DABackendConfig>,
+        genesis_namespace: String,
+    ) -> anyhow::Result<Self> {
+        let mut backends = Vec::new();
 
-#[async_trait]
-impl DABackend for DABackendNopProxy {
-    async fn submit_batch(&self, _batch: DABatch) -> anyhow::Result<()> {
-        Ok(())
+        let submit_threshold = if let Some(mut backend_config) = config {
+            let submit_threshold = backend_config.calculate_submit_threshold();
+
+            // Load backends from the provided configuration
+            let active_backends_count = Self::load_backends_from_configs(
+                &backend_config.backends,
+                genesis_namespace,
+                &mut backends,
+            )
+            .await?;
+
+            // Ensure enough backends are available for submission
+            if active_backends_count < submit_threshold {
+                return Err(anyhow!(
+                    "failed to start DA: not enough backends for future submissions. exp >= {} act: {}",
+                    submit_threshold,
+                    active_backends_count
+                ));
+            }
+
+            submit_threshold
+        } else {
+            0 // No configuration provided, default threshold is 0
+        };
+
+        let mut this = Self {
+            backends,
+            submit_threshold,
+        };
+        this.sort_backends();
+
+        Ok(this)
+    }
+
+    // sort backends by their priority
+    fn sort_backends(&mut self) {
+        let priority_map: HashMap<&str, usize> = BACKENDS_PRIORITY
+            .iter()
+            .enumerate()
+            .map(|(i, &id)| (id, i))
+            .collect();
+
+        self.backends.sort_by(|a, b| {
+            let a_priority = priority_map
+                .get(a.get_identifier().as_str())
+                .unwrap_or(&UNKNOWN_BACKEND_PRIORITY);
+            let b_priority = priority_map
+                .get(b.get_identifier().as_str())
+                .unwrap_or(&UNKNOWN_BACKEND_PRIORITY);
+            a_priority.cmp(b_priority)
+        });
+    }
+
+    async fn load_backends_from_configs(
+        backend_configs: &[DABackendConfigType],
+        genesis_namespace: String,
+        backends: &mut Vec<Arc<dyn DABackend>>,
+    ) -> anyhow::Result<usize> {
+        let mut available_backends = 0;
+        for backend_type in backend_configs {
+            #[allow(irrefutable_let_patterns)]
+            if let DABackendConfigType::OpenDa(open_da_config) = backend_type {
+                let mut open_da_config = open_da_config.clone();
+                if open_da_config.namespace.is_none() {
+                    open_da_config.namespace = Some(genesis_namespace.clone());
+                }
+                let backend = OpenDABackendManager::new(&open_da_config).await?;
+                backends.push(Arc::new(backend));
+                available_backends += 1;
+            }
+        }
+        Ok(available_backends)
     }
 }
diff --git a/crates/rooch-da/src/backend/openda/adapter.rs b/crates/rooch-da/src/backend/openda/adapter.rs
new file mode 100644
index 0000000000..2ac829d482
--- /dev/null
+++ b/crates/rooch-da/src/backend/openda/adapter.rs
@@ -0,0 +1,282 @@
+// Copyright (c) RoochNetwork
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::backend::openda::avail::{
+    AvailFusionClientConfig, DEFAULT_AVAIL_MAX_RETRIES, DEFAULT_AVAIL_MAX_SEGMENT_SIZE,
+};
+use crate::backend::openda::celestia::{
+    CelestiaAdapter, WrappedNamespace, DEFAULT_CELESTIA_MAX_RETRIES,
+    DEFAULT_CELESTIA_MAX_SEGMENT_SIZE,
+};
+use crate::backend::openda::opendal::BACK_OFF_MIN_DELAY;
+use anyhow::anyhow;
+use async_trait::async_trait;
+use opendal::layers::{LoggingLayer, RetryLayer};
+use opendal::Scheme;
+use rooch_config::da_config::{DABackendOpenDAConfig, OpenDAScheme};
+use rooch_config::retrieve_map_config_value;
+use rooch_types::da::segment::SegmentID;
+use std::collections::HashMap;
+
+const DEFAULT_MAX_SEGMENT_SIZE: u64 = 8 * 1024 * 1024;
+pub(crate) const DEFAULT_MAX_RETRY_TIMES: usize = 3;
+
+/// OpenDAAdapter connects to an OpenDA-compatible backend
+#[async_trait]
+pub(crate) trait OpenDAAdapter: Sync + Send {
+    async fn submit_segment(
+        &self,
+        segment_id: SegmentID,
+        segment_bytes: &[u8],
+    ) -> anyhow::Result<()>;
+}
+
+#[derive(Clone)]
+pub(crate) struct OpenDAAdapterConfig {
+    pub(crate) namespace: String,
+    pub(crate) max_segment_size: usize,
+    pub(crate) max_retries: usize,
+    pub(crate) scheme: OpenDAScheme,
+    pub(crate) scheme_config: HashMap<String, String>,
+}
+
+impl OpenDAAdapterConfig {
+    pub(crate) fn derive_from_open_da_config(
+        open_da_config: &DABackendOpenDAConfig,
+    ) -> anyhow::Result<Self> {
+        let scheme = open_da_config.scheme.clone();
+        let namespace = open_da_config.namespace.clone().ok_or(anyhow!(
+            "namespace must have been initialized before creating OpenDAAdapterConfig"
+        ))?;
+        let mut scheme_config = open_da_config.config.clone();
+        check_scheme_config(scheme.clone(), &mut scheme_config, namespace.clone())?;
+
+        let (default_max_segment_size, default_max_retries) = match scheme {
+            OpenDAScheme::Avail => (DEFAULT_AVAIL_MAX_SEGMENT_SIZE, DEFAULT_AVAIL_MAX_RETRIES),
+            OpenDAScheme::Celestia => (
+                DEFAULT_CELESTIA_MAX_SEGMENT_SIZE,
+                DEFAULT_CELESTIA_MAX_RETRIES,
+            ),
+            _ => (DEFAULT_MAX_SEGMENT_SIZE, DEFAULT_MAX_RETRY_TIMES),
+        };
+        let max_retries = open_da_config.max_retries.unwrap_or(default_max_retries);
+        let max_segment_size = open_da_config
+            .max_segment_size
+            .unwrap_or(default_max_segment_size) as usize;
+
+        Ok(OpenDAAdapterConfig {
+            namespace,
+            max_segment_size,
+            max_retries,
+            scheme,
+            scheme_config,
+        })
+    }
+
+    pub(crate) async fn build(&self) -> anyhow::Result<Box<dyn OpenDAAdapter>> {
+        let max_retries = self.max_retries;
+        let scheme = self.scheme.clone();
+        let scheme_config = self.scheme_config.clone();
+
+        let operator: Box<dyn OpenDAAdapter> = match scheme {
+            OpenDAScheme::Avail => {
+                let avail_fusion_config =
+                    AvailFusionClientConfig::from_scheme_config(scheme_config, max_retries)?;
+                let avail_fusion_client = avail_fusion_config.build_client()?;
+                Box::new(avail_fusion_client)
+            }
+            OpenDAScheme::Celestia => {
+                let namespace = WrappedNamespace::from_string(&self.namespace.clone())?;
+                Box::new(
+                    CelestiaAdapter::new(
+                        namespace.into_inner(),
+                        &scheme_config["endpoint"],
+                        scheme_config.get("auth_token").map(|s| s.as_str()),
+                        max_retries,
+                    )
+                    .await?,
+                )
+            }
+            _ => {
+                let mut op = opendal::Operator::via_iter(Scheme::from(scheme), scheme_config)?;
+                op = op
+                    .layer(
+                        RetryLayer::new()
+                            .with_max_times(max_retries)
+                            .with_min_delay(BACK_OFF_MIN_DELAY),
+                    )
+                    .layer(LoggingLayer::default());
+                op.check().await?;
+                Box::new(op)
+            }
+        };
+        Ok(operator)
+    }
+}
+
+fn check_scheme_config(
+    scheme: OpenDAScheme,
+    config: &mut HashMap<String, String>,
+    namespace: String,
+) -> anyhow::Result<()> {
+    match scheme {
+        OpenDAScheme::Fs => {
+            // `root` must exist
+            check_config_exist(OpenDAScheme::Fs, config, "root")?;
+        }
+        OpenDAScheme::Gcs => {
+            retrieve_map_config_value(config, "bucket", Some("OPENDA_GCS_BUCKET"), None);
+
+            retrieve_map_config_value(config, "credential", Some("OPENDA_GCS_CREDENTIAL"), None);
+            retrieve_map_config_value(
+                config,
+                "credential_path",
+                Some("OPENDA_GCS_CREDENTIAL_PATH"),
+                None,
+            );
+
+            retrieve_map_config_value(
+                config,
+                "default_storage_class",
+                Some("OPENDA_GCS_DEFAULT_STORAGE_CLASS"),
+                Some("STANDARD"),
+            );
+
+            check_config_exist(OpenDAScheme::Gcs, config, "bucket")?;
+            match (
+                check_config_exist(OpenDAScheme::Gcs, config, "credential"),
+                check_config_exist(OpenDAScheme::Gcs, config, "credential_path"),
+            ) {
+                (Ok(_), Ok(_)) => Ok(()),
+                // credential exists
+                (Ok(_), Err(_)) => Ok(()),
+                // credential_path exists
+                (Err(_), Ok(_)) => Ok(()),
+
+                (Err(_), Err(_)) => Err(anyhow!(
+                    "credential not found in config for scheme {:?}",
+                    OpenDAScheme::Gcs
+                )),
+            }?;
+        }
+
+        OpenDAScheme::Celestia => {
+            check_config_exist(OpenDAScheme::Celestia, config, "endpoint")?;
+        }
+        _ => {}
+    };
+
+    // Set "root" in config for filesystem-like backends (otherwise `root` would be ignored directly):
+    // - If `root` is not set, use `/<namespace>` as the root.
+    // - If the `root` field is set in the `config`, set it to `<root>/<namespace>`.
+    let namespace_without_first_slash = namespace.trim_start_matches('/');
+    if let Some(root) = config.get("root") {
+        let root = root.clone();
+        config.insert(
+            "root".to_string(),
+            format!("{}/{}", root, namespace_without_first_slash),
+        );
+    } else {
+        config.insert(
+            "root".to_string(),
+            format!("/{}", namespace_without_first_slash),
+        );
+    }
+    Ok(())
+}
+
+fn check_config_exist(
+    scheme: OpenDAScheme,
+    config: &HashMap<String, String>,
+    key: &str,
+) -> anyhow::Result<()> {
+    if config.contains_key(key) {
+        Ok(())
+    } else {
+        Err(anyhow!(
+            "key {} must exist in config for scheme {:?}",
+            key,
+            scheme
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::collections::HashMap;
+
+    const TEST_NAMESPACE: &str = "test_namespace";
+    const TEST_NAMESPACE_SLASH: &str = "/test_namespace";
+
+    #[test]
+    fn check_scheme_config_fs() {
+        let scheme = OpenDAScheme::Fs;
+        let mut map_config = HashMap::new();
+        let result =
+            check_scheme_config(scheme.clone(), &mut map_config, TEST_NAMESPACE.to_string());
+        assert!(
+            result.is_err(),
+            "FS scheme should return Err if 'root' is missing"
+        );
+
+        map_config.insert("root".to_string(), "/some/path".to_string());
+        let result = check_scheme_config(scheme, &mut map_config, TEST_NAMESPACE.to_string());
+        assert!(
+            result.is_ok(),
+            "FS scheme should return Ok if 'root' is provided"
+        );
+        assert_eq!(map_config.get("root").unwrap(), "/some/path/test_namespace");
+    }
+
+    #[test]
+    fn check_scheme_config_gcs() {
+        let scheme = OpenDAScheme::Gcs;
+        let mut map_config = HashMap::new();
+        map_config.insert("credential".to_string(), "test_credential".to_string());
+        let result =
+            check_scheme_config(scheme.clone(), &mut map_config, TEST_NAMESPACE.to_string());
+        assert!(
+            result.is_err(),
+            "GCS scheme should return Err if 'bucket' is missing"
+        );
+
+        map_config.insert("bucket".to_string(), "test_bucket".to_string());
+        let result =
+            check_scheme_config(scheme.clone(), &mut map_config, TEST_NAMESPACE.to_string());
+        assert!(
+            result.is_ok(),
+            "GCS scheme should return Ok if 'bucket' and 'credential' are provided"
+        );
+
+        assert_eq!(map_config.get("root").unwrap(), "/test_namespace");
+        map_config.insert("root".to_string(), "/some/path".to_string());
+        let result = check_scheme_config(
+            scheme.clone(),
+            &mut map_config,
+            TEST_NAMESPACE_SLASH.to_string(),
+        );
+        assert!(result.is_ok(), "{}", result.unwrap_err());
+        assert_eq!(map_config.get("root").unwrap(), "/some/path/test_namespace");
+
+        map_config.remove("credential");
+        map_config.insert(
+            "credential_path".to_string(),
+            "test_credential_path".to_string(),
+        );
+        let result2 =
+            check_scheme_config(scheme.clone(), &mut map_config, TEST_NAMESPACE.to_string());
+        assert!(
+            result2.is_ok(),
+            "GCS scheme should return Ok if 'bucket' and 'credential_path' are provided"
+        );
+
+        map_config.remove("credential_path");
+
+        let result3 = check_scheme_config(scheme, &mut map_config, TEST_NAMESPACE.to_string());
+        assert!(result3.is_err(), "GCS scheme should return Err if neither 'credential' nor 'credential_path' are provided");
+
+        assert_eq!(map_config.get("default_storage_class").unwrap(), "STANDARD");
+    }
+}
diff --git a/crates/rooch-da/src/backend/openda/avail.rs b/crates/rooch-da/src/backend/openda/avail.rs
index 73d43005fe..0d282bf14a 100644
--- a/crates/rooch-da/src/backend/openda/avail.rs
+++ b/crates/rooch-da/src/backend/openda/avail.rs
@@ -1,7 +1,7 @@
 // Copyright (c) RoochNetwork
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::backend::openda::operator::Operator;
+use crate::backend::openda::adapter::OpenDAAdapter;
 use anyhow::anyhow;
 use async_trait::async_trait;
 use base64::engine::general_purpose;
@@ -15,58 +15,52 @@ use tokio::time::{sleep, Duration};
 
 // small blob size for transaction to get included in a block quickly
 pub(crate) const DEFAULT_AVAIL_MAX_SEGMENT_SIZE: u64 = 256 * 1024;
+// another mechanism guarantees eventual consistency, so it's okay to retry only once
+pub(crate) const DEFAULT_AVAIL_MAX_RETRIES: usize = 1;
+const MAX_BACKOFF_DELAY: Duration = Duration::from_secs(30);
 const MIN_BACKOFF_DELAY: Duration = Duration::from_millis(3000);
 const SUBMIT_API_PATH: &str = "v2/submit";
 
-// TurboDA provides relay service,
-// so the delay is shorter.
-// default retry duration(seconds): 0.5, 1, 2, 4, 8, 10
-const MIN_BACKOFF_DELAY_TURBO: Duration = Duration::from_millis(500);
-const MAX_BACKOFF_DELAY_TURBO: Duration = Duration::from_secs(10);
+const TURBO_MIN_BACKOFF_DELAY: Duration = Duration::from_millis(500);
 const TURBO_SUBMIT_API_PATH: &str = "user/submit_raw_data";
 
 /// Avail client: A turbo and Light
 /// Turbo client has higher priority, if not available, use the Light client
 #[derive(Clone)]
-pub struct AvailFusionClient {
+pub struct AvailFusionAdapter {
     turbo_client: Option<AvailTurboClient>,
     light_client: Option<AvailLightClient>,
 }
 
 #[async_trait]
-impl Operator for AvailFusionClient {
+impl OpenDAAdapter for AvailFusionAdapter {
     async fn submit_segment(
         &self,
         segment_id: SegmentID,
-        segment_bytes: Vec<u8>,
-        prefix: Option<String>,
+        segment_bytes: &[u8],
     ) -> anyhow::Result<()> {
-        // Fallback to light_client if turbo_client is not available
-        if let Some(turbo_client) = &self.turbo_client {
-            let turbo_result = turbo_client
-                .submit_segment(segment_id, segment_bytes.clone(), prefix.clone())
-                .await;
-
-            if let Err(error) = turbo_result {
-                tracing::warn!(
-                    "Failed to submit segment to Avail Turbo: {}, trying light_client if available",
-                    error
-                );
-
-                if let Some(light_client) = &self.light_client {
-                    return light_client
-                        .submit_segment(segment_id, segment_bytes, prefix)
-                        .await;
-                } else {
-                    return Err(anyhow!("Light client is not available"));
+        match &self.turbo_client {
+            Some(turbo_client) => {
+                match turbo_client.submit_segment(segment_id, segment_bytes).await {
+                    Ok(result) => return Ok(result), // No fallback needed
+                    Err(error) => {
+                        tracing::warn!(
+                            "Failed to submit segment to Avail Turbo: {}, trying light_client if available",
+                            error
+                        );
+                    }
                 }
             }
+            None => {
+                // No turbo_client, fall through to light_client directly
+            }
+        }
 
-            turbo_result
-        } else if let Some(light_client) = &self.light_client {
+        // If it reaches here, try light_client if available
+        if let Some(light_client) = &self.light_client {
             light_client
-                .submit_segment(segment_id, segment_bytes, prefix)
+                .submit_segment(segment_id, segment_bytes)
                 .await
         } else {
             Err(anyhow!("Both turbo and light clients are not available"))
@@ -105,7 +99,7 @@ impl AvailFusionClientConfig {
         })
     }
 
-    pub fn build_client(&self) -> anyhow::Result<AvailFusionClient> {
+    pub fn build_client(&self) -> anyhow::Result<AvailFusionAdapter> {
         let turbo_client = if let Some(endpoint) = &self.turbo_endpoint {
             Some(AvailTurboClient::new(
                 endpoint,
@@ -121,7 +115,7 @@ impl AvailFusionClientConfig {
             None
         };
 
-        Ok(AvailFusionClient {
+        Ok(AvailFusionAdapter {
             turbo_client,
             light_client,
         })
@@ -180,17 +174,16 @@ pub struct AvailTurboClientSubmitResponse {
 }
 
 #[async_trait]
-impl Operator for AvailTurboClient {
+impl OpenDAAdapter for AvailTurboClient {
     async fn submit_segment(
         &self,
         segment_id: SegmentID,
-        segment_bytes: Vec<u8>,
-        _prefix: Option<String>,
+        segment_bytes: &[u8],
     ) -> anyhow::Result<()> {
         let submit_url = format!("{}/{}", self.endpoint, TURBO_SUBMIT_API_PATH);
         let max_attempts = self.max_retries + 1; // max_attempts = max_retries + first attempt
         let mut attempts = 0;
-        let mut retry_delay = MIN_BACKOFF_DELAY_TURBO;
+        let mut retry_delay = TURBO_MIN_BACKOFF_DELAY;
 
         // token for turbo submit,
         // will support more tokens in the future
@@ -204,7 +197,7 @@ impl Operator for AvailTurboClient {
             .query(&[("token", TOKEN.to_string())])
             .bearer_auth(&self.auth_token)
             .header("Content-Type", "application/json")
-            .body(segment_bytes.clone());
+            .body(segment_bytes.to_vec());
 
         let response = request.send().await?;
 
@@ -222,7 +215,7 @@ impl Operator for AvailTurboClient {
                     retry_delay.as_millis(),
                 );
                 sleep(retry_delay).await;
-                retry_delay = std::cmp::min(retry_delay * 2, MAX_BACKOFF_DELAY_TURBO);
+                retry_delay = std::cmp::min(retry_delay * 2, MAX_BACKOFF_DELAY);
                 continue;
             }
 
@@ -270,15 +263,14 @@ pub struct AvailLightClientSubmitResponse {
 }
 
 #[async_trait]
-impl Operator for AvailLightClient {
+impl OpenDAAdapter for AvailLightClient {
     async fn submit_segment(
         &self,
         segment_id: SegmentID,
-        segment_bytes: Vec<u8>,
-        _prefix: Option<String>,
+        segment_bytes: &[u8],
     ) -> anyhow::Result<()> {
         let submit_url = format!("{}/{}", self.endpoint, SUBMIT_API_PATH);
-        let data = general_purpose::STANDARD.encode(&segment_bytes);
+        let data = general_purpose::STANDARD.encode(segment_bytes);
         let max_attempts = self.max_retries + 1; // max_attempts = max_retries + first attempt
         let mut attempts = 0;
         let mut retry_delay = MIN_BACKOFF_DELAY;
diff --git a/crates/rooch-da/src/backend/openda/backend.rs b/crates/rooch-da/src/backend/openda/backend.rs
deleted file mode 100644
index 108f8c0082..0000000000
--- a/crates/rooch-da/src/backend/openda/backend.rs
+++ /dev/null
@@ -1,123 +0,0 @@
-// Copyright (c) RoochNetwork
-// SPDX-License-Identifier: Apache-2.0
-
-use crate::backend::openda::avail::AvailFusionClientConfig;
-use crate::backend::openda::celestia::{CelestiaClient, WrappedNamespace};
-use crate::backend::openda::opendal::BACK_OFF_MIN_DELAY;
-use crate::backend::openda::operator::{Operator, OperatorConfig};
-use crate::backend::DABackend;
-use async_trait::async_trait;
-use opendal::layers::{LoggingLayer, RetryLayer};
-use opendal::Scheme;
-use rooch_config::da_config::{DABackendOpenDAConfig, OpenDAScheme};
-use rooch_types::da::batch::DABatch;
-use rooch_types::da::chunk::{Chunk, ChunkV0};
-use std::collections::HashMap;
-
-#[async_trait]
-impl DABackend for OpenDABackend {
-    async fn submit_batch(&self, batch: DABatch) -> anyhow::Result<()> {
-        self.pub_batch(batch).await
-    }
-}
-
-pub struct OpenDABackend {
-    operator_config: OperatorConfig,
-    operator: Box<dyn Operator>,
-}
-
-impl OpenDABackend {
-    pub async fn new(
-        cfg: &DABackendOpenDAConfig,
-        genesis_namespace: String,
-    ) -> anyhow::Result<Self> {
-        let (operator_config, scheme_config) =
-            OperatorConfig::from_backend_config(cfg.clone(), genesis_namespace)?;
-        let operator = new_operator(operator_config.clone(), scheme_config).await?;
-
-        Ok(Self {
-            operator_config,
-            operator,
-        })
-    }
-
-    pub async fn pub_batch(&self, batch: DABatch) -> anyhow::Result<()> {
-        let chunk: ChunkV0 = batch.into();
-
-        let scheme = self.operator_config.scheme.clone();
-        let prefix = self.operator_config.namespace.clone();
-        let max_segment_size = self.operator_config.max_segment_size;
-
-        let segments = chunk.to_segments(max_segment_size);
-        for segment in segments {
-            let bytes = segment.to_bytes();
-
-            match self
-                .operator
-                .submit_segment(segment.get_id(), bytes, Some(prefix.clone()))
-                .await
-            {
-                Ok(_) => {
-                    tracing::info!(
-                        "submitted segment to open-da scheme: {:?}, segment_id: {:?}",
-                        scheme,
-                        segment.get_id(),
-                    );
-                }
-                Err(e) => {
-                    tracing::warn!(
-                        "failed to submit segment to open-da scheme: {:?}, segment_id: {:?}, error:{:?}",
-                        scheme,
-                        segment.get_id(),
-                        e,
-                    );
-                    return Err(e);
-                }
-            }
-        }
-
-        Ok(())
-    }
-}
-
-async fn new_operator(
-    operator_config: OperatorConfig,
-    scheme_config: HashMap<String, String>,
-) -> anyhow::Result<Box<dyn Operator>> {
-    let max_retries = operator_config.max_retries;
-    let scheme = operator_config.scheme.clone();
-
-    let operator: Box<dyn Operator> = match scheme {
-        OpenDAScheme::Avail => {
-            let avail_fusion_config =
-                AvailFusionClientConfig::from_scheme_config(scheme_config, max_retries)?;
-            let avail_fusion_client = avail_fusion_config.build_client()?;
-            Box::new(avail_fusion_client)
-        }
-        OpenDAScheme::Celestia => {
-            let namespace = WrappedNamespace::from_string(&operator_config.namespace.clone())?;
-            Box::new(
-                CelestiaClient::new(
-                    namespace.into_inner(),
-                    &scheme_config["endpoint"],
-                    scheme_config.get("auth_token").map(|s| s.as_str()),
-                    max_retries,
-                )
-                .await?,
-            )
-        }
-        _ => {
-            let mut op = opendal::Operator::via_iter(Scheme::from(scheme), scheme_config)?;
-            op = op
-                .layer(
-                    RetryLayer::new()
-                        .with_max_times(max_retries)
-                        .with_min_delay(BACK_OFF_MIN_DELAY),
-                )
-                .layer(LoggingLayer::default());
-            op.check().await?;
-            Box::new(op)
-        }
-    };
-    Ok(operator)
-}
diff --git a/crates/rooch-da/src/backend/openda/celestia.rs b/crates/rooch-da/src/backend/openda/celestia.rs
index 1921fdf3ec..f502cb2d65 100644
--- a/crates/rooch-da/src/backend/openda/celestia.rs
+++ b/crates/rooch-da/src/backend/openda/celestia.rs
@@ -1,7 +1,7 @@
 // Copyright (c) RoochNetwork
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::backend::openda::operator::Operator;
+use crate::backend::openda::adapter::OpenDAAdapter;
 use anyhow::anyhow;
 use async_trait::async_trait;
 use celestia_rpc::{BlobClient, Client};
@@ -15,19 +15,18 @@ use tokio::time::sleep;
 
 // small blob size for transaction to get included in a block quickly
 pub(crate) const DEFAULT_CELESTIA_MAX_SEGMENT_SIZE: u64 = 256 * 1024;
-// default retry duration(seconds): 3, 9, 27, 81
-// 81s > 60s(5 blocks) for:
-// By default, nodes will drop a transaction if it does not get included in 5 blocks (roughly 1 minute).
-// At this point, the user must resubmit their transaction if they want it to eventually be included.
+// another mechanism guarantees eventual consistency, so it's okay to retry only once
+pub(crate) const DEFAULT_CELESTIA_MAX_RETRIES: usize = 1;
 const BACK_OFF_MIN_DELAY: Duration = Duration::from_millis(3000);
+const MAX_BACKOFF_DELAY: Duration = Duration::from_secs(30);
 
-pub(crate) struct CelestiaClient {
+pub(crate) struct CelestiaAdapter {
     namespace: Namespace,
     client: Client,
     max_retries: usize,
 }
 
-impl CelestiaClient {
+impl CelestiaAdapter {
     pub async fn new(
         namespace: Namespace,
         endpoint: &str,
@@ -35,7 +34,7 @@ impl CelestiaAdapter {
         max_retries: usize,
     ) -> anyhow::Result<Self> {
         let celestia_client = Client::new(endpoint, auth_token).await?;
-        Ok(CelestiaClient {
+        Ok(CelestiaAdapter {
             namespace,
             client: celestia_client,
             max_retries,
@@ -44,14 +43,13 @@ impl CelestiaAdapter {
 }
 
 #[async_trait]
-impl Operator for CelestiaClient {
+impl OpenDAAdapter for CelestiaAdapter {
     async fn submit_segment(
         &self,
         segment_id: SegmentID,
-        segment_bytes: Vec<u8>,
-        _prefix: Option<String>,
+        segment_bytes: &[u8],
     ) -> anyhow::Result<()> {
-        let blob = Blob::new(self.namespace, segment_bytes)?;
+        let blob = Blob::new(self.namespace, segment_bytes.to_vec())?;
         let max_attempts = self.max_retries + 1; // max_attempts = max_retries + first attempt
         let mut attempts = 0;
         let mut retry_delay = BACK_OFF_MIN_DELAY;
@@ -82,7 +80,7 @@ impl OpenDAAdapter for CelestiaAdapter {
                     retry_delay.as_millis(),
                 );
                 sleep(retry_delay).await;
-                retry_delay *= 3;
+                retry_delay = std::cmp::min(retry_delay * 2, MAX_BACKOFF_DELAY);
             } else {
                 return Err(anyhow!(
                     "Failed to submit segment: {:?} to Celestia: {:?} after {} attempts",
diff --git a/crates/rooch-da/src/backend/openda/manager.rs b/crates/rooch-da/src/backend/openda/manager.rs
new file mode 100644
index 0000000000..904318b63e
--- /dev/null
+++ b/crates/rooch-da/src/backend/openda/manager.rs
@@ -0,0 +1,66 @@
+// Copyright (c) RoochNetwork
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::backend::openda::adapter::{OpenDAAdapter, OpenDAAdapterConfig};
+use crate::backend::openda::derive_identifier;
+use crate::backend::DABackend;
+use async_trait::async_trait;
+use rooch_config::da_config::DABackendOpenDAConfig;
+use rooch_types::da::batch::DABatch;
+use rooch_types::da::chunk::{Chunk, ChunkV0};
+use std::sync::Arc;
+
+/// Manages an OpenDA backend, integrating scheme-specific adapter logic
+pub struct OpenDABackendManager {
+    identifier: String,
+    adapter_config: OpenDAAdapterConfig,
+    adapter: Box<dyn OpenDAAdapter>,
+}
+
+impl OpenDABackendManager {
+    pub async fn new(
+        open_da_config: &DABackendOpenDAConfig,
+    ) -> anyhow::Result<Self> {
+        let adapter_config = OpenDAAdapterConfig::derive_from_open_da_config(open_da_config)?;
+        let adapter = adapter_config.build().await?;
+
+        Ok(Self {
+            identifier: derive_identifier(open_da_config.scheme.clone()),
+            adapter_config,
+            adapter,
+        })
+    }
+}
+
+#[async_trait]
+impl DABackend for OpenDABackendManager {
+    async fn submit_batch(&self, batch: Arc<DABatch>) -> anyhow::Result<()> {
+        let chunk: ChunkV0 = (*batch).clone().into();
+
+        let max_segment_size = self.adapter_config.max_segment_size;
+
+        let segments = chunk.to_segments(max_segment_size);
+        for segment in segments {
+            let bytes = segment.to_bytes();
+
+            match self.adapter.submit_segment(segment.get_id(), &bytes).await {
+                Ok(_) => {}
+                Err(e) => {
+                    tracing::warn!(
+                        "failed to submit segment to {:?}, segment_id: {:?}, error:{:?}",
+                        self.identifier,
+                        segment.get_id(),
+                        e,
+                    );
+                    return Err(e);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    fn get_identifier(&self) -> String {
+        self.identifier.clone()
+    }
+}
diff --git a/crates/rooch-da/src/backend/openda/mod.rs b/crates/rooch-da/src/backend/openda/mod.rs
index 4c91cf8afc..b6bfc7795d 100644
--- a/crates/rooch-da/src/backend/openda/mod.rs
+++ b/crates/rooch-da/src/backend/openda/mod.rs
@@ -1,10 +1,15 @@
 // Copyright (c) RoochNetwork
 // SPDX-License-Identifier: Apache-2.0
 
+mod adapter;
 mod avail;
-mod backend;
 mod celestia;
+mod manager;
 mod opendal;
-mod operator;
 
-pub use self::backend::OpenDABackend;
+pub use self::manager::OpenDABackendManager;
+use rooch_config::da_config::OpenDAScheme;
+
+pub fn derive_identifier(scheme: OpenDAScheme) -> String {
+    format!("openda-{}", scheme)
+}
diff --git a/crates/rooch-da/src/backend/openda/opendal.rs b/crates/rooch-da/src/backend/openda/opendal.rs
index 3ddab1fc7b..1273712524 100644
--- a/crates/rooch-da/src/backend/openda/opendal.rs
+++ b/crates/rooch-da/src/backend/openda/opendal.rs
@@ -1,7 +1,7 @@
 // Copyright (c) RoochNetwork
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::backend::openda::operator::Operator;
+use crate::backend::openda::adapter::OpenDAAdapter;
 use async_trait::async_trait;
 use rooch_types::da::segment::SegmentID;
 use std::time::Duration;
@@ -9,19 +9,15 @@ use std::time::Duration;
 pub(crate) const BACK_OFF_MIN_DELAY: Duration = Duration::from_millis(300);
 
 #[async_trait]
-impl Operator for opendal::Operator {
+impl OpenDAAdapter for opendal::Operator {
     async fn submit_segment(
         &self,
         segment_id: SegmentID,
-        segment_bytes: Vec<u8>,
-        prefix: Option<String>,
+        segment_bytes: &[u8],
     ) -> anyhow::Result<()> {
-        let path = match prefix {
-            Some(prefix) => format!("{}/{}", prefix, segment_id),
-            None => segment_id.to_string(),
-        };
+        let path = segment_id.to_string();
         let mut w = self.writer(&path).await?;
-        w.write(segment_bytes).await?;
+        w.write(segment_bytes.to_vec()).await?;
         w.close().await?;
         Ok(())
     }
diff --git a/crates/rooch-da/src/backend/openda/operator.rs b/crates/rooch-da/src/backend/openda/operator.rs
deleted file mode 100644
index 1792687205..0000000000
--- a/crates/rooch-da/src/backend/openda/operator.rs
+++ /dev/null
@@ -1,208 +0,0 @@
-// Copyright (c) RoochNetwork
-// SPDX-License-Identifier: Apache-2.0
-
-use crate::backend::openda::avail::DEFAULT_AVAIL_MAX_SEGMENT_SIZE;
-use crate::backend::openda::celestia::DEFAULT_CELESTIA_MAX_SEGMENT_SIZE;
-use anyhow::anyhow;
-use async_trait::async_trait;
-use rooch_config::da_config::{DABackendOpenDAConfig, OpenDAScheme};
-use rooch_config::retrieve_map_config_value;
-use rooch_types::da::segment::SegmentID;
-use std::collections::HashMap;
-
-const DEFAULT_MAX_SEGMENT_SIZE: u64 = 8 * 1024 * 1024;
-pub(crate) const DEFAULT_MAX_RETRY_TIMES: usize = 4;
-
-#[async_trait]
-pub(crate) trait Operator: Sync + Send {
-    async fn submit_segment(
-        &self,
-        segment_id: SegmentID,
-        segment_bytes: Vec<u8>,
-        prefix: Option<String>,
-    ) -> anyhow::Result<()>;
-}
-
-#[derive(Clone)]
-pub(crate) struct OperatorConfig {
-    pub(crate) namespace: String,
-    pub(crate) scheme: OpenDAScheme,
-    pub(crate) max_segment_size: usize,
-    pub(crate) max_retries: usize,
-}
-
-impl OperatorConfig {
-    pub(crate) fn from_backend_config(
-        cfg: DABackendOpenDAConfig,
-        genesis_namespace: String,
-    ) -> anyhow::Result<(Self, HashMap<String, String>)> {
-        let backend_config = cfg.clone();
-        let max_retries = backend_config
-            .max_retires
-            .unwrap_or(DEFAULT_MAX_RETRY_TIMES);
-        let scheme = backend_config.scheme;
-        if scheme == OpenDAScheme::Celestia && backend_config.namespace.is_none() {
-            return Err(anyhow!(
-                "namespace must be provided for scheme {:?}",
-                scheme
-            ));
-        }
-        let namespace = backend_config.namespace.unwrap_or(genesis_namespace);
-        let mut scheme_config = backend_config.config;
-        check_map_config(scheme.clone(), &mut scheme_config)?;
-
-        let default_max_segment_size = match scheme {
-            OpenDAScheme::Avail => DEFAULT_AVAIL_MAX_SEGMENT_SIZE,
-            OpenDAScheme::Celestia => DEFAULT_CELESTIA_MAX_SEGMENT_SIZE,
-            _ => DEFAULT_MAX_SEGMENT_SIZE,
-        };
-
-        let max_segment_size = cfg.max_segment_size.unwrap_or(default_max_segment_size) as usize;
-
-        Ok((
-            OperatorConfig {
-                namespace,
-                scheme,
-                max_segment_size,
-                max_retries,
-            },
-            scheme_config,
-        ))
-    }
-}
-
-fn check_map_config(
-    scheme: OpenDAScheme,
-    map_config: &mut HashMap<String, String>,
-) -> anyhow::Result<()> {
-    match scheme {
-        OpenDAScheme::Fs => {
-            // root must be existed
-            check_config_exist(OpenDAScheme::Fs, map_config, "root")
-        }
-        OpenDAScheme::Gcs => {
-            retrieve_map_config_value(map_config, "bucket", Some("OPENDA_GCS_BUCKET"), None);
-
-            retrieve_map_config_value(
-                map_config,
-                "credential",
-                Some("OPENDA_GCS_CREDENTIAL"),
-                None,
-            );
-            retrieve_map_config_value(
-                map_config,
-                "credential_path",
-                Some("OPENDA_GCS_CREDENTIAL_PATH"),
-                None,
-            );
-
-            retrieve_map_config_value(
-                map_config,
-                "default_storage_class",
-                Some("OPENDA_GCS_DEFAULT_STORAGE_CLASS"),
-                Some("STANDARD"),
-            );
-
-            check_config_exist(OpenDAScheme::Gcs, map_config, "bucket")?;
-            match (
-                check_config_exist(OpenDAScheme::Gcs, map_config, "credential"),
-                check_config_exist(OpenDAScheme::Gcs, map_config, "credential_path"),
-            ) {
-                (Ok(_), Ok(_)) => Ok(()),
-                // credential existed
-                (Ok(_), Err(_)) => Ok(()),
-                // credential_path existed
-                (Err(_), Ok(_)) => Ok(()),
-
-                (Err(_), Err(_)) => Err(anyhow!(
-                    "credential no found in config for scheme {:?}",
-                    OpenDAScheme::Gcs
-                )),
-            }
-        }
-        OpenDAScheme::Avail => check_config_exist(OpenDAScheme::Avail, map_config, "endpoint"),
-        OpenDAScheme::Celestia => {
-            check_config_exist(OpenDAScheme::Celestia, map_config, "endpoint")
-        }
-        OpenDAScheme::S3 => {
-            todo!("s3 backend is not implemented yet");
-        }
-    }
-}
-
-fn check_config_exist(
-    scheme: OpenDAScheme,
-    config: &HashMap<String, String>,
-    key: &str,
-) -> anyhow::Result<()> {
-    if config.contains_key(key) {
-        Ok(())
-    } else {
-        Err(anyhow!(
-            "key {} must be existed in config for scheme {:?}",
-            key,
-            scheme
-        ))
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use std::collections::HashMap;
-
-    #[test]
-    fn test_check_backend_config_fs() {
-        let scheme = OpenDAScheme::Fs;
-        let mut map_config = HashMap::new();
-        let result = check_map_config(scheme.clone(), &mut map_config);
-        assert!(
-            result.is_err(),
-            "FS scheme should return Err if 'root' is missing"
-        );
-
-        map_config.insert("root".to_string(), "/some/path".to_string());
-        let result = check_map_config(scheme, &mut map_config);
-        assert!(
-            result.is_ok(),
-            "FS scheme should return Ok if 'root' is provided"
-        );
-    }
-
-    #[test]
-    fn test_check_backend_config_gcs() {
-        let scheme = OpenDAScheme::Gcs;
-        let mut map_config = HashMap::new();
-        map_config.insert("credential".to_string(), "test_credential".to_string());
-        let result = check_map_config(scheme.clone(), &mut map_config);
-        assert!(
-            result.is_err(),
-            "GCS scheme should return Err if 'bucket' is missing"
-        );
-
-        map_config.insert("bucket".to_string(), "test_bucket".to_string());
-        let result = check_map_config(scheme.clone(), &mut map_config);
-        assert!(
-            result.is_ok(),
-            "GCS scheme should return Ok if 'bucket' and 'credential' are provided"
-        );
-
-        map_config.remove("credential");
-        map_config.insert(
"credential_path".to_string(), - "test_credential_path".to_string(), - ); - let result2 = check_map_config(scheme.clone(), &mut map_config); - assert!( - result2.is_ok(), - "GCS scheme should return Ok if 'bucket' and 'credential_path' are provided" - ); - - map_config.remove("credential_path"); - - let result3 = check_map_config(scheme, &mut map_config); - assert!(result3.is_err(), "GCS scheme should return Err if neither 'credential' nor 'credential_path' are provided"); - - assert_eq!(map_config.get("default_storage_class").unwrap(), "STANDARD"); - } -} diff --git a/crates/rooch-store/src/da_store/mod.rs b/crates/rooch-store/src/da_store/mod.rs index 6e021340ab..6fd775a152 100644 --- a/crates/rooch-store/src/da_store/mod.rs +++ b/crates/rooch-store/src/da_store/mod.rs @@ -540,7 +540,6 @@ impl DAMetaStore for DAMetaDBStore { Ok(blocks) } - // TODO combine set_submitting_block_done and set_background_submit_block_cursor: no more user-facing submit block fn set_submitting_block_done( &self, block_number: u128, diff --git a/crates/rooch-types/src/da/status.rs b/crates/rooch-types/src/da/status.rs index 61a1fad957..8e8b07d1d9 100644 --- a/crates/rooch-types/src/da/status.rs +++ b/crates/rooch-types/src/da/status.rs @@ -28,6 +28,6 @@ pub struct DAServerStatus { /// 1. DA backends collapse /// 2. RoochStore is not consistent (cannot get tx from DB by tx order) pub last_avail_block_update_time: Option, - /// The available backends names + /// The available backend_identifiers pub avail_backends: Vec, }