From 2f1a0d465a9765e5638045d89fd87ab14cf27547 Mon Sep 17 00:00:00 2001
From: popcnt1 <142196625+popcnt1@users.noreply.github.com>
Date: Fri, 24 Jan 2025 04:31:25 +0800
Subject: [PATCH] feat(rooch-da): add async support for DA indexing and syncing

Transitioned DA commands to asynchronous operation: chunk downloading,
ledger-transaction loading, and execution now run as async tasks. Introduced
auto-sync (background chunk downloading when `mode=sync`) and SIGTERM/SIGINT
shutdown-signal handling so long-running jobs can be interrupted cleanly.
A minimal standalone sketch of the shutdown/polling pattern follows the patch.
---
 crates/rooch/src/commands/da/commands/exec.rs | 96 +++++++++++++------
 .../rooch/src/commands/da/commands/index.rs   | 88 ++++++++---------
 crates/rooch/src/commands/da/commands/mod.rs  | 80 ++++++++++------
 crates/rooch/src/commands/da/mod.rs           |  2 +-
 4 files changed, 165 insertions(+), 101 deletions(-)

diff --git a/crates/rooch/src/commands/da/commands/exec.rs b/crates/rooch/src/commands/da/commands/exec.rs
index c8b532a7fa..5c4bb1a04a 100644
--- a/crates/rooch/src/commands/da/commands/exec.rs
+++ b/crates/rooch/src/commands/da/commands/exec.rs
@@ -38,15 +38,17 @@ use rooch_types::sequencer::SequencerInfo;
 use rooch_types::transaction::{
     L1BlockWithBody, LedgerTransaction, LedgerTxData, TransactionSequenceInfo,
 };
-use std::cmp::{max, min};
+use std::cmp::{max, min, PartialEq};
 use std::path::PathBuf;
 use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
 use std::time::Duration;
+use tokio::signal::unix::{signal, SignalKind};
 use tokio::sync::mpsc::Receiver;
 use tokio::sync::mpsc::Sender;
 use tokio::sync::watch;
 use tokio::time;
+use tokio::time::sleep;
 use tracing::{info, warn};
 
 /// exec LedgerTransaction List for verification.
@@ -109,7 +111,7 @@ pub struct ExecCommand {
     pub enable_rocks_stats: bool,
 
     #[clap(
-        long = "open-da-path",
+        long = "open-da",
        help = "open da path for downloading chunks from DA. Working with `mode=sync`"
     )]
     pub open_da_path: Option<String>,
@@ -133,6 +135,12 @@ pub enum ExecMode {
     Sync,
 }
 
+impl PartialEq for ExecMode {
+    fn eq(&self, other: &Self) -> bool {
+        self.as_bits() == other.as_bits()
+    }
+}
+
 impl ExecMode {
     pub fn as_bits(&self) -> u8 {
         match self {
@@ -168,12 +176,39 @@ impl ExecMode {
 
 impl ExecCommand {
     pub async fn execute(self) -> RoochResult<()> {
-        let mut exec_inner = self.build_exec_inner().await?;
-        exec_inner.run().await?;
+        let (shutdown_tx, shutdown_rx) = watch::channel(());
+
+        let shutdown_tx_clone = shutdown_tx.clone();
+
+        tokio::spawn(async move {
+            let mut sigterm =
+                signal(SignalKind::terminate()).expect("Failed to listen for SIGTERM");
+            let mut sigint = signal(SignalKind::interrupt()).expect("Failed to listen for SIGINT");
+
+            tokio::select! {
+                _ = sigterm.recv() => {
+                    info!("SIGTERM received, shutting down...");
+                    let _ = shutdown_tx_clone.send(());
+                }
+                _ = sigint.recv() => {
+                    info!("SIGINT received (Ctrl+C), shutting down...");
+                    let _ = shutdown_tx_clone.send(());
+                }
+            }
+        });
+
+        let mut exec_inner = self.build_exec_inner(shutdown_rx.clone()).await?;
+        exec_inner.run(shutdown_rx).await?;
+
+        let _ = shutdown_tx.send(());
+
         Ok(())
     }
 
-    async fn build_exec_inner(&self) -> anyhow::Result<ExecInner> {
+    async fn build_exec_inner(
+        &self,
+        shutdown_signal: watch::Receiver<()>,
+    ) -> anyhow::Result<ExecInner> {
         let actor_system = ActorSystem::global_system();
 
         let row_cache_size = self
@@ -206,7 +241,14 @@
         )
         .await?;
 
-        let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir.clone())?;
+        let ledger_tx_loader = match self.mode {
+            ExecMode::Sync => LedgerTxGetter::new_with_auto_sync(
+                self.open_da_path.clone().unwrap(),
+                self.segment_dir.clone(),
+                shutdown_signal,
+            )?,
+            _ => LedgerTxGetter::new(self.segment_dir.clone())?,
+        };
         let tx_meta_store = TxMetaStore::new(
             self.tx_position_path.clone(),
             self.exp_root_path.clone(),
@@ -312,9 +354,8 @@ impl ExecInner {
         }
     }
 
-    async fn run(&mut self) -> anyhow::Result<()> {
-        let (shutdown_tx, shutdown_rx) = watch::channel(());
-        self.start_logging_task(shutdown_rx);
+    async fn run(&mut self, shutdown_signal: watch::Receiver<()>) -> anyhow::Result<()> {
+        self.start_logging_task(shutdown_signal);
 
         // larger buffer size to avoid rx starving caused by consumer has to access disks and request btc block.
         // after consumer load data(ledger_tx) from disk/btc client, burst to executor, need large buffer to avoid blocking.
 
         let producer = self.produce_tx(tx);
         let consumer = self.consume_tx(rx);
-        let result = self.join_producer_and_consumer(producer, consumer).await;
-
-        // Send shutdown signal and ensure logging task exits
-        let _ = shutdown_tx.send(());
-        result
+        self.join_producer_and_consumer(producer, consumer).await
     }
 
     fn update_startup_info_after_rollback(
@@ -466,8 +503,13 @@ impl ExecInner {
         }
         let tx_list = self
             .ledger_tx_getter
-            .load_ledger_tx_list(next_block_number, false)?;
+            .load_ledger_tx_list(next_block_number, false)
+            .await?;
         if tx_list.is_none() {
+            if self.mode == ExecMode::Sync {
+                sleep(Duration::from_secs(5 * 60)).await;
+                continue;
+            }
             next_block_number -= 1; // no chunk belongs to this block_number
             break;
         }
@@ -776,23 +818,23 @@ mod tests {
 
     #[test]
     fn test_exec_mode() {
         let mode = ExecMode::Exec;
-        assert_eq!(mode.need_exec(), true);
-        assert_eq!(mode.need_seq(), false);
-        assert_eq!(mode.need_all(), false);
+        assert!(mode.need_exec());
+        assert!(!mode.need_seq());
+        assert!(!mode.need_all());
 
         let mode = ExecMode::Seq;
-        assert_eq!(mode.need_exec(), false);
-        assert_eq!(mode.need_seq(), true);
-        assert_eq!(mode.need_all(), false);
+        assert!(!mode.need_exec());
+        assert!(mode.need_seq());
+        assert!(!mode.need_all());
 
         let mode = ExecMode::All;
-        assert_eq!(mode.need_exec(), true);
-        assert_eq!(mode.need_seq(), true);
-        assert_eq!(mode.need_all(), true);
+        assert!(mode.need_exec());
+        assert!(mode.need_seq());
+        assert!(mode.need_all());
 
         let mode = ExecMode::Sync;
-        assert_eq!(mode.need_exec(), true);
-        assert_eq!(mode.need_seq(), true);
-        assert_eq!(mode.need_all(), true);
+        assert!(mode.need_exec());
+        assert!(mode.need_seq());
+        assert!(mode.need_all());
     }
 }
diff --git a/crates/rooch/src/commands/da/commands/index.rs b/crates/rooch/src/commands/da/commands/index.rs
index 7e9c8d821e..133790f0ec 100644
---
a/crates/rooch/src/commands/da/commands/index.rs +++ b/crates/rooch/src/commands/da/commands/index.rs @@ -3,6 +3,7 @@ use crate::commands::da::commands::{LedgerTxGetter, TxPosition, TxPositionIndexer}; use anyhow::anyhow; +use rooch_types::error::{RoochError, RoochResult}; use std::cmp::max; use std::path::PathBuf; use tracing::info; @@ -28,7 +29,7 @@ pub struct IndexCommand { } impl IndexCommand { - pub fn execute(self) -> anyhow::Result<()> { + async fn exec_inner(self) -> anyhow::Result<()> { if self.index_file_path.is_some() { return TxPositionIndexer::load_or_dump( self.index_path, @@ -40,69 +41,66 @@ impl IndexCommand { let db_path = self.index_path.clone(); let reset_from = self.reset_from; let mut indexer = TxPositionIndexer::new(db_path, reset_from)?; - - info!("indexer stats after reset: {:?}", indexer.get_stats()?); + let stats_before_reset = indexer.get_stats()?; + info!("indexer stats after reset: {:?}", stats_before_reset); let segment_dir = self.segment_dir.ok_or(anyhow!("segment-dir is required"))?; - let ledger_tx_loader = LedgerTxGetter::new(segment_dir)?; - let mut block_number = indexer.last_block_number; // avoiding partial indexing - let mut expected_tx_order = indexer.last_tx_order + 1; let stop_at = if let Some(max_block_number) = self.max_block_number { max(max_block_number, ledger_tx_loader.get_max_chunk_id()) } else { ledger_tx_loader.get_max_chunk_id() }; - - let db = indexer.db; - let mut wtxn = indexer.db_env.write_txn()?; - + let mut block_number = indexer.last_block_number; // avoiding partial indexing + let mut expected_tx_order = indexer.last_tx_order + 1; let mut done_block = 0; - loop { - if block_number > stop_at { - break; - } - let tx_list = ledger_tx_loader.load_ledger_tx_list(block_number, true)?; + + while block_number <= stop_at { + let tx_list = ledger_tx_loader + .load_ledger_tx_list(block_number, true) + .await?; let tx_list = tx_list.unwrap(); - for mut ledger_tx in tx_list { - let tx_order = ledger_tx.sequence_info.tx_order; - if tx_order < expected_tx_order { - continue; - } - if tx_order == indexer.last_tx_order + 1 { - info!( - "begin to index block: {}, tx_order: {}", - block_number, tx_order - ); - } - if tx_order != expected_tx_order { - return Err(anyhow!( - "tx_order not continuous, expect: {}, got: {}", - expected_tx_order, - tx_order - )); + { + let db = indexer.db; + let mut wtxn = indexer.db_env.write_txn()?; + for mut ledger_tx in tx_list { + let tx_order = ledger_tx.sequence_info.tx_order; + if tx_order < expected_tx_order { + continue; + } + if tx_order == indexer.last_tx_order + 1 { + info!( + "begin to index block: {}, tx_order: {}", + block_number, tx_order + ); + } + if tx_order != expected_tx_order { + return Err(anyhow!( + "tx_order not continuous, expect: {}, got: {}", + expected_tx_order, + tx_order + )); + } + let tx_hash = ledger_tx.tx_hash(); + let tx_position = TxPosition { + tx_order, + tx_hash, + block_number, + }; + db.put(&mut wtxn, &tx_order, &tx_position)?; + expected_tx_order += 1; } - let tx_hash = ledger_tx.tx_hash(); - let tx_position = TxPosition { - tx_order, - tx_hash, - block_number, - }; - db.put(&mut wtxn, &tx_order, &tx_position)?; - expected_tx_order += 1; + wtxn.commit()?; } block_number += 1; done_block += 1; if done_block % 1000 == 0 { - wtxn.commit()?; - wtxn = indexer.db_env.write_txn()?; info!( "done: block_cnt: {}; next_block_number: {}", done_block, block_number ); } } - wtxn.commit()?; indexer.init_cursor()?; info!("indexer stats after job: {:?}", indexer.get_stats()?); @@ -110,4 +108,8 @@ 
impl IndexCommand {
 
         Ok(())
     }
+
+    pub async fn execute(self) -> RoochResult<()> {
+        self.exec_inner().await.map_err(RoochError::from)
+    }
 }
diff --git a/crates/rooch/src/commands/da/commands/mod.rs b/crates/rooch/src/commands/da/commands/mod.rs
index e72f656d25..a223fbcd98 100644
--- a/crates/rooch/src/commands/da/commands/mod.rs
+++ b/crates/rooch/src/commands/da/commands/mod.rs
@@ -28,8 +28,9 @@ use std::io::{BufRead, BufReader, BufWriter, Read, Write};
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::atomic::AtomicU64;
+use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::watch;
+use tokio::sync::{watch, RwLock};
 use tokio::time;
 use tracing::{error, info, warn};
 
@@ -233,6 +234,7 @@ pub(crate) struct SegmentDownloader {
     open_da_path: String,
     segment_dir: PathBuf,
     next_chunk_id: u128,
+    chunks: Arc<RwLock<HashMap<u128, Vec<u64>>>>,
 }
 
 impl SegmentDownloader {
@@ -240,11 +242,13 @@
         open_da_path: String,
         segment_dir: PathBuf,
         next_chunk_id: u128,
+        chunks: Arc<RwLock<HashMap<u128, Vec<u64>>>>,
     ) -> anyhow::Result<Self> {
         Ok(SegmentDownloader {
             open_da_path,
             segment_dir,
             next_chunk_id,
+            chunks,
         })
     }
 
@@ -253,7 +257,7 @@ impl SegmentDownloader {
         segment_dir: PathBuf,
         segment_tmp_dir: PathBuf,
         chunk_id: u128,
-    ) -> anyhow::Result<bool> {
+    ) -> anyhow::Result<Option<Vec<u64>>> {
         let tmp_dir = segment_tmp_dir;
         let mut done_segments = Vec::new();
         for segment_number in 0.. {
@@ -268,7 +272,7 @@ impl SegmentDownloader {
             } else {
                 if res.status() == StatusCode::NOT_FOUND {
                     if segment_number == 0 {
-                        return Ok(false);
+                        return Ok(None);
                     } else {
                         break; // no more segments for this chunk
                     }
@@ -282,13 +286,13 @@
             }
         }
 
-        for segment_number in done_segments.into_iter().rev() {
+        for segment_number in done_segments.clone().into_iter().rev() {
             let tmp_path = tmp_dir.join(format!("{}_{}", chunk_id, segment_number));
             let dst_path = segment_dir.join(format!("{}_{}", chunk_id, segment_number));
             fs::rename(tmp_path, dst_path)?;
         }
 
-        Ok(true)
+        Ok(Some(done_segments))
     }
 
     pub(crate) fn run_in_background(
@@ -320,17 +324,18 @@
                 loop {
                     let res = Self::download_chunk(base_url.clone(), segment_dir.clone(), tmp_dir.clone(), chunk_id).await;
                     match res {
-                        Ok(true) => {
+                        Ok(Some(segments)) => {
+                            let mut chunks = self.chunks.write().await;
+                            chunks.insert(chunk_id, segments);
                             chunk_id += 1;
                         }
-                        Ok(false) => {
-                            break;
-                        }
                         Err(e) => {
                             warn!("Failed to download chunk: {}, error: {}", chunk_id, e);
                             break;
                         }
-                    }
+                        _ => {
+                            break;
+                        }}
                 }
             }
         }
 
@@ -342,7 +347,7 @@ pub(crate) struct LedgerTxGetter {
     segment_dir: PathBuf,
-    chunks: HashMap<u128, Vec<u64>>,
+    chunks: Arc<RwLock<HashMap<u128, Vec<u64>>>>,
     max_chunk_id: u128,
 }
 
@@ -352,46 +357,61 @@ impl LedgerTxGetter {
         Ok(LedgerTxGetter {
             segment_dir,
-            chunks,
+            chunks: Arc::new(RwLock::new(chunks)),
             max_chunk_id,
         })
     }
 
-    pub(crate) fn new_with_open_da(
+    pub(crate) fn new_with_auto_sync(
         open_da_path: String,
         segment_dir: PathBuf,
         shutdown_signal: watch::Receiver<()>,
     ) -> anyhow::Result<Self> {
         let (chunks, _min_chunk_id, max_chunk_id) = collect_chunks(segment_dir.clone())?;
 
-        let downloader =
-            SegmentDownloader::new(open_da_path, segment_dir.clone(), max_chunk_id + 1)?;
+        let chunks_to_sync = Arc::new(RwLock::new(chunks.clone()));
+
+        let downloader = SegmentDownloader::new(
+            open_da_path,
+            segment_dir.clone(),
+            max_chunk_id + 1,
+            chunks_to_sync.clone(),
+        )?;
         downloader.run_in_background(shutdown_signal)?;
 
         Ok(LedgerTxGetter {
             segment_dir,
-            chunks,
+            chunks: chunks_to_sync,
             max_chunk_id,
         })
     }
 
-    pub(crate) fn load_ledger_tx_list(
+    pub(crate) async fn load_ledger_tx_list(
         &self,
         chunk_id: u128,
         must_has: bool,
    ) -> anyhow::Result<Option<Vec<LedgerTransaction>>> {
-        let segments = self.chunks.get(&chunk_id);
-        if segments.is_none() {
-            if must_has {
-                return Err(anyhow::anyhow!("No segment found in chunk {}", chunk_id));
-            }
-            return Ok(None);
-        }
-        let tx_list = get_tx_list_from_chunk(
-            self.segment_dir.clone(),
-            chunk_id,
-            segments.unwrap().clone(),
-        )?;
-        Ok(Some(tx_list))
+        self.chunks
+            .read()
+            .await
+            .get(&chunk_id)
+            .cloned()
+            .map_or_else(
+                || {
+                    if must_has {
+                        Err(anyhow::anyhow!("No segment found in chunk {}", chunk_id))
+                    } else {
+                        Ok(None)
+                    }
+                },
+                |segment_numbers| {
+                    let tx_list = get_tx_list_from_chunk(
+                        self.segment_dir.clone(),
+                        chunk_id,
+                        segment_numbers.clone(),
+                    )?;
+                    Ok(Some(tx_list))
+                },
+            )
     }
 
     // only valid for no segments sync
diff --git a/crates/rooch/src/commands/da/mod.rs b/crates/rooch/src/commands/da/mod.rs
index ff040ae6cd..596e7402ba 100644
--- a/crates/rooch/src/commands/da/mod.rs
+++ b/crates/rooch/src/commands/da/mod.rs
@@ -27,7 +27,7 @@ impl CommandAction for DA {
             DACommand::Namespace(namespace) => namespace.execute().map(|_| "".to_owned()),
             DACommand::Exec(exec) => exec.execute().await.map(|_| "".to_owned()),
             DACommand::Index(index) => {
-                index.execute()?;
+                index.execute().await?;
                Ok("".to_owned())
            }
        }
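
Note on the shutdown wiring introduced above (an illustration, not part of the patch): ExecCommand::execute forwards SIGTERM/SIGINT into a tokio::sync::watch channel, and each long-lived task (the logging task, the SegmentDownloader poll loop) holds a Receiver so it can stop at its next opportunity. The self-contained sketch below reproduces that pattern in isolation. It assumes a Unix target with `tokio = { version = "1", features = ["full"] }` and `anyhow` as dependencies; `download_next_chunk` is a hypothetical placeholder for the per-iteration work, not a function from this patch.

// shutdown_sketch.rs -- minimal sketch of the watch-channel shutdown/poll pattern, under the assumptions above
use std::time::Duration;

use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::watch;
use tokio::time::sleep;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // One watch channel fans the shutdown signal out to every background task.
    let (shutdown_tx, shutdown_rx) = watch::channel(());

    // Forward SIGTERM / SIGINT into the channel, as the patch does in ExecCommand::execute.
    let signal_tx = shutdown_tx.clone();
    tokio::spawn(async move {
        let mut sigterm = signal(SignalKind::terminate()).expect("failed to listen for SIGTERM");
        let mut sigint = signal(SignalKind::interrupt()).expect("failed to listen for SIGINT");
        tokio::select! {
            _ = sigterm.recv() => {}
            _ = sigint.recv() => {}
        }
        let _ = signal_tx.send(());
    });

    // Background poller in the spirit of SegmentDownloader::run_in_background:
    // do a unit of work, sleep, and exit as soon as shutdown is signalled.
    let mut poll_shutdown = shutdown_rx;
    let poller = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = poll_shutdown.changed() => {
                    println!("shutdown received, stopping poller");
                    break;
                }
                _ = sleep(Duration::from_secs(5)) => {
                    // A real implementation would do its unit of work here,
                    // e.g. `download_next_chunk().await` (hypothetical placeholder).
                    println!("polling for new chunks...");
                }
            }
        }
    });

    poller.await?;
    // Make sure any other receivers (e.g. a logging task) also observe shutdown.
    let _ = shutdown_tx.send(());
    Ok(())
}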