Skip to content

Commit

Permalink
feat(rooch-da): add async support for DA indexing and syncing
Browse files Browse the repository at this point in the history
Transitioned DA commands to support asynchronous operations, enhancing efficiency in chunk downloading, ledger transaction loading, and execution. Introduced auto-sync functionality and shutdown signal handling for smoother, interruptible execution.
  • Loading branch information
popcnt1 committed Jan 23, 2025
1 parent 1e096cd commit 2f1a0d4
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 101 deletions.
96 changes: 69 additions & 27 deletions crates/rooch/src/commands/da/commands/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ use rooch_types::sequencer::SequencerInfo;
use rooch_types::transaction::{
L1BlockWithBody, LedgerTransaction, LedgerTxData, TransactionSequenceInfo,
};
use std::cmp::{max, min};
use std::cmp::{max, min, PartialEq};
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::sync::watch;
use tokio::time;
use tokio::time::sleep;
use tracing::{info, warn};

/// exec LedgerTransaction List for verification.
Expand Down Expand Up @@ -109,7 +111,7 @@ pub struct ExecCommand {
pub enable_rocks_stats: bool,

// Path of the OpenDA store that chunks are downloaded from; read when building the
// auto-syncing ledger tx getter for `mode=sync`.
// NOTE(review): two `long` names appear because this excerpt is a rendered diff;
// presumably `open-da` supersedes `open-da-path` — confirm against the real file.
#[clap(
    long = "open-da-path",
    long = "open-da",
    help = "open da path for downloading chunks from DA. Working with `mode=sync`"
)]
pub open_da_path: Option<String>,
Expand All @@ -133,6 +135,12 @@ pub enum ExecMode {
Sync,
}

/// Two modes are equal exactly when their `as_bits` encodings are equal.
impl PartialEq for ExecMode {
    fn eq(&self, other: &Self) -> bool {
        let lhs_bits = self.as_bits();
        let rhs_bits = other.as_bits();
        lhs_bits == rhs_bits
    }
}

