Skip to content

Commit

Permalink
feat(rooch-da): enhance execution and rollback logic in DA (#3177)
Browse files Browse the repository at this point in the history
Refactored TxDAIndexer to include RoochStore for advanced transaction sequencing. Improved the rollback process with enhanced handling of startup and sequencer information. Updated execution modes for better clarity and refined RocksDB cache size parsing.
```
popcnt1 authored Jan 12, 2025
1 parent 227f243 commit 17cfd78
Showing 7 changed files with 207 additions and 70 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/rooch-common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -13,3 +13,4 @@ rust-version = { workspace = true }

[dependencies]
libc = { workspace = true }
anyhow = { workspace = true }
81 changes: 75 additions & 6 deletions crates/rooch-common/src/utils/humanize.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use anyhow::bail;

const LOWER_BYTES_UNITS: [&str; 7] = ["b", "k", "m", "g", "t", "p", "e"];

pub fn human_readable_bytes(bytes: u64) -> String {
@@ -15,6 +17,51 @@ pub fn human_readable_bytes(bytes: u64) -> String {
format!("{:.2}{}", v, LOWER_BYTES_UNITS[unit_index])
}

/// Parses a human-readable size such as "512", "1k", "10M" or "1.5G" into bytes (u64).
///
/// The input is a non-negative decimal number (ASCII digits with an optional `.`),
/// optionally followed by one case-insensitive unit letter from `LOWER_BYTES_UNITS`
/// (`b`, `k`, `m`, `g`, `t`, `p`, `e`). Units are powers of 1024. Any unit other than
/// `b` may carry a trailing `b` or `ib` ("10MB", "10MiB"). Leading and trailing
/// whitespace is ignored. Without a unit the number is taken as bytes. Fractional
/// byte results are truncated toward zero ("1.4K" -> 1433).
///
/// # Errors
///
/// Returns an error when the input is empty, the number is malformed, the unit is
/// unknown, unexpected characters follow the unit, or the result does not fit in `u64`.
pub fn parse_bytes(input: &str) -> anyhow::Result<u64> {
    let input = input.trim();
    if input.is_empty() {
        bail!("Input is empty");
    }

    // Split the input into its numeric prefix and whatever follows it.
    let number_len = input
        .find(|c: char| !(c.is_ascii_digit() || c == '.'))
        .unwrap_or(input.len());
    let (value_str, suffix) = input.split_at(number_len);

    // Validate the numeric part first so a malformed number is reported before the unit.
    let value: f64 = value_str
        .parse()
        .map_err(|_| anyhow::anyhow!("Invalid number format"))?;

    // Resolve the unit (if any) to its power-of-1024 multiplier.
    let mut suffix_chars = suffix.chars();
    let multiplier = match suffix_chars.next() {
        None => 1u64, // Default to bytes
        Some(c) => {
            let mut buf = [0u8; 4];
            let unit: &str = c.to_ascii_lowercase().encode_utf8(&mut buf);
            let index = match LOWER_BYTES_UNITS.iter().position(|&u| u == unit) {
                Some(index) => index,
                None => bail!("Unrecognized unit: {}", unit),
            };

            // Reject trailing garbage such as "1k5" instead of silently ignoring it;
            // only the conventional "b"/"ib" endings are tolerated after a real unit.
            let rest = suffix_chars.as_str();
            let rest_is_valid = rest.is_empty()
                || (index > 0
                    && (rest.eq_ignore_ascii_case("b") || rest.eq_ignore_ascii_case("ib")));
            if !rest_is_valid {
                bail!("Unexpected trailing characters: {}", rest);
            }

            1024u64.saturating_pow(index as u32)
        }
    };

    // Exact path for plain integers: avoids f64 rounding above 2^53 and detects
    // overflow precisely.
    if let Ok(whole) = value_str.parse::<u64>() {
        return whole
            .checked_mul(multiplier)
            .ok_or_else(|| anyhow::anyhow!("Value overflowed u64"));
    }

    // Fractional (or too-large-for-u64) numbers go through f64.
    let result = value * multiplier as f64;

    // `u64::MAX as f64` rounds up to 2^64, which is itself out of range, so the
    // comparison must be inclusive; otherwise the cast below would silently saturate.
    if result >= u64::MAX as f64 {
        bail!("Value overflowed u64");
    }

    Ok(result as u64)
}

#[cfg(test)]
mod tests {
use super::*;
@@ -23,16 +70,38 @@ mod tests {
fn test_human_readable_bytes() {
let test_cases = [
(0, "0.00b"),
(1024, "1.00k"),
(1024 * 1024, "1.00m"),
(1024 * 1024 * 1024, "1.00g"),
(1024_u64 * 1024 * 1024 * 1024, "1.00t"),
(1024_u64 * 1024 * 1024 * 1024 * 1024, "1.00p"),
(1024_u64 * 1024 * 1024 * 1024 * 1024 * 1024, "1.00e"),
(1 << 10, "1.00k"),
(1 << 20, "1.00m"),
(1 << 30, "1.00g"),
(1 << 40, "1.00t"),
(1 << 50, "1.00p"),
(1 << 60, "1.00e"),
];

for (bytes, expected) in test_cases.iter() {
assert_eq!(human_readable_bytes(*bytes), *expected);
}
}

#[test]
fn test_parse_bytes() {
    // (input, expected byte count); the table shows fractional byte results
    // being truncated, e.g. "1.5" -> 1 and "1.4K" -> 1433.
    let cases: [(&str, u64); 12] = [
        ("0", 0),
        ("1", 1),
        ("1.5", 1),
        ("1.5K", 3 << 9),
        ("1.4K", 1433),
        ("1.5M", 3 << 19),
        ("1.5G", 3 << 29),
        ("1.5T", 3 << 39),
        ("1.5P", 3 << 49),
        ("1.5E", 3 << 59),
        ("1.5k", 1536),
        ("1.512k", 1548),
    ];

    cases.iter().for_each(|&(input, expected)| {
        assert_eq!(parse_bytes(input).unwrap(), expected);
    });
}
}
171 changes: 111 additions & 60 deletions crates/rooch/src/commands/da/commands/exec.rs
Original file line number Diff line number Diff line change
@@ -21,18 +21,24 @@ use moveos_types::startup_info;
use moveos_types::transaction::{TransactionExecutionInfo, VerifiedMoveOSTransaction};
use raw_store::rocks::batch::WriteBatch;
use raw_store::traits::DBStore;
use rooch_common::humanize::parse_bytes;
use rooch_config::R_OPT_NET_HELP;
use rooch_db::RoochDB;
use rooch_event::actor::EventActor;
use rooch_executor::actor::executor::ExecutorActor;
use rooch_executor::actor::reader_executor::ReaderExecutorActor;
use rooch_executor::proxy::ExecutorProxy;
use rooch_pipeline_processor::actor::processor::is_vm_panic_error;
use rooch_store::meta_store::SEQUENCER_INFO_KEY;
use rooch_store::META_SEQUENCER_INFO_COLUMN_FAMILY_NAME;
use rooch_types::bitcoin::types::Block as BitcoinBlock;
use rooch_types::error::RoochResult;
use rooch_types::rooch_network::RoochChainID;
use rooch_types::transaction::{L1BlockWithBody, LedgerTransaction, LedgerTxData};
use std::cmp::min;
use rooch_types::sequencer::SequencerInfo;
use rooch_types::transaction::{
L1BlockWithBody, LedgerTransaction, LedgerTxData, TransactionSequenceInfo,
};
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, Read};
@@ -52,7 +58,8 @@ use tracing::info;
pub struct ExecCommand {
#[clap(
long = "mode",
help = "execute mode: exec, seq, full. exec: only execute transactions, no update sequence related data; full: execute transactions and update sequence related data"
default_value = "all",
help = "Execution mode: exec, seq, all. Default is all"
)]
pub mode: ExecMode,
#[clap(long = "segment-dir")]
@@ -90,14 +97,18 @@ pub struct ExecCommand {
#[clap(long = "btc-local-block-store-dir")]
pub btc_local_block_store_dir: Option<PathBuf>,

#[clap(name = "rocksdb-row-cache-size", long, help = "rocksdb row cache size")]
pub row_cache_size: Option<u64>,
#[clap(
name = "rocksdb-row-cache-size",
long,
help = "rocksdb row cache size, default 128M"
)]
pub row_cache_size: Option<String>,
#[clap(
name = "rocksdb-block-cache-size",
long,
help = "rocksdb block cache size"
help = "rocksdb block cache size, default 4G"
)]
pub block_cache_size: Option<u64>,
pub block_cache_size: Option<String>,
#[clap(long = "enable-rocks-stats", help = "rocksdb-enable-statistics")]
pub enable_rocks_stats: bool,
}
@@ -106,35 +117,35 @@ pub struct ExecCommand {
pub enum ExecMode {
Exec, // Only execute transactions, no sequence updates
Seq, // Only update sequence data, no execution
Both, // Execute transactions and update sequence data
All, // Execute transactions and update sequence data
}

impl ExecMode {
pub fn as_bits(&self) -> u8 {
match self {
ExecMode::Exec => 0b10, // Execute
ExecMode::Seq => 0b01, // Sequence
ExecMode::Both => 0b11, // Both
ExecMode::All => 0b11, // All
}
}

pub fn is_exec(&self) -> bool {
pub fn need_exec(&self) -> bool {
self.as_bits() & 0b10 != 0
}

pub fn is_seq(&self) -> bool {
pub fn need_seq(&self) -> bool {
self.as_bits() & 0b01 != 0
}

pub fn is_both(&self) -> bool {
pub fn need_all(&self) -> bool {
self.as_bits() == 0b11
}

pub fn get_verify_targets(&self) -> String {
match self {
ExecMode::Exec => "state root",
ExecMode::Seq => "accumulator root",
ExecMode::Both => "state+accumulator root",
ExecMode::All => "state+accumulator root",
}
.to_string()
}
@@ -150,13 +161,22 @@ impl ExecCommand {
async fn build_exec_inner(&self) -> anyhow::Result<ExecInner> {
let actor_system = ActorSystem::global_system();

let row_cache_size = self
.row_cache_size
.clone()
.and_then(|v| parse_bytes(&v).ok());
let block_cache_size = self
.block_cache_size
.clone()
.and_then(|v| parse_bytes(&v).ok());

let (executor, moveos_store, rooch_db) = build_executor_and_store(
self.base_data_dir.clone(),
self.chain_id.clone(),
&actor_system,
self.enable_rocks_stats,
self.row_cache_size,
self.block_cache_size,
row_cache_size,
block_cache_size,
)
.await?;

@@ -176,6 +196,7 @@ impl ExecCommand {
let tx_da_indexer = TxDAIndexer::load_from_file(
self.order_hash_path.clone(),
moveos_store.transaction_store,
rooch_db.rooch_store.clone(),
)?;
Ok(ExecInner {
mode: self.mode,
@@ -318,67 +339,83 @@ impl ExecInner {

fn update_startup_info_after_rollback(
&self,
execution_info: TransactionExecutionInfo,
execution_info: Option<TransactionExecutionInfo>,
sequencer_info: Option<TransactionSequenceInfo>,
) -> anyhow::Result<()> {
let rollback_startup_info =
startup_info::StartupInfo::new(execution_info.state_root, execution_info.size);
let rollback_sequencer_info = if let Some(sequencer_info) = sequencer_info {
Some(SequencerInfo::new(
sequencer_info.tx_order,
sequencer_info.tx_accumulator_info(),
))
} else {
None
};
let rollback_startup_info = if let Some(execution_info) = execution_info {
Some(startup_info::StartupInfo::new(
execution_info.state_root,
execution_info.size,
))
} else {
None
};

let inner_store = &self.rooch_db.rooch_store.store_instance;
let mut write_batch = WriteBatch::new();
let cf_names = vec![CONFIG_STARTUP_INFO_COLUMN_FAMILY_NAME];

write_batch.put(
to_bytes(STARTUP_INFO_KEY).unwrap(),
to_bytes(&rollback_startup_info).unwrap(),
)?;
let mut cf_names = Vec::new();
if let Some(rollback_sequencer_info) = rollback_sequencer_info {
cf_names.push(META_SEQUENCER_INFO_COLUMN_FAMILY_NAME);
write_batch.put(
to_bytes(SEQUENCER_INFO_KEY).unwrap(),
to_bytes(&rollback_sequencer_info).unwrap(),
)?;
}
if let Some(rollback_startup_info) = rollback_startup_info {
cf_names.push(CONFIG_STARTUP_INFO_COLUMN_FAMILY_NAME);
write_batch.put(
to_bytes(STARTUP_INFO_KEY).unwrap(),
to_bytes(&rollback_startup_info).unwrap(),
)?;
}

inner_store.write_batch_across_cfs(cf_names, write_batch, true)
}

async fn produce_tx(&self, tx: Sender<ExecMsg>) -> anyhow::Result<()> {
let last_executed_opt = self.tx_da_indexer.find_last_executed()?;
let last_executed_tx_order = match last_executed_opt.clone() {
Some(v) => v.tx_order,
None => 0,
};
let mut next_tx_order = last_executed_tx_order + 1;

let last_sequenced_tx = self.sequenced_tx_store.get_last_tx_order();
let mut next_tx_order = last_executed_opt
.clone()
.map(|v| v.tx_order + 1)
.unwrap_or(1);
let next_sequence_tx = last_sequenced_tx + 1;

if self.mode.is_both() && next_tx_order != last_sequenced_tx + 1 {
let last_executed_tx_order = match last_executed_opt {
Some(v) => v.tx_order,
None => 0,
};
let last_full_executed_tx_order = min(last_sequenced_tx, last_executed_tx_order);
let last_partial_executed_tx_order = max(last_sequenced_tx, last_executed_tx_order);

let mut rollback_to = self.rollback;
if self.mode.need_all() && next_tx_order != next_sequence_tx {
info! {
"Last executed tx order: {}, last sequenced tx order: {}, need rollback to tx order: {}",
last_executed_tx_order,
last_sequenced_tx,
min(last_sequenced_tx, last_executed_tx_order)
last_full_executed_tx_order
};
return Ok(());
}

let mut next_block_number = last_executed_opt
.clone()
.map(|v| v.block_number) // next_tx_order and last executed tx may be in the same block
.unwrap_or(0);

if !self.mode.is_exec() {
next_tx_order = last_sequenced_tx + 1;
next_block_number = self.tx_da_indexer.find_tx_block(next_tx_order).unwrap();
if rollback_to.is_none() {
rollback_to = Some(last_full_executed_tx_order);
} else {
rollback_to = Some(min(rollback_to.unwrap(), last_full_executed_tx_order));
}
}

info!(
"next_tx_order: {:?}. need rollback soon: {:?}",
next_tx_order,
self.rollback.is_some()
);

// If rollback not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order
if let (Some(rollback), Some(last_executed)) = (self.rollback, last_executed_opt.clone()) {
let last_executed_tx_order = last_executed.tx_order;
if rollback < last_executed_tx_order {
let new_last_and_rollback =
self.tx_da_indexer.slice(rollback, last_executed_tx_order)?;
// If no rollback target is set, or the target is >= `last_partial_executed_tx_order`,
// there is nothing to do; otherwise, roll back to the target tx order.
if let Some(rollback) = rollback_to {
if rollback < last_partial_executed_tx_order {
let new_last_and_rollback = self
.tx_da_indexer
.slice(rollback, last_partial_executed_tx_order)?;
// Split into two parts: the first entry supplies the execution info for the new startup state; all the others are rolled back.
let (new_last, rollback_part) = new_last_and_rollback.split_first().unwrap();
info!(
@@ -399,12 +436,26 @@ impl ExecInner {
}
let rollback_execution_info =
self.tx_da_indexer.get_execution_info(new_last.tx_hash)?;
self.update_startup_info_after_rollback(rollback_execution_info.unwrap())?;
let rollback_sequencer_info =
self.tx_da_indexer.get_sequencer_info(new_last.tx_hash)?;
self.update_startup_info_after_rollback(
rollback_execution_info,
rollback_sequencer_info,
)?;
info!("Rollback transactions done. Please RESTART process without rollback.");
return Ok(()); // rollback done, need to restart to get new state_root for startup rooch store
}
};

let mut next_block_number = last_executed_opt
.clone()
.map(|v| v.block_number) // next_tx_order and last executed tx may be in the same block
.unwrap_or(0);

if !self.mode.need_exec() {
next_tx_order = last_sequenced_tx + 1;
next_block_number = self.tx_da_indexer.find_tx_block(next_tx_order).unwrap();
}
info!(
"Start to produce transactions from tx_order: {}, check from block: {}",
next_tx_order, next_block_number,
@@ -523,12 +574,12 @@ impl ExecInner {
let exp_state_root = exp_root_opt.map(|v| v.0);
let exp_accumulator_root = exp_root_opt.map(|v| v.1);

if self.mode.is_seq() {
if self.mode.need_seq() {
self.sequenced_tx_store
.store_tx(ledger_tx.clone(), exp_accumulator_root)?;
}

if self.mode.is_exec() {
if self.mode.need_exec() {
let moveos_tx = self
.validate_ledger_transaction(ledger_tx, l1_block_with_body)
.await?;
21 changes: 17 additions & 4 deletions crates/rooch/src/commands/da/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ use rooch_types::da::chunk::chunk_from_segments;
use rooch_types::da::segment::{segment_from_bytes, SegmentID};
use rooch_types::rooch_network::RoochChainID;
use rooch_types::sequencer::SequencerInfo;
use rooch_types::transaction::LedgerTransaction;
use rooch_types::transaction::{LedgerTransaction, TransactionSequenceInfo};
use std::collections::HashMap;
use std::fs;
use std::fs::File;
@@ -299,12 +299,14 @@ impl std::str::FromStr for TxDAIndex {
pub struct TxDAIndexer {
tx_order_hash_blocks: Vec<TxDAIndex>,
transaction_store: TransactionDBStore,
rooch_store: RoochStore,
}

impl TxDAIndexer {
pub fn load_from_file(
file_path: PathBuf,
transaction_store: TransactionDBStore,
rooch_store: RoochStore,
) -> anyhow::Result<Self> {
let mut tx_order_hashes = Vec::with_capacity(70000000);
let mut reader = BufReader::new(File::open(file_path)?);
@@ -314,13 +316,14 @@ impl TxDAIndexer {
tx_order_hashes.push(item);
}
tx_order_hashes.sort_by(|a, b| a.tx_order.cmp(&b.tx_order)); // avoiding wrong order
tracing::info!(
info!(
"tx_order:tx_hash:block indexer loaded, tx cnt: {}",
tx_order_hashes.len()
);
Ok(TxDAIndexer {
tx_order_hash_blocks: tx_order_hashes,
transaction_store,
rooch_store,
})
}

@@ -383,8 +386,18 @@ impl TxDAIndexer {
&self,
tx_hash: H256,
) -> anyhow::Result<Option<TransactionExecutionInfo>> {
let execution_info = self.transaction_store.get_tx_execution_info(tx_hash)?;
Ok(execution_info)
self.transaction_store.get_tx_execution_info(tx_hash)
}

/// Fetches the transaction identified by `tx_hash` from the rooch store's
/// transaction store and returns its `sequence_info`.
///
/// Yields `Ok(None)` when the lookup returns no transaction; lookup errors are
/// propagated to the caller.
pub fn get_sequencer_info(
    &self,
    tx_hash: H256,
) -> anyhow::Result<Option<TransactionSequenceInfo>> {
    let stored_tx = self
        .rooch_store
        .transaction_store
        .get_transaction_by_hash(tx_hash)?;
    Ok(stored_tx.map(|tx| tx.sequence_info))
}

pub fn get_execution_info_by_order(
1 change: 1 addition & 0 deletions crates/rooch/src/commands/db/commands/dump_tx_root.rs
Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@ impl DumpTxRootCommand {
let tx_da_indexer = TxDAIndexer::load_from_file(
self.order_hash_path.clone(),
moveos_store.transaction_store,
rooch_db.rooch_store.clone(),
)?;

let file = File::create(self.output.clone())?;
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ impl GetExecutionInfoByOrderCommand {
let tx_da_indexer = TxDAIndexer::load_from_file(
self.order_hash_path.clone(),
moveos_store.transaction_store,
rooch_db.rooch_store.clone(),
)?;

let tx_order = self.order;

0 comments on commit 17cfd78

Please sign in to comment.