Skip to content

Commit

Permalink
remove filed state for rebuild indexer cli (#1878)
Browse files Browse the repository at this point in the history
  • Loading branch information
baichuan3 authored Jun 13, 2024
1 parent 8974ab0 commit 0972be4
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 18 deletions.
2 changes: 1 addition & 1 deletion crates/rooch-indexer/src/actor/reader_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,6 @@ impl Handler<QueryIndexerObjectStatesMessage> for IndexerReaderActor {
} = msg;
self.indexer_reader
.query_object_states_with_filter(filter, cursor, limit, descending_order)
.map_err(|e| anyhow!(format!("Failed to query indexer global states: {:?}", e)))
.map_err(|e| anyhow!(format!("Failed to query indexer object states: {:?}", e)))
}
}
29 changes: 27 additions & 2 deletions crates/rooch-indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ impl IndexerReader {
main_where_clause, cursor_clause, order_clause, limit,
);

tracing::debug!("query global states: {}", query);
tracing::debug!("query object states: {}", query);
let stored_states = self
.get_inner_indexer_reader(INDEXER_OBJECT_STATES_TABLE_NAME)?
.run_query(|conn| diesel::sql_query(query).load::<StoredObjectState>(conn))?;
Expand All @@ -415,11 +415,36 @@ impl IndexerReader {
.map(|v| v.try_into_indexer_global_state())
.collect::<Result<Vec<_>>>()
.map_err(|e| {
IndexerError::SQLiteReadError(format!("Cast indexer global states failed: {:?}", e))
IndexerError::SQLiteReadError(format!("Cast indexer object states failed: {:?}", e))
})?;

Ok(result)
}

pub fn query_last_state_index_by_tx_order(&self, tx_order: u64) -> IndexerResult<u64> {
let where_clause = format!("{TX_ORDER_STR} = \"{}\"", tx_order as i64);
let order_clause = format!("{TX_ORDER_STR} DESC, {STATE_INDEX_STR} DESC");
let query = format!(
"
SELECT * FROM object_states \
WHERE {} \
ORDER BY {} \
LIMIT 1
",
where_clause, order_clause,
);

tracing::debug!("query last state index by tx order: {}", query);
let stored_states = self
.get_inner_indexer_reader(INDEXER_OBJECT_STATES_TABLE_NAME)?
.run_query(|conn| diesel::sql_query(query).load::<StoredObjectState>(conn))?;
let last_state_index = if stored_states.is_empty() {
0
} else {
stored_states[0].state_index as u64 + 1
};
Ok(last_state_index)
}
}

