Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rooch-da): refactor indexing logic in DA commands #3199

Merged
merged 13 commits into from
Jan 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
395 changes: 364 additions & 31 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ futures = "0.3.31"
futures-util = "0.3.31"
hdrhistogram = "7.5.4"
hex = "0.4.3"
heed = "0.21.0"
itertools = "0.13.0"
jsonrpsee = { version = "0.23.2", features = ["full"] }
jpst = "0.1.1"
Expand Down
1 change: 1 addition & 0 deletions crates/rooch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ codespan-reporting = { workspace = true }
termcolor = { workspace = true }
itertools = { workspace = true }
hdrhistogram = { workspace = true }
heed = { workspace = true }
hex = { workspace = true }
regex = { workspace = true }
parking_lot = { workspace = true }
Expand Down
72 changes: 22 additions & 50 deletions crates/rooch/src/commands/da/commands/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::commands::da::commands::{
build_rooch_db, LedgerTxGetter, SequencedTxStore, TxDAIndexer,
build_rooch_db, LedgerTxGetter, SequencedTxStore, TxMetaStore,
};
use anyhow::Context;
use bitcoin::hashes::Hash;
Expand Down Expand Up @@ -39,11 +39,7 @@ use rooch_types::transaction::{
L1BlockWithBody, LedgerTransaction, LedgerTxData, TransactionSequenceInfo,
};
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, Read};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -65,15 +61,15 @@ pub struct ExecCommand {
#[clap(long = "segment-dir")]
pub segment_dir: PathBuf,
#[clap(
long = "order-state-path",
help = "Path to tx_order:state_root file(results from RoochNetwork), for fast verification avoiding blocking on RPC requests"
long = "tx-position",
help = "Path to tx_order:tx_hash:l2_block_number database directory"
)]
pub order_state_path: PathBuf,
pub tx_position_path: PathBuf,
#[clap(
long = "order-hash-path",
help = "Path to tx_order:tx_hash:l2_block_number file"
long = "exp-root",
help = "Path to tx_order:state_root:accumulator_root file(results from RoochNetwork), for fast verification avoiding blocking on RPC requests"
)]
pub order_hash_path: PathBuf,
pub exp_root_path: PathBuf,
#[clap(
long = "rollback",
help = "rollback to tx order. If not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order."
Expand Down Expand Up @@ -197,20 +193,18 @@ impl ExecCommand {
)
.await?;

let (order_state_pair, tx_order_end) = self.load_order_state_pair();
let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir.clone())?;
let tx_da_indexer = TxDAIndexer::load_from_file(
self.order_hash_path.clone(),
let tx_da_indexer = TxMetaStore::new(
self.tx_position_path.clone(),
self.exp_root_path.clone(),
moveos_store.transaction_store,
rooch_db.rooch_store.clone(),
)?;
Ok(ExecInner {
mode: self.mode,
force_align: self.force_align,
ledger_tx_getter: ledger_tx_loader,
tx_da_indexer,
order_state_pair,
tx_order_end,
tx_meta_store: tx_da_indexer,
sequenced_tx_store,
bitcoin_client_proxy,
executor,
Expand All @@ -221,36 +215,14 @@ impl ExecCommand {
rooch_db,
})
}

fn load_order_state_pair(&self) -> (HashMap<u64, (H256, H256)>, u64) {
let mut order_state_pair = HashMap::new();
let mut tx_order_end = 0;

let mut reader = BufReader::new(File::open(self.order_state_path.clone()).unwrap());
// collect all `tx_order:state_root` pairs
for line in reader.by_ref().lines() {
let line = line.unwrap();
let parts: Vec<&str> = line.split(':').collect();
let tx_order = parts[0].parse::<u64>().unwrap();
let state_root = H256::from_str(parts[1]).unwrap();
let accumulator_root = H256::from_str(parts[2]).unwrap();
order_state_pair.insert(tx_order, (state_root, accumulator_root));
if tx_order > tx_order_end {
tx_order_end = tx_order;
}
}
(order_state_pair, tx_order_end)
}
}

