refactor(rooch-da): refactor indexing logic in DA commands (#3199)
Refactored data indexing and management, improving modularity and simplifying the DA execution flow.
popcnt1 authored Jan 19, 2025
1 parent 493cc8a commit 2b1ff0b
Showing 13 changed files with 871 additions and 375 deletions.
395 changes: 364 additions & 31 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -184,6 +184,7 @@ futures = "0.3.31"
futures-util = "0.3.31"
hdrhistogram = "7.5.4"
hex = "0.4.3"
heed = "0.21.0"
itertools = "0.13.0"
jsonrpsee = { version = "0.23.2", features = ["full"] }
jpst = "0.1.1"
1 change: 1 addition & 0 deletions crates/rooch/Cargo.toml
@@ -35,6 +35,7 @@ codespan-reporting = { workspace = true }
termcolor = { workspace = true }
itertools = { workspace = true }
hdrhistogram = { workspace = true }
heed = { workspace = true }
hex = { workspace = true }
regex = { workspace = true }
parking_lot = { workspace = true }
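The heed crate added to the workspace above is a typed wrapper around LMDB, an embedded memory-mapped key-value store; the reworked index command below writes its tx_order position records through it instead of appending to a flat text file. For orientation, here is a minimal sketch of typical heed usage, assuming heed 0.21 with the serde-bincode feature enabled; the directory name, map size, database name, and value type are illustrative and not taken from this repository.

use heed::types::SerdeBincode;
use heed::{Database, EnvOpenOptions};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Position {
    block_number: u128,
}

fn main() -> anyhow::Result<()> {
    // LMDB needs the environment directory to exist before it is opened.
    std::fs::create_dir_all("index-db")?;
    // Opening an environment is unsafe in recent heed versions: the caller must
    // guarantee the same path is not opened twice within one process.
    let env = unsafe {
        EnvOpenOptions::new()
            .map_size(1 << 30) // 1 GiB memory map, illustrative
            .max_dbs(1)
            .open("index-db")?
    };
    let mut wtxn = env.write_txn()?;
    // SerdeBincode is used for both key and value purely for brevity; the
    // codecs actually used in the repository may differ.
    let db: Database<SerdeBincode<u64>, SerdeBincode<Position>> =
        env.create_database(&mut wtxn, Some("tx_position"))?;
    db.put(&mut wtxn, &42u64, &Position { block_number: 7 })?;
    wtxn.commit()?;

    let rtxn = env.read_txn()?;
    println!("{:?}", db.get(&rtxn, &42u64)?);
    Ok(())
}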
72 changes: 22 additions & 50 deletions crates/rooch/src/commands/da/commands/exec.rs
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::commands::da::commands::{
build_rooch_db, LedgerTxGetter, SequencedTxStore, TxDAIndexer,
build_rooch_db, LedgerTxGetter, SequencedTxStore, TxMetaStore,
};
use anyhow::Context;
use bitcoin::hashes::Hash;
@@ -39,11 +39,7 @@ use rooch_types::transaction::{
L1BlockWithBody, LedgerTransaction, LedgerTxData, TransactionSequenceInfo,
};
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, Read};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
@@ -65,15 +61,15 @@ pub struct ExecCommand {
#[clap(long = "segment-dir")]
pub segment_dir: PathBuf,
#[clap(
long = "order-state-path",
help = "Path to tx_order:state_root file(results from RoochNetwork), for fast verification avoiding blocking on RPC requests"
long = "tx-position",
help = "Path to tx_order:tx_hash:l2_block_number database directory"
)]
pub order_state_path: PathBuf,
pub tx_position_path: PathBuf,
#[clap(
long = "order-hash-path",
help = "Path to tx_order:tx_hash:l2_block_number file"
long = "exp-root",
help = "Path to tx_order:state_root:accumulator_root file(results from RoochNetwork), for fast verification avoiding blocking on RPC requests"
)]
pub order_hash_path: PathBuf,
pub exp_root_path: PathBuf,
#[clap(
long = "rollback",
help = "rollback to tx order. If not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order."
@@ -197,20 +193,18 @@ impl ExecCommand {
)
.await?;

let (order_state_pair, tx_order_end) = self.load_order_state_pair();
let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir.clone())?;
let tx_da_indexer = TxDAIndexer::load_from_file(
self.order_hash_path.clone(),
let tx_da_indexer = TxMetaStore::new(
self.tx_position_path.clone(),
self.exp_root_path.clone(),
moveos_store.transaction_store,
rooch_db.rooch_store.clone(),
)?;
Ok(ExecInner {
mode: self.mode,
force_align: self.force_align,
ledger_tx_getter: ledger_tx_loader,
tx_da_indexer,
order_state_pair,
tx_order_end,
tx_meta_store: tx_da_indexer,
sequenced_tx_store,
bitcoin_client_proxy,
executor,
@@ -221,36 +215,14 @@ impl ExecCommand {
rooch_db,
})
}

fn load_order_state_pair(&self) -> (HashMap<u64, (H256, H256)>, u64) {
let mut order_state_pair = HashMap::new();
let mut tx_order_end = 0;

let mut reader = BufReader::new(File::open(self.order_state_path.clone()).unwrap());
// collect all `tx_order:state_root` pairs
for line in reader.by_ref().lines() {
let line = line.unwrap();
let parts: Vec<&str> = line.split(':').collect();
let tx_order = parts[0].parse::<u64>().unwrap();
let state_root = H256::from_str(parts[1]).unwrap();
let accumulator_root = H256::from_str(parts[2]).unwrap();
order_state_pair.insert(tx_order, (state_root, accumulator_root));
if tx_order > tx_order_end {
tx_order_end = tx_order;
}
}
(order_state_pair, tx_order_end)
}
}

struct ExecInner {
mode: ExecMode,
force_align: bool,

ledger_tx_getter: LedgerTxGetter,
tx_da_indexer: TxDAIndexer,
order_state_pair: HashMap<u64, (H256, H256)>,
tx_order_end: u64,
tx_meta_store: TxMetaStore,

sequenced_tx_store: SequencedTxStore,

@@ -389,8 +361,8 @@ impl ExecInner {
}

async fn produce_tx(&self, tx: Sender<ExecMsg>) -> anyhow::Result<()> {
let last_executed_opt = self.tx_da_indexer.find_last_executed()?;
let last_executed_tx_order = match last_executed_opt.clone() {
let last_executed_opt = self.tx_meta_store.find_last_executed()?;
let last_executed_tx_order = match last_executed_opt {
Some(v) => v.tx_order,
None => 0,
};
@@ -427,8 +399,8 @@ impl ExecInner {
if let Some(rollback) = rollback_to {
if rollback < last_partial_executed_tx_order {
let new_last_and_rollback = self
.tx_da_indexer
.slice(rollback, last_partial_executed_tx_order)?;
.tx_meta_store
.get_tx_positions_in_range(rollback, last_partial_executed_tx_order)?;
// split into two parts, the first get execution info for new startup, all others rollback
let (new_last, rollback_part) = new_last_and_rollback.split_first().unwrap();
info!(
@@ -448,9 +420,9 @@
})?;
}
let rollback_execution_info =
self.tx_da_indexer.get_execution_info(new_last.tx_hash)?;
self.tx_meta_store.get_execution_info(new_last.tx_hash)?;
let rollback_sequencer_info =
self.tx_da_indexer.get_sequencer_info(new_last.tx_hash)?;
self.tx_meta_store.get_sequencer_info(new_last.tx_hash)?;
self.update_startup_info_after_rollback(
rollback_execution_info,
rollback_sequencer_info,
@@ -461,20 +433,20 @@
};

let mut next_block_number = last_executed_opt
.clone()
.map(|v| v.block_number) // next_tx_order and last executed tx may be in the same block
.unwrap_or(0);

if !self.mode.need_exec() {
next_tx_order = last_sequenced_tx + 1;
next_block_number = self.tx_da_indexer.find_tx_block(next_tx_order).unwrap();
next_block_number = self.tx_meta_store.find_tx_block(next_tx_order).unwrap();
}
info!(
"Start to produce transactions from tx_order: {}, check from block: {}",
next_tx_order, next_block_number,
);
let mut produced_tx_order = 0;
let mut reach_end = false;
let max_verified_tx_order = self.tx_meta_store.get_max_verified_tx_order();
loop {
if reach_end {
break;
@@ -489,7 +461,7 @@ impl ExecInner {
let tx_list = tx_list.unwrap();
for ledger_tx in tx_list {
let tx_order = ledger_tx.sequence_info.tx_order;
if tx_order > self.tx_order_end {
if tx_order > max_verified_tx_order {
reach_end = true;
break;
}
@@ -583,7 +555,7 @@

let is_l2_tx = ledger_tx.data.is_l2_tx();

let exp_root_opt = self.order_state_pair.get(&tx_order);
let exp_root_opt = self.tx_meta_store.get_exp_roots(tx_order);
let exp_state_root = exp_root_opt.map(|v| v.0);
let exp_accumulator_root = exp_root_opt.map(|v| v.1);

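In exec.rs the inline load_order_state_pair helper and its HashMap are removed: expected roots now come from TxMetaStore::get_exp_roots, and the upper bound on verifiable orders from get_max_verified_tx_order. The file named by --exp-root keeps the tx_order:state_root:accumulator_root line format that the removed helper parsed. A sketch of parsing one such line follows, using primitive_types::H256 only as a stand-in for the hash type used in the repository.

use primitive_types::H256;
use std::str::FromStr;

/// Parse one `tx_order:state_root:accumulator_root` line from the exp-root file.
fn parse_exp_root_line(line: &str) -> anyhow::Result<(u64, (H256, H256))> {
    let parts: Vec<&str> = line.split(':').collect();
    if parts.len() != 3 {
        anyhow::bail!("expected 3 `:`-separated fields, got {}", parts.len());
    }
    let tx_order = parts[0].parse::<u64>()?;
    let state_root =
        H256::from_str(parts[1]).map_err(|e| anyhow::anyhow!("bad state_root: {:?}", e))?;
    let accumulator_root =
        H256::from_str(parts[2]).map_err(|e| anyhow::anyhow!("bad accumulator_root: {:?}", e))?;
    Ok((tx_order, (state_root, accumulator_root)))
}

Against data in that format, the new call self.tx_meta_store.get_exp_roots(tx_order) plays the role of the old self.order_state_pair.get(&tx_order) lookup.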
110 changes: 82 additions & 28 deletions crates/rooch/src/commands/da/commands/index.rs
@@ -1,59 +1,113 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use crate::commands::da::commands::{LedgerTxGetter, TxDAIndex};
use rooch_types::error::{RoochError, RoochResult};
use std::fs::File;
use std::io::BufWriter;
use std::io::Write;
use crate::commands::da::commands::{LedgerTxGetter, TxPosition, TxPositionIndexer};
use anyhow::anyhow;
use std::cmp::max;
use std::path::PathBuf;
use tracing::info;

/// Index tx_order:tx_hash:block_number to a file from segments
/// Index tx_order:tx_hash:block_number
#[derive(Debug, clap::Parser)]
pub struct IndexCommand {
#[clap(long = "segment-dir", short = 's')]
pub segment_dir: PathBuf,
pub segment_dir: Option<PathBuf>,
#[clap(long = "index", short = 'i')]
pub index_path: PathBuf,
#[clap(
long = "reset-from",
help = "Reset from tx order(inclusive), all tx orders after this will be re-indexed"
)]
pub reset_from: Option<u64>,
#[clap(long = "max-block-number", help = "Max block number to index")]
pub max_block_number: Option<u128>,
#[clap(long = "file", help = "Load/dump file-based index")]
pub index_file_path: Option<PathBuf>,
#[clap(long = "dump", help = "Dump index to file")]
pub dump: bool,
}

impl IndexCommand {
pub fn execute(self) -> RoochResult<()> {
let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir)?;
let mut block_number = ledger_tx_loader.get_min_chunk_id();
let mut expected_tx_order = 0;
let file = File::create(self.index_path.clone())?;
let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file.try_clone().unwrap());
pub fn execute(self) -> anyhow::Result<()> {
if self.index_file_path.is_some() {
return TxPositionIndexer::load_or_dump(
self.index_path,
self.index_file_path.unwrap(),
self.dump,
);
}

let db_path = self.index_path.clone();
let reset_from = self.reset_from;
let mut indexer = TxPositionIndexer::new(db_path, reset_from)?;

info!("indexer stats after reset: {:?}", indexer.get_stats()?);

let segment_dir = self.segment_dir.ok_or(anyhow!("segment-dir is required"))?;

let ledger_tx_loader = LedgerTxGetter::new(segment_dir)?;
let mut block_number = indexer.last_block_number; // avoiding partial indexing
let mut expected_tx_order = indexer.last_tx_order + 1;
let stop_at = if let Some(max_block_number) = self.max_block_number {
max(max_block_number, ledger_tx_loader.get_max_chunk_id())
} else {
ledger_tx_loader.get_max_chunk_id()
};

let db = indexer.db;
let mut wtxn = indexer.db_env.write_txn()?;

let mut done_block = 0;
loop {
if block_number > ledger_tx_loader.get_max_chunk_id() {
if block_number > stop_at {
break;
}
let tx_list = ledger_tx_loader.load_ledger_tx_list(block_number, true)?;
let tx_list = tx_list.unwrap();
for mut ledger_tx in tx_list {
let tx_order = ledger_tx.sequence_info.tx_order;
let tx_hash = ledger_tx.tx_hash();
if expected_tx_order == 0 {
expected_tx_order = tx_order;
} else if tx_order != expected_tx_order {
return Err(RoochError::from(anyhow::anyhow!(
"tx_order mismatch: expected {}, got {}",
if tx_order < expected_tx_order {
continue;
}
if tx_order == indexer.last_tx_order + 1 {
info!(
"begin to index block: {}, tx_order: {}",
block_number, tx_order
);
}
if tx_order != expected_tx_order {
return Err(anyhow!(
"tx_order not continuous, expect: {}, got: {}",
expected_tx_order,
tx_order
)));
));
}
writeln!(
writer,
"{}",
TxDAIndex::new(tx_order, tx_hash, block_number)
)?;
let tx_hash = ledger_tx.tx_hash();
let tx_position = TxPosition {
tx_order,
tx_hash,
block_number,
};
db.put(&mut wtxn, &tx_order, &tx_position)?;
expected_tx_order += 1;
}
block_number += 1;
done_block += 1;
if done_block % 1000 == 0 {
wtxn.commit()?;
wtxn = indexer.db_env.write_txn()?;
info!(
"done: block_cnt: {}; next_block_number: {}",
done_block, block_number
);
}
}
writer.flush()?;
file.sync_data()?;
wtxn.commit()?;

indexer.init_cursor()?;
info!("indexer stats after job: {:?}", indexer.get_stats()?);
indexer.close()?;

Ok(())
}
}
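The rewritten loop above batches LMDB writes: positions go into one heed write transaction that is committed and reopened every 1000 blocks, so an interrupted run loses at most one batch and can resume from indexer.last_tx_order and indexer.last_block_number. Below is a stripped-down sketch of that pattern; the value type, batch size, and codecs are chosen for illustration rather than copied from the repository.

use heed::types::SerdeBincode;
use heed::{Database, Env};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct TxPositionLike {
    tx_order: u64,
    tx_hash: [u8; 32],
    block_number: u128,
}

fn index_blocks(
    env: &Env,
    db: Database<SerdeBincode<u64>, SerdeBincode<TxPositionLike>>,
    blocks: impl Iterator<Item = Vec<TxPositionLike>>,
) -> anyhow::Result<()> {
    let mut wtxn = env.write_txn()?;
    let mut done_block: u64 = 0;
    for positions in blocks {
        for pos in &positions {
            db.put(&mut wtxn, &pos.tx_order, pos)?;
        }
        done_block += 1;
        if done_block % 1000 == 0 {
            // Persist the finished batch, then start a fresh transaction so a
            // crash can only lose the blocks indexed since the last commit.
            wtxn.commit()?;
            wtxn = env.write_txn()?;
        }
    }
    wtxn.commit()?;
    Ok(())
}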