fn object_type_query(object_type: &StructTag) -> String {
Expand Down
8 changes: 4 additions & 4 deletions crates/rooch-indexer/src/store/sqlite_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,16 @@ impl SqliteIndexerStore {
// ))
// .execute(&mut connection)
// .map_err(|e| IndexerError::SQLiteWriteError(e.to_string()))
// .context("Failed to write or update global states to SQLiteDB");
// .context("Failed to write or update object states to SQLiteDB");

// Execute the raw SQL query
diesel::sql_query(query.clone())
.execute(&mut connection)
.map_err(|e| {
log::error!("Upsert global states Executing Query error: {}", query);
log::error!("Upsert object states Executing Query error: {}", query);
IndexerError::SQLiteWriteError(e.to_string())
})
.context("Failed to write or update global states to SQLiteDB")?;
.context("Failed to write or update object states to SQLiteDB")?;

Ok(())
}
Expand All @@ -117,7 +117,7 @@ impl SqliteIndexerStore {
)
.execute(&mut connection)
.map_err(|e| IndexerError::SQLiteWriteError(e.to_string()))
.context("Failed to delete global states to SQLiteDB")?;
.context("Failed to delete object states to SQLiteDB")?;

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/rooch-indexer/src/tests/test_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ fn test_state_store() -> Result<()> {
let mut update_object_states = random_update_object_states(new_object_states.clone());
let remove_object_states = random_remove_object_states();

//Merge new global states and update global states
//Merge new object states and update object states
new_object_states.append(&mut update_object_states);
indexer_store.persist_or_update_object_states(new_object_states.clone())?;
indexer_store.delete_object_states(remove_object_states)?;
Expand Down
19 changes: 14 additions & 5 deletions crates/rooch/src/commands/indexer/commands/rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ impl RebuildCommand {
pub async fn execute(self) -> RoochResult<()> {
let input_path = self.input.clone();
let batch_size = self.batch_size.unwrap();
let (indexer_store, _indexer_reader, start_time) = self.init();
let (indexer_store, indexer_reader, start_time) = self.init();
let (tx, rx) = mpsc::sync_channel(2);

let produce_updates_thread =
thread::spawn(move || produce_updates(tx, input_path, batch_size));
thread::spawn(move || produce_updates(tx, indexer_reader, input_path, batch_size));
let apply_updates_thread =
thread::spawn(move || apply_updates(rx, indexer_store, start_time));
let _ = produce_updates_thread
Expand Down Expand Up @@ -94,13 +94,22 @@ struct BatchUpdates {
object_states: Vec<IndexerObjectState>,
}

fn produce_updates(tx: SyncSender<BatchUpdates>, input: PathBuf, batch_size: usize) -> Result<()> {
fn produce_updates(
tx: SyncSender<BatchUpdates>,
indexer_reader: IndexerReader,
input: PathBuf,
batch_size: usize,
) -> Result<()> {
let mut csv_reader = BufReader::new(File::open(input).unwrap());
let mut last_state_type = None;

// set genesis tx_order and state_index_generator
// set genesis tx_order and state_index_generator for indexer rebuild
let tx_order: u64 = 0;
let mut state_index_generator: u64 = 0;
let mut state_index_generator = indexer_reader.query_last_state_index_by_tx_order(tx_order)?;
println!(
"Indexer rebuild produce_updates state_index_generator start from: {}",
state_index_generator
);

loop {
let mut updates = BatchUpdates {
Expand Down
2 changes: 1 addition & 1 deletion crates/rooch/src/commands/statedb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ rooch statedb export --output {your file} -d {your rooch data dir} -n main -m {e
3. rooch statedb import

```shell
rooch statedb statedb --input {your file} -d {your rooch data dir} -n main
rooch statedb import --input {your file} -d {your rooch data dir} -n main
```

### Config
Expand Down
13 changes: 9 additions & 4 deletions crates/rooch/src/commands/statedb/commands/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ impl ExportCommand {
root_state_root,
obj.id,
false,
true,
writer,
)?;
}
Expand Down Expand Up @@ -295,6 +296,7 @@ impl ExportCommand {
root_state_root,
object_id.clone(),
false,
true,
writer,
)?;

Expand Down Expand Up @@ -322,8 +324,9 @@ impl ExportCommand {
state_root: H256,
parent_state_root: H256,
object_id: ObjectID,
// export child object as object state under indexer mode
is_child_object_as_object_state: bool,
// export child field as object state under indexer mode
is_child_field_as_object_state: bool,
is_recursive_export_child_field: bool,
writer: &mut Writer<W>,
) -> Result<()> {
let starting_key = None;
Expand All @@ -333,7 +336,7 @@ impl ExportCommand {
.get_state_store()
.iter(state_root, starting_key.clone())?;

if object_id.has_child() {
if is_recursive_export_child_field && object_id.has_child() {
for item in iter {
let (_k, v) = item?;
if v.is_object() {
Expand All @@ -345,6 +348,7 @@ impl ExportCommand {
state_root,
object.id,
false,
false,
writer,
)?;
}
Expand All @@ -359,7 +363,7 @@ impl ExportCommand {

// write csv header.
{
let state_type = if is_child_object_as_object_state {
let state_type = if is_child_field_as_object_state {
GLOBAL_STATE_TYPE_OBJECT
} else {
GLOBAL_STATE_TYPE_FIELD
Expand Down Expand Up @@ -420,6 +424,7 @@ impl ExportCommand {
root_state_root,
obj.id,
true,
false,
writer,
)?;
}
Expand Down

0 comments on commit 0972be4

Please sign in to comment.