struct ExecInner {
mode: ExecMode,
force_align: bool,

ledger_tx_getter: LedgerTxGetter,
tx_da_indexer: TxDAIndexer,
order_state_pair: HashMap<u64, (H256, H256)>,
tx_order_end: u64,
tx_meta_store: TxMetaStore,

sequenced_tx_store: SequencedTxStore,

Expand Down Expand Up @@ -389,8 +361,8 @@ impl ExecInner {
}

async fn produce_tx(&self, tx: Sender<ExecMsg>) -> anyhow::Result<()> {
let last_executed_opt = self.tx_da_indexer.find_last_executed()?;
let last_executed_tx_order = match last_executed_opt.clone() {
let last_executed_opt = self.tx_meta_store.find_last_executed()?;
let last_executed_tx_order = match last_executed_opt {
Some(v) => v.tx_order,
None => 0,
};
Expand Down Expand Up @@ -427,8 +399,8 @@ impl ExecInner {
if let Some(rollback) = rollback_to {
if rollback < last_partial_executed_tx_order {
let new_last_and_rollback = self
.tx_da_indexer
.slice(rollback, last_partial_executed_tx_order)?;
.tx_meta_store
.get_tx_positions_in_range(rollback, last_partial_executed_tx_order)?;
// split into two parts, the first get execution info for new startup, all others rollback
let (new_last, rollback_part) = new_last_and_rollback.split_first().unwrap();
info!(
Expand All @@ -448,9 +420,9 @@ impl ExecInner {
})?;
}
let rollback_execution_info =
self.tx_da_indexer.get_execution_info(new_last.tx_hash)?;
self.tx_meta_store.get_execution_info(new_last.tx_hash)?;
let rollback_sequencer_info =
self.tx_da_indexer.get_sequencer_info(new_last.tx_hash)?;
self.tx_meta_store.get_sequencer_info(new_last.tx_hash)?;
self.update_startup_info_after_rollback(
rollback_execution_info,
rollback_sequencer_info,
Expand All @@ -461,20 +433,20 @@ impl ExecInner {
};

let mut next_block_number = last_executed_opt
.clone()
.map(|v| v.block_number) // next_tx_order and last executed tx may be in the same block
.unwrap_or(0);

if !self.mode.need_exec() {
next_tx_order = last_sequenced_tx + 1;
next_block_number = self.tx_da_indexer.find_tx_block(next_tx_order).unwrap();
next_block_number = self.tx_meta_store.find_tx_block(next_tx_order).unwrap();
}
info!(
"Start to produce transactions from tx_order: {}, check from block: {}",
next_tx_order, next_block_number,
);
let mut produced_tx_order = 0;
let mut reach_end = false;
let max_verified_tx_order = self.tx_meta_store.get_max_verified_tx_order();
loop {
if reach_end {
break;
Expand All @@ -489,7 +461,7 @@ impl ExecInner {
let tx_list = tx_list.unwrap();
for ledger_tx in tx_list {
let tx_order = ledger_tx.sequence_info.tx_order;
if tx_order > self.tx_order_end {
if tx_order > max_verified_tx_order {
reach_end = true;
break;
}
Expand Down Expand Up @@ -583,7 +555,7 @@ impl ExecInner {

let is_l2_tx = ledger_tx.data.is_l2_tx();

let exp_root_opt = self.order_state_pair.get(&tx_order);
let exp_root_opt = self.tx_meta_store.get_exp_roots(tx_order);
let exp_state_root = exp_root_opt.map(|v| v.0);
let exp_accumulator_root = exp_root_opt.map(|v| v.1);

Expand Down
110 changes: 82 additions & 28 deletions crates/rooch/src/commands/da/commands/index.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,113 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use crate::commands::da::commands::{LedgerTxGetter, TxDAIndex};
use rooch_types::error::{RoochError, RoochResult};
use std::fs::File;
use std::io::BufWriter;
use std::io::Write;
use crate::commands::da::commands::{LedgerTxGetter, TxPosition, TxPositionIndexer};
use anyhow::anyhow;
use std::cmp::max;
use std::path::PathBuf;
use tracing::info;

/// Index tx_order:tx_hash:block_number to a file from segments
/// Index tx_order:tx_hash:block_number
#[derive(Debug, clap::Parser)]
pub struct IndexCommand {
#[clap(long = "segment-dir", short = 's')]
pub segment_dir: PathBuf,
pub segment_dir: Option<PathBuf>,
#[clap(long = "index", short = 'i')]
pub index_path: PathBuf,
#[clap(
long = "reset-from",
help = "Reset from tx order(inclusive), all tx orders after this will be re-indexed"
)]
pub reset_from: Option<u64>,
#[clap(long = "max-block-number", help = "Max block number to index")]
pub max_block_number: Option<u128>,
#[clap(long = "file", help = "Load/dump file-based index")]
pub index_file_path: Option<PathBuf>,
#[clap(long = "dump", help = "Dump index to file")]
pub dump: bool,
}

impl IndexCommand {
pub fn execute(self) -> RoochResult<()> {
let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir)?;
let mut block_number = ledger_tx_loader.get_min_chunk_id();
let mut expected_tx_order = 0;
let file = File::create(self.index_path.clone())?;
let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file.try_clone().unwrap());
pub fn execute(self) -> anyhow::Result<()> {
if self.index_file_path.is_some() {
return TxPositionIndexer::load_or_dump(
self.index_path,
self.index_file_path.unwrap(),
self.dump,
);
}

let db_path = self.index_path.clone();
let reset_from = self.reset_from;
let mut indexer = TxPositionIndexer::new(db_path, reset_from)?;

info!("indexer stats after reset: {:?}", indexer.get_stats()?);

let segment_dir = self.segment_dir.ok_or(anyhow!("segment-dir is required"))?;

let ledger_tx_loader = LedgerTxGetter::new(segment_dir)?;
let mut block_number = indexer.last_block_number; // avoiding partial indexing
let mut expected_tx_order = indexer.last_tx_order + 1;
let stop_at = if let Some(max_block_number) = self.max_block_number {
max(max_block_number, ledger_tx_loader.get_max_chunk_id())
} else {
ledger_tx_loader.get_max_chunk_id()
};

let db = indexer.db;
let mut wtxn = indexer.db_env.write_txn()?;

let mut done_block = 0;
loop {
if block_number > ledger_tx_loader.get_max_chunk_id() {
if block_number > stop_at {
break;
}
let tx_list = ledger_tx_loader.load_ledger_tx_list(block_number, true)?;
let tx_list = tx_list.unwrap();
for mut ledger_tx in tx_list {
let tx_order = ledger_tx.sequence_info.tx_order;
let tx_hash = ledger_tx.tx_hash();
if expected_tx_order == 0 {
expected_tx_order = tx_order;
} else if tx_order != expected_tx_order {
return Err(RoochError::from(anyhow::anyhow!(
"tx_order mismatch: expected {}, got {}",
if tx_order < expected_tx_order {
continue;
}
if tx_order == indexer.last_tx_order + 1 {
info!(
"begin to index block: {}, tx_order: {}",
block_number, tx_order
);
}
if tx_order != expected_tx_order {
return Err(anyhow!(
"tx_order not continuous, expect: {}, got: {}",
expected_tx_order,
tx_order
)));
));
}
writeln!(
writer,
"{}",
TxDAIndex::new(tx_order, tx_hash, block_number)
)?;
let tx_hash = ledger_tx.tx_hash();
let tx_position = TxPosition {
tx_order,
tx_hash,
block_number,
};
db.put(&mut wtxn, &tx_order, &tx_position)?;
expected_tx_order += 1;
}
block_number += 1;
done_block += 1;
if done_block % 1000 == 0 {
wtxn.commit()?;
wtxn = indexer.db_env.write_txn()?;
info!(
"done: block_cnt: {}; next_block_number: {}",
done_block, block_number
);
}
}
writer.flush()?;
file.sync_data()?;
wtxn.commit()?;

indexer.init_cursor()?;
info!("indexer stats after job: {:?}", indexer.get_stats()?);
indexer.close()?;

Ok(())
}
}
Loading
Loading