impl ExecMode {
pub fn as_bits(&self) -> u8 {
match self {
Expand Down Expand Up @@ -168,12 +176,39 @@ impl ExecMode {

impl ExecCommand {
/// Entry point of the command: wires up a shutdown channel, installs SIGTERM/SIGINT
/// listeners, builds the `ExecInner` and runs it.
///
/// Returns the error from `build_exec_inner` or `run` (propagated with `?`),
/// otherwise `Ok(())`.
pub async fn execute(self) -> RoochResult<()> {
// NOTE(review): this excerpt is a rendered diff; the next two lines look like the
// pre-change body that the code below replaces — confirm against the real file.
let mut exec_inner = self.build_exec_inner().await?;
exec_inner.run().await?;
// Watch channel used purely as a shutdown notification (the payload is `()`).
let (shutdown_tx, shutdown_rx) = watch::channel(());

// The signal task needs its own sender; the original is kept for the send at the end.
let shutdown_tx_clone = shutdown_tx.clone();

// Background task: wait for whichever of SIGTERM / SIGINT arrives first, log it,
// and notify the shutdown channel. The task is detached (its handle is not awaited).
tokio::spawn(async move {
// `expect` panics inside this spawned task if a handler cannot be registered.
let mut sigterm =
signal(SignalKind::terminate()).expect("Failed to listen for SIGTERM");
let mut sigint = signal(SignalKind::interrupt()).expect("Failed to listen for SIGINT");

tokio::select! {
_ = sigterm.recv() => {
info!("SIGTERM received, shutting down...");
// Send errors are deliberately ignored.
let _ = shutdown_tx_clone.send(());
}
_ = sigint.recv() => {
info!("SIGINT received (Ctrl+C), shutting down...");
let _ = shutdown_tx_clone.send(());
}
}
});

// One receiver goes to the builder, the other to `run`.
let mut exec_inner = self.build_exec_inner(shutdown_rx.clone()).await?;
exec_inner.run(shutdown_rx).await?;

// Reached only when `run` returned Ok: notify any remaining receivers that the
// command is finished. The send result is ignored.
let _ = shutdown_tx.send(());

Ok(())
}

async fn build_exec_inner(&self) -> anyhow::Result<ExecInner> {
async fn build_exec_inner(
&self,
shutdown_signal: watch::Receiver<()>,
) -> anyhow::Result<ExecInner> {
let actor_system = ActorSystem::global_system();

let row_cache_size = self
Expand Down Expand Up @@ -206,7 +241,14 @@ impl ExecCommand {
)
.await?;

let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir.clone())?;
let ledger_tx_loader = match self.mode {
ExecMode::Sync => LedgerTxGetter::new_with_auto_sync(
self.open_da_path.clone().unwrap(),
self.segment_dir.clone(),
shutdown_signal,
)?,
_ => LedgerTxGetter::new(self.segment_dir.clone())?,
};
let tx_meta_store = TxMetaStore::new(
self.tx_position_path.clone(),
self.exp_root_path.clone(),
Expand Down Expand Up @@ -312,9 +354,8 @@ impl ExecInner {
}
}

async fn run(&mut self) -> anyhow::Result<()> {
let (shutdown_tx, shutdown_rx) = watch::channel(());
self.start_logging_task(shutdown_rx);
async fn run(&mut self, shutdown_signal: watch::Receiver<()>) -> anyhow::Result<()> {
self.start_logging_task(shutdown_signal);

// Use a larger buffer to avoid rx starvation caused by the consumer having to access disk and request BTC blocks.
// After the consumer loads data (ledger_tx) from disk or the BTC client it bursts to the executor, so a large buffer is needed to avoid blocking.
Expand All @@ -323,11 +364,7 @@ impl ExecInner {
let producer = self.produce_tx(tx);
let consumer = self.consume_tx(rx);

let result = self.join_producer_and_consumer(producer, consumer).await;

// Send shutdown signal and ensure logging task exits
let _ = shutdown_tx.send(());
result
self.join_producer_and_consumer(producer, consumer).await
}

fn update_startup_info_after_rollback(
Expand Down Expand Up @@ -466,8 +503,13 @@ impl ExecInner {
}
let tx_list = self
.ledger_tx_getter
.load_ledger_tx_list(next_block_number, false)?;
.load_ledger_tx_list(next_block_number, false)
.await?;
if tx_list.is_none() {
if self.mode == ExecMode::Sync {
sleep(Duration::from_secs(5 * 60)).await;
continue;
}
next_block_number -= 1; // no chunk belongs to this block_number
break;
}
Expand Down Expand Up @@ -776,23 +818,23 @@ mod tests {
// Checks the `need_exec` / `need_seq` / `need_all` flags reported by every `ExecMode` variant.
// NOTE(review): each flag is asserted twice (an `assert_eq!` against a bool literal and an
// `assert!`); this looks like old and new diff lines shown together — confirm.
#[test]
fn test_exec_mode() {
// Exec: execution only.
let mode = ExecMode::Exec;
assert_eq!(mode.need_exec(), true);
assert_eq!(mode.need_seq(), false);
assert_eq!(mode.need_all(), false);
assert!(mode.need_exec());
assert!(!mode.need_seq());
assert!(!mode.need_all());

// Seq: sequencing only.
let mode = ExecMode::Seq;
assert_eq!(mode.need_exec(), false);
assert_eq!(mode.need_seq(), true);
assert_eq!(mode.need_all(), false);
assert!(!mode.need_exec());
assert!(mode.need_seq());
assert!(!mode.need_all());

// All: both execution and sequencing.
let mode = ExecMode::All;
assert_eq!(mode.need_exec(), true);
assert_eq!(mode.need_seq(), true);
assert_eq!(mode.need_all(), true);
assert!(mode.need_exec());
assert!(mode.need_seq());
assert!(mode.need_all());

// Sync: reports the same three flags as All.
let mode = ExecMode::Sync;
assert_eq!(mode.need_exec(), true);
assert_eq!(mode.need_seq(), true);
assert_eq!(mode.need_all(), true);
assert!(mode.need_exec());
assert!(mode.need_seq());
assert!(mode.need_all());
}
}
88 changes: 45 additions & 43 deletions crates/rooch/src/commands/da/commands/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::commands::da::commands::{LedgerTxGetter, TxPosition, TxPositionIndexer};
use anyhow::anyhow;
use rooch_types::error::{RoochError, RoochResult};
use std::cmp::max;
use std::path::PathBuf;
use tracing::info;
Expand All @@ -28,7 +29,7 @@ pub struct IndexCommand {
}

impl IndexCommand {
pub fn execute(self) -> anyhow::Result<()> {
async fn exec_inner(self) -> anyhow::Result<()> {
if self.index_file_path.is_some() {
return TxPositionIndexer::load_or_dump(
self.index_path,
Expand All @@ -40,74 +41,75 @@ impl IndexCommand {
let db_path = self.index_path.clone();
let reset_from = self.reset_from;
let mut indexer = TxPositionIndexer::new(db_path, reset_from)?;

info!("indexer stats after reset: {:?}", indexer.get_stats()?);
let stats_before_reset = indexer.get_stats()?;
info!("indexer stats after reset: {:?}", stats_before_reset);

let segment_dir = self.segment_dir.ok_or(anyhow!("segment-dir is required"))?;

let ledger_tx_loader = LedgerTxGetter::new(segment_dir)?;
let mut block_number = indexer.last_block_number; // avoiding partial indexing
let mut expected_tx_order = indexer.last_tx_order + 1;
let stop_at = if let Some(max_block_number) = self.max_block_number {
max(max_block_number, ledger_tx_loader.get_max_chunk_id())
} else {
ledger_tx_loader.get_max_chunk_id()
};

let db = indexer.db;
let mut wtxn = indexer.db_env.write_txn()?;

let mut block_number = indexer.last_block_number; // avoiding partial indexing
let mut expected_tx_order = indexer.last_tx_order + 1;
let mut done_block = 0;
loop {
if block_number > stop_at {
break;
}
let tx_list = ledger_tx_loader.load_ledger_tx_list(block_number, true)?;

while block_number <= stop_at {
let tx_list = ledger_tx_loader
.load_ledger_tx_list(block_number, true)
.await?;
let tx_list = tx_list.unwrap();
for mut ledger_tx in tx_list {
let tx_order = ledger_tx.sequence_info.tx_order;
if tx_order < expected_tx_order {
continue;
}
if tx_order == indexer.last_tx_order + 1 {
info!(
"begin to index block: {}, tx_order: {}",
block_number, tx_order
);
}
if tx_order != expected_tx_order {
return Err(anyhow!(
"tx_order not continuous, expect: {}, got: {}",
expected_tx_order,
tx_order
));
{
let db = indexer.db;
let mut wtxn = indexer.db_env.write_txn()?;
for mut ledger_tx in tx_list {
let tx_order = ledger_tx.sequence_info.tx_order;
if tx_order < expected_tx_order {
continue;
}
if tx_order == indexer.last_tx_order + 1 {
info!(
"begin to index block: {}, tx_order: {}",
block_number, tx_order
);
}
if tx_order != expected_tx_order {
return Err(anyhow!(
"tx_order not continuous, expect: {}, got: {}",
expected_tx_order,
tx_order
));
}
let tx_hash = ledger_tx.tx_hash();
let tx_position = TxPosition {
tx_order,
tx_hash,
block_number,
};
db.put(&mut wtxn, &tx_order, &tx_position)?;
expected_tx_order += 1;
}
let tx_hash = ledger_tx.tx_hash();
let tx_position = TxPosition {
tx_order,
tx_hash,
block_number,
};
db.put(&mut wtxn, &tx_order, &tx_position)?;
expected_tx_order += 1;
wtxn.commit()?;
}
block_number += 1;
done_block += 1;
if done_block % 1000 == 0 {
wtxn.commit()?;
wtxn = indexer.db_env.write_txn()?;
info!(
"done: block_cnt: {}; next_block_number: {}",
done_block, block_number
);
}
}
wtxn.commit()?;

indexer.init_cursor()?;
info!("indexer stats after job: {:?}", indexer.get_stats()?);
indexer.close()?;

Ok(())
}

/// Public entry point: runs `exec_inner` and converts its `anyhow` error into a `RoochError`.
pub async fn execute(self) -> RoochResult<()> {
    match self.exec_inner().await {
        Ok(()) => Ok(()),
        Err(err) => Err(RoochError::from(err)),
    }
}
}
Loading

0 comments on commit 2f1a0d4

Please sign in to comment.