diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 52bbabbbd8..1cf0f7c4da 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -22,8 +22,8 @@ use starcoin_open_block::OpenedBlock; use starcoin_state_api::{AccountStateReader, ChainStateReader, ChainStateWriter}; use starcoin_statedb::ChainStateDB; use starcoin_storage::flexi_dag::SyncFlexiDagSnapshot; -use starcoin_storage::Store; use starcoin_storage::storage::CodecKVStore; +use starcoin_storage::Store; use starcoin_time_service::TimeService; use starcoin_types::block::BlockIdAndNumber; use starcoin_types::contract_event::ContractEventInfo; @@ -108,7 +108,10 @@ impl BlockChain { )), None => None, }; - let dag_snapshot_tips = storage.get_accumulator_snapshot_storage().get(head_id)?.map(|snapshot| snapshot.child_hashes); + let dag_snapshot_tips = storage + .get_accumulator_snapshot_storage() + .get(head_id)? + .map(|snapshot| snapshot.child_hashes); let mut chain = Self { genesis_hash: genesis, time_service, @@ -123,11 +126,7 @@ impl BlockChain { storage.as_ref(), ), status: ChainStatusWithBlock { - status: ChainStatus::new( - head_block.header.clone(), - block_info, - dag_snapshot_tips, - ), + status: ChainStatus::new(head_block.header.clone(), block_info, dag_snapshot_tips), head: head_block, }, statedb: chain_state, @@ -638,21 +637,25 @@ impl BlockChain { ); Ok(()) } - + pub fn dag_parents_in_tips(&self, dag_parents: Vec) -> Result { - Ok(dag_parents.into_iter().all(|parent| { - match &self.status.status.tips_hash { + Ok(dag_parents + .into_iter() + .all(|parent| match &self.status.status.tips_hash { Some(tips) => tips.contains(&parent), None => false, - } - })) + })) } pub fn is_head_of_dag_accumulator(&self, next_tips: Vec) -> Result { let key = Self::calculate_dag_accumulator_key(next_tips)?; let next_tips_info = self.storage.get_dag_accumulator_info(key)?; - return Ok(next_tips_info == self.dag_accumulator.as_ref().map(|accumulator| accumulator.get_info())); + return Ok(next_tips_info + == self + .dag_accumulator + .as_ref() + .map(|accumulator| accumulator.get_info())); } } diff --git a/commons/stream-task/src/collector.rs b/commons/stream-task/src/collector.rs index 3e597fce95..cd0e317bbd 100644 --- a/commons/stream-task/src/collector.rs +++ b/commons/stream-task/src/collector.rs @@ -15,7 +15,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use thiserror::Error; -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum CollectorState { /// Collector is enough, do not feed more item, finish task. Enough, diff --git a/node/src/node.rs b/node/src/node.rs index d2cf1e13cb..7a2da4afbc 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -13,7 +13,6 @@ use futures::executor::block_on; use futures_timer::Delay; use network_api::{PeerProvider, PeerSelector, PeerStrategy}; use starcoin_account_service::{AccountEventService, AccountService, AccountStorage}; -use starcoin_accumulator::node::AccumulatorStoreType; use starcoin_block_relayer::BlockRelayer; use starcoin_chain_notify::ChainNotifyHandlerService; use starcoin_chain_service::ChainReaderService; @@ -46,7 +45,7 @@ use starcoin_storage::db_storage::DBStorage; use starcoin_storage::errors::StorageInitError; use starcoin_storage::metrics::StorageMetrics; use starcoin_storage::storage::StorageInstance; -use starcoin_storage::{BlockStore, Storage, Store}; +use starcoin_storage::{BlockStore, Storage}; use starcoin_stratum::service::{StratumService, StratumServiceFactory}; use starcoin_stratum::stratum::{Stratum, StratumFactory}; use starcoin_sync::announcement::AnnouncementService; diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 194a84920b..ec2e56dcd0 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -740,7 +740,10 @@ impl SyncFlexiDagStore for Storage { // for block chain new_tips.iter().try_fold((), |_, block_id| { - if let Some(t) = self.flexi_dag_storage.get_hashes_by_hash(block_id.clone())? { + if let Some(t) = self + .flexi_dag_storage + .get_hashes_by_hash(block_id.clone())? + { if t != snapshot { bail!("the key {} should not exists", block_id); } diff --git a/sync/api/src/lib.rs b/sync/api/src/lib.rs index 7d7db3d9d6..d541abfa25 100644 --- a/sync/api/src/lib.rs +++ b/sync/api/src/lib.rs @@ -66,15 +66,6 @@ pub struct SyncTarget { pub peers: Vec, } -#[derive(Debug, Clone)] -pub struct NewBlockChainRequest { - pub new_head_block: HashValue, -} - -impl ServiceRequest for NewBlockChainRequest { - type Response = anyhow::Result<()>; -} - #[derive(Debug, Clone)] pub struct SyncStatusRequest; diff --git a/sync/src/block_connector/block_connector_service.rs b/sync/src/block_connector/block_connector_service.rs index b5766e6bd5..17e572367f 100644 --- a/sync/src/block_connector/block_connector_service.rs +++ b/sync/src/block_connector/block_connector_service.rs @@ -3,10 +3,10 @@ use crate::block_connector::{ExecuteRequest, ResetRequest, WriteBlockChainService}; use crate::sync::{CheckSyncEvent, SyncService}; -use crate::tasks::{BlockConnectedEvent, BlockDiskCheckEvent}; -use anyhow::{format_err, Result, Ok}; +use crate::tasks::{BlockConnectedEvent, BlockDiskCheckEvent, BlockConnectedFinishEvent}; +use anyhow::{format_err, Ok, Result}; use network_api::PeerProvider; -use starcoin_chain_api::{ConnectBlockError, WriteableChainService, ChainReader}; +use starcoin_chain_api::{ChainReader, ConnectBlockError, WriteableChainService}; use starcoin_config::{NodeConfig, G_CRATE_VERSION}; use starcoin_consensus::BlockDAG; use starcoin_executor::VMMetrics; @@ -16,16 +16,14 @@ use starcoin_service_registry::{ ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, }; use starcoin_storage::{BlockStore, Storage}; -use starcoin_sync_api::{PeerNewBlock, NewBlockChainRequest}; +use starcoin_sync_api::PeerNewBlock; use starcoin_txpool::TxPoolService; use starcoin_types::block::ExecutedBlock; use starcoin_types::sync_status::SyncStatus; -use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown, NewHeadBlock}; +use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown}; use std::sync::{Arc, Mutex}; use sysinfo::{DiskExt, System, SystemExt}; -use super::BlockConnectedRequest; - const DISK_CHECKPOINT_FOR_PANIC: u64 = 1024 * 1024 * 1024 * 3; const DISK_CHECKPOINT_FOR_WARN: u64 = 1024 * 1024 * 1024 * 5; @@ -170,15 +168,28 @@ impl EventHandler for BlockConnectorService { fn handle_event( &mut self, msg: BlockConnectedEvent, - _ctx: &mut ServiceContext, + ctx: &mut ServiceContext, ) { //because this block has execute at sync task, so just try connect to select head chain. //TODO refactor connect and execute let block = msg.block; - if let Err(e) = self.chain_service.try_connect(block, msg.dag_parents) { - error!("Process connected block error: {:?}", e); + let feedback = msg.feedback; + + match msg.action { + crate::tasks::BlockConnectAction::ConnectNewBlock => { + if let Err(e) = self.chain_service.try_connect(block, msg.dag_parents) { + error!("Process connected new block from sync error: {:?}", e); + } + } + crate::tasks::BlockConnectAction::ConnectExecutedBlock => { + if let Err(e) = self.chain_service.switch_new_main(block.header().id(), ctx) { + error!("Process connected executed block from sync error: {:?}", e); + } + } } + + feedback.map(|f| f.unbounded_send(BlockConnectedFinishEvent)); } } @@ -222,7 +233,9 @@ impl EventHandler for BlockConnectorService { match connect_error { ConnectBlockError::FutureBlock(block) => { //TODO cache future block - if let std::result::Result::Ok(sync_service) = ctx.service_ref::() { + if let std::result::Result::Ok(sync_service) = + ctx.service_ref::() + { info!( "BlockConnector try connect future block ({:?},{}), peer_id:{:?}, notify Sync service check sync.", block.id(), @@ -279,18 +292,6 @@ impl ServiceHandler for BlockConnectorService { } } -impl ServiceHandler for BlockConnectorService { - fn handle( - &mut self, - msg: NewBlockChainRequest, - ctx: &mut ServiceContext, - ) -> Result<()> { - let (new_branch, dag_parents, next_tips) = self.chain_service.switch_new_main(msg.new_head_block)?; - ctx.broadcast(NewHeadBlock(Arc::new(new_branch.head_block()), Some(dag_parents), Some(next_tips))); - Ok(()) - } -} - impl ServiceHandler for BlockConnectorService { fn handle( &mut self, @@ -301,18 +302,3 @@ impl ServiceHandler for BlockConnectorService { .execute(msg.block, msg.dag_transaction_parent) } } - -impl ServiceHandler for BlockConnectorService { - fn handle( - &mut self, - msg: BlockConnectedRequest, - _ctx: &mut ServiceContext, - ) -> Result<()> { - //because this block has execute at sync task, so just try connect to select head chain. - //TODO refactor connect and execute - - let block = msg.block; - let result = self.chain_service.try_connect(block, msg.dag_parents); - result - } -} diff --git a/sync/src/block_connector/mod.rs b/sync/src/block_connector/mod.rs index 7ef4a4a41b..2f8d8af133 100644 --- a/sync/src/block_connector/mod.rs +++ b/sync/src/block_connector/mod.rs @@ -45,13 +45,3 @@ pub struct ExecuteRequest { impl ServiceRequest for ExecuteRequest { type Response = anyhow::Result; } - -#[derive(Debug, Clone)] -pub struct BlockConnectedRequest { - pub block: Block, - pub dag_parents: Option>, -} - -impl ServiceRequest for BlockConnectedRequest { - type Response = anyhow::Result<()>; -} \ No newline at end of file diff --git a/sync/src/block_connector/test_write_dag_block_chain.rs b/sync/src/block_connector/test_write_dag_block_chain.rs index bb1e3c53e9..c74c9aef83 100644 --- a/sync/src/block_connector/test_write_dag_block_chain.rs +++ b/sync/src/block_connector/test_write_dag_block_chain.rs @@ -48,8 +48,7 @@ pub fn gen_dag_blocks( } } - let result = writeable_block_chain_service - .execute_dag_block_pool(); + let result = writeable_block_chain_service.execute_dag_block_pool(); let result = result.unwrap(); match result { super::write_block_chain::ConnectOk::Duplicate(block) @@ -159,7 +158,7 @@ async fn test_block_chain_switch_main() { .get_main() .current_header() .id(), - last_block.unwrap() + last_block.unwrap() ); last_block = gen_fork_dag_block_chain( diff --git a/sync/src/block_connector/write_block_chain.rs b/sync/src/block_connector/write_block_chain.rs index 0ff446779f..fbe8226a7d 100644 --- a/sync/src/block_connector/write_block_chain.rs +++ b/sync/src/block_connector/write_block_chain.rs @@ -13,9 +13,9 @@ use starcoin_crypto::HashValue; use starcoin_executor::VMMetrics; use starcoin_logger::prelude::*; use starcoin_service_registry::bus::{Bus, BusService}; -use starcoin_service_registry::ServiceRef; -use starcoin_storage::Store; +use starcoin_service_registry::{ServiceContext, ServiceRef}; use starcoin_storage::storage::CodecKVStore; +use starcoin_storage::Store; use starcoin_time_service::{DagBlockTimeWindowService, TimeWindowResult}; use starcoin_txpool_api::TxPoolSyncService; use starcoin_types::block::BlockInfo; @@ -28,6 +28,8 @@ use starcoin_types::{ use std::fmt::Formatter; use std::sync::{Arc, Mutex}; +use super::BlockConnectorService; + const MAX_ROLL_BACK_BLOCK: usize = 10; pub struct WriteBlockChainService

@@ -223,26 +225,41 @@ where // switch by: // 1, update the startup info // 2, broadcast the new header - pub fn switch_new_main(&mut self, new_head_block: HashValue) -> Result<(BlockChain, Vec, Vec)> { - let new_branch = BlockChain::new(self.config - .net() - .time_service(), - new_head_block, - self.storage.clone(), - self.vm_metrics.clone())?; + pub fn switch_new_main( + &mut self, + new_head_block: HashValue, + ctx: &mut ServiceContext, + ) -> Result<()> { + let new_branch = BlockChain::new( + self.config.net().time_service(), + new_head_block, + self.storage.clone(), + self.vm_metrics.clone(), + )?; let main_total_difficulty = self.main.get_total_difficulty()?; let branch_total_difficulty = new_branch.get_total_difficulty()?; if branch_total_difficulty > main_total_difficulty { self.update_startup_info(new_branch.head_block().header())?; - } - let dag_parents = self.dag.lock().unwrap().get_parents(new_head_block)?; - let next_tips = self.storage.get_accumulator_snapshot_storage() - .get(new_head_block)? - .expect("the snapshot must exists!").child_hashes; - - Ok((new_branch, dag_parents, next_tips)) + let dag_parents = self.dag.lock().unwrap().get_parents(new_head_block)?; + let next_tips = self + .storage + .get_accumulator_snapshot_storage() + .get(new_head_block)? + .expect("the snapshot must exists!") + .child_hashes; + + ctx.broadcast(NewHeadBlock( + Arc::new(new_branch.head_block()), + Some(dag_parents), + Some(next_tips), + )); + + Ok(()) + } else { + bail!("no need to switch"); + } } pub fn select_head( @@ -259,17 +276,18 @@ where ); if branch_total_difficulty > main_total_difficulty { - let (enacted_count, enacted_blocks, retracted_count, retracted_blocks) = if dag_block_parents.is_some() { - // for dag - self.find_ancestors_from_dag_accumulator(&new_branch)? - } else { - // for single chain - if !parent_is_main_head { - self.find_ancestors_from_accumulator(&new_branch)? + let (enacted_count, enacted_blocks, retracted_count, retracted_blocks) = + if dag_block_parents.is_some() { + // for dag + self.find_ancestors_from_dag_accumulator(&new_branch)? } else { - (1, vec![executed_block.block.clone()], 0, vec![]) - } - }; + // for single chain + if !parent_is_main_head { + self.find_ancestors_from_accumulator(&new_branch)? + } else { + (1, vec![executed_block.block.clone()], 0, vec![]) + } + }; self.main = new_branch; self.do_new_head( @@ -305,7 +323,10 @@ where enacted_blocks.last().unwrap().header, executed_block.block().header); } debug_assert!(!enacted_blocks.is_empty()); - debug_assert_eq!(enacted_blocks.last().unwrap().header, executed_block.block().header); + debug_assert_eq!( + enacted_blocks.last().unwrap().header, + executed_block.block().header + ); self.update_startup_info(executed_block.block().header())?; if retracted_count > 0 { if let Some(metrics) = self.metrics.as_ref() { @@ -334,9 +355,7 @@ where Ok(()) } - pub fn do_new_head_with_broadcast() { - - } + pub fn do_new_head_with_broadcast() {} /// Reset the node to `block_id`, and replay blocks after the block pub fn reset( @@ -441,22 +460,29 @@ where } } - - fn find_ancestors_from_dag_accumulator(&self, new_branch: &BlockChain) -> Result<(u64, Vec, u64, Vec)> { - let mut min_leaf_index = std::cmp::min(self.main.get_dag_current_leaf_number()?, new_branch.get_dag_current_leaf_number()?) - 1; + fn find_ancestors_from_dag_accumulator( + &self, + new_branch: &BlockChain, + ) -> Result<(u64, Vec, u64, Vec)> { + let mut min_leaf_index = std::cmp::min( + self.main.get_dag_current_leaf_number()?, + new_branch.get_dag_current_leaf_number()?, + ) - 1; let mut retracted = vec![]; let mut enacted = vec![]; - let snapshot = new_branch.get_dag_accumulator_snapshot(new_branch.head_block().header().id())?; + let snapshot = + new_branch.get_dag_accumulator_snapshot(new_branch.head_block().header().id())?; let mut children = snapshot.child_hashes.clone(); children.sort(); for child in children { - match self - .storage - .get_block(child)? { + match self.storage.get_block(child)? { Some(block) => enacted.push(block), - None => bail!("the block{} dose not exist in new branch, ignore", child.clone()), + None => bail!( + "the block{} dose not exist in new branch, ignore", + child.clone() + ), } } enacted.reverse(); @@ -465,40 +491,59 @@ where if min_leaf_index == 0 { break; } - let main_snapshot = self.main.get_dag_accumulator_snapshot_by_index(min_leaf_index)?; - let new_branch_snapshot = new_branch.get_dag_accumulator_snapshot_by_index(min_leaf_index)?; - - if main_snapshot.accumulator_info.get_accumulator_root() == new_branch_snapshot.accumulator_info.get_accumulator_root() { + let main_snapshot = self + .main + .get_dag_accumulator_snapshot_by_index(min_leaf_index)?; + let new_branch_snapshot = + new_branch.get_dag_accumulator_snapshot_by_index(min_leaf_index)?; + + if main_snapshot.accumulator_info.get_accumulator_root() + == new_branch_snapshot.accumulator_info.get_accumulator_root() + { break; } let mut temp_retracted = vec![]; - temp_retracted.extend(main_snapshot.child_hashes.iter().try_fold(Vec::::new(), |mut rollback_blocks, child| { - let block = self - .storage - .get_block(child.clone()); - if let anyhow::Result::Ok(Some(block)) = block { - rollback_blocks.push(block); - } else { - bail!("the block{} dose not exist in main branch, ignore", child.clone()); - } - return Ok(rollback_blocks); - })?.into_iter()); + temp_retracted.extend( + main_snapshot + .child_hashes + .iter() + .try_fold(Vec::::new(), |mut rollback_blocks, child| { + let block = self.storage.get_block(child.clone()); + if let anyhow::Result::Ok(Some(block)) = block { + rollback_blocks.push(block); + } else { + bail!( + "the block{} dose not exist in main branch, ignore", + child.clone() + ); + } + return Ok(rollback_blocks); + })? + .into_iter(), + ); temp_retracted.sort_by(|a, b| b.header().id().cmp(&a.header().id())); retracted.extend(temp_retracted.into_iter()); let mut temp_enacted = vec![]; - temp_enacted.extend(new_branch_snapshot.child_hashes.iter().try_fold(Vec::::new(), |mut rollback_blocks, child| { - let block = self - .storage - .get_block(child.clone()); - if let anyhow::Result::Ok(Some(block)) = block { - rollback_blocks.push(block); - } else { - bail!("the block{} dose not exist in new branch, ignore", child.clone()); - } - return Ok(rollback_blocks); - })?.into_iter()); + temp_enacted.extend( + new_branch_snapshot + .child_hashes + .iter() + .try_fold(Vec::::new(), |mut rollback_blocks, child| { + let block = self.storage.get_block(child.clone()); + if let anyhow::Result::Ok(Some(block)) = block { + rollback_blocks.push(block); + } else { + bail!( + "the block{} dose not exist in new branch, ignore", + child.clone() + ); + } + return Ok(rollback_blocks); + })? + .into_iter(), + ); temp_enacted.sort_by(|a, b| b.header().id().cmp(&a.header().id())); enacted.extend(temp_enacted.into_iter()); @@ -506,7 +551,12 @@ where } enacted.reverse(); retracted.reverse(); - Ok((enacted.len() as u64, enacted, retracted.len() as u64, retracted)) + Ok(( + enacted.len() as u64, + enacted, + retracted.len() as u64, + retracted, + )) } fn find_ancestors_from_accumulator( @@ -625,7 +675,11 @@ where dag_block_next_parent: Option, next_tips: &mut Option>, ) -> Result { - let (block_info, fork) = self.find_or_fork(block.header(), dag_block_next_parent, dag_block_parents.clone())?; + let (block_info, fork) = self.find_or_fork( + block.header(), + dag_block_next_parent, + dag_block_parents.clone(), + )?; match (block_info, fork) { //block has been processed in some branch, so just trigger a head selection. (Some(block_info), Some(branch)) => { @@ -789,18 +843,18 @@ where // // TimeWindowResult::InTimeWindow => { // return Ok(ConnectOk::DagPending); // } else { - // TimeWindowResult::BeforeTimeWindow => { - // return Err(ConnectBlockError::DagBlockBeforeTimeWindow(Box::new(block)).into()) - // } - // TimeWindowResult::AfterTimeWindow => { - // dump the block in the time window pool and put the block into the next time window pool - // self.main.status().tips_hash = None; // set the tips to None, and in connect_to_main, the block will be added to the tips - - // 2, get the new tips and clear the blocks in the pool - let dag_blocks = self.dag_block_pool.lock().unwrap().clone(); - self.dag_block_pool.lock().unwrap().clear(); - - return self.execute_dag_block_in_pool(dag_blocks, dag_block_parents); + // TimeWindowResult::BeforeTimeWindow => { + // return Err(ConnectBlockError::DagBlockBeforeTimeWindow(Box::new(block)).into()) + // } + // TimeWindowResult::AfterTimeWindow => { + // dump the block in the time window pool and put the block into the next time window pool + // self.main.status().tips_hash = None; // set the tips to None, and in connect_to_main, the block will be added to the tips + + // 2, get the new tips and clear the blocks in the pool + let dag_blocks = self.dag_block_pool.lock().unwrap().clone(); + self.dag_block_pool.lock().unwrap().clear(); + + return self.execute_dag_block_in_pool(dag_blocks, dag_block_parents); // } } else { // normal block, just connect to main @@ -878,13 +932,12 @@ where if self.main.dag_parents_in_tips(new_tips.clone())? { // 1, write to disc if !connected { - self.main - .append_dag_accumulator_leaf(new_tips.clone())?; + self.main.append_dag_accumulator_leaf(new_tips.clone())?; connected = true; } } - if connected { + if connected { // 2, broadcast the blocks sorted by their id executed_blocks .iter() @@ -904,11 +957,17 @@ where .map(|(exe_block, _)| { if connected { ConnectOk::ExeConnectMain( - exe_block.as_ref().expect("exe block should not be None!").clone(), + exe_block + .as_ref() + .expect("exe block should not be None!") + .clone(), ) } else { ConnectOk::ExeConnectBranch( - exe_block.as_ref().expect("exe block should not be None!").clone(), + exe_block + .as_ref() + .expect("exe block should not be None!") + .clone(), ) } }) diff --git a/sync/src/sync.rs b/sync/src/sync.rs index f8fdd391f5..4ece3de4cb 100644 --- a/sync/src/sync.rs +++ b/sync/src/sync.rs @@ -95,22 +95,26 @@ impl SyncService { // let genesis = storage // .get_genesis()? // .ok_or_else(|| format_err!("Can not find genesis hash in storage."))?; - let dag_accumulator_info = match storage.get_dag_accumulator_info(head_block_info.block_id().clone())? { - Some(info) => Some(info), - None => { - warn!( - "Can not find dag accumulator info by head block id: {}, use genesis info.", - head_block_info.block_id(), - ); - None - } - }; + let dag_accumulator_info = + match storage.get_dag_accumulator_info(head_block_info.block_id().clone())? { + Some(info) => Some(info), + None => { + warn!( + "Can not find dag accumulator info by head block id: {}, use genesis info.", + head_block_info.block_id(), + ); + None + } + }; Ok(Self { - sync_status: SyncStatus::new(ChainStatus::new( - head_block.header.clone(), - head_block_info, - Some(storage.get_tips_by_block_id(head_block_hash)?), - ), dag_accumulator_info), + sync_status: SyncStatus::new( + ChainStatus::new( + head_block.header.clone(), + head_block_info, + Some(storage.get_tips_by_block_id(head_block_hash)?), + ), + dag_accumulator_info, + ), stage: SyncStage::NotStart, config, storage, diff --git a/sync/src/tasks/block_sync_task.rs b/sync/src/tasks/block_sync_task.rs index 333220da1a..24aa38af49 100644 --- a/sync/src/tasks/block_sync_task.rs +++ b/sync/src/tasks/block_sync_task.rs @@ -1,7 +1,7 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 -use crate::block_connector::{BlockConnectedRequest, BlockConnectorService}; +use crate::block_connector::BlockConnectorService; use crate::tasks::{BlockConnectedEventHandle, BlockFetcher, BlockLocalStore}; use crate::verified_rpc_client::RpcVerifyError; use anyhow::{format_err, Ok, Result}; @@ -16,14 +16,16 @@ use starcoin_config::G_CRATE_VERSION; use starcoin_consensus::BlockDAG; use starcoin_crypto::HashValue; use starcoin_logger::prelude::*; -use starcoin_service_registry::ServiceRef; use starcoin_storage::BARNARD_HARD_FORK_HASH; -use starcoin_sync_api::{SyncTarget, NewBlockChainRequest}; +use starcoin_sync_api::SyncTarget; use starcoin_types::block::{Block, BlockIdAndNumber, BlockInfo, BlockNumber}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; +use std::time::Duration; use stream_task::{CollectorState, TaskError, TaskResultCollector, TaskState}; +use super::{BlockConnectAction, BlockConnectedEvent, BlockConnectedFinishEvent}; + #[derive(Clone, Debug)] pub struct SyncBlockData { pub(crate) block: Block, @@ -238,7 +240,6 @@ pub struct BlockCollector { dag_block_pool: Vec, target_accumulator_root: HashValue, dag: Option>>, - block_chain_service: ServiceRef, } impl BlockCollector @@ -255,7 +256,6 @@ where skip_pow_verify: bool, target_accumulator_root: HashValue, dag: Option>>, - block_chain_service: ServiceRef, ) -> Self { if let Some(dag) = &dag { dag.lock() @@ -273,7 +273,6 @@ where dag_block_pool: Vec::new(), target_accumulator_root, dag: dag.clone(), - block_chain_service: block_chain_service.clone(), } } @@ -287,6 +286,71 @@ where self.apply_block(block, None, dag_parent, next_tips) } + fn notify_connected_block( + &mut self, + block: Block, + block_info: BlockInfo, + action: BlockConnectAction, + state: CollectorState, + dag_parents: Option> + ) -> Result { + let total_difficulty = block_info.get_total_difficulty(); + + // if the new block's total difficulty is smaller than the current, + // do nothing because we do not need to update the current chain in any other services. + if total_difficulty <= self.current_block_info.total_difficulty { + return Ok(state); // nothing to do + } + + // only try connect block when sync chain total_difficulty > node's current chain. + + // first, create the sender and receiver for ensuring that + // the last block is connected before the next synchronization is triggered. + // if the block is not the last one, we do not want to do this. + let (sender, mut receiver) = match state { + CollectorState::Enough => { + let (s, r) = futures::channel::mpsc::unbounded::(); + (Some(s), Some(r)) + } + CollectorState::Need => (None, None), + }; + + // second, construct the block connect event. + let block_connect_event = BlockConnectedEvent { + block, + dag_parents, + feedback: sender, + action, + }; + + // third, broadcast it. + if let Err(e) = self.event_handle.handle(block_connect_event.clone()) { + error!( + "Send BlockConnectedEvent error: {:?}, block_id: {}", + e, + block_info.block_id() + ); + } + + // finally, if it is the last one, wait for the last block to be processed. + if block_connect_event.feedback.is_some() && receiver.is_some() { + let mut count = 0; + while count < 3 { + count += 1; + match receiver.as_mut().unwrap().try_next() { + std::result::Result::Ok(_) => { + break; + } + Err(_) => { + info!("Waiting for last block to be processed"); + async_std::task::block_on(async_std::task::sleep(Duration::from_secs(10))); + } + } + } + } + return Ok(state); + } + fn apply_block( &mut self, block: Block, @@ -359,20 +423,31 @@ where } } - fn check_if_sync_complete_for_dag(&self) -> Result { - if self.last_accumulator_root == self.target_accumulator_root { - return Ok(CollectorState::Enough); + fn broadcast_dag_chain_block(&mut self, broadcast_blocks: Vec<(Block, BlockInfo, Option>, BlockConnectAction)>) -> Result { + let state = if self.last_accumulator_root == self.target_accumulator_root { + CollectorState::Enough } else { - return Ok(CollectorState::Need); - } + CollectorState::Need + }; + + let last_index = broadcast_blocks.len() - 1; + broadcast_blocks.into_iter().enumerate().for_each(|(index, (block, block_info, dag_parents, action))| { + if last_index == index && state == CollectorState::Enough { + let _ = self.notify_connected_block(block, block_info, action, CollectorState::Enough, dag_parents); + } else { + let _ = self.notify_connected_block(block, block_info, action, CollectorState::Need, dag_parents); + } + }); + + return Ok(state); } - fn check_if_sync_complete(&self, block_info: BlockInfo) -> Result { + fn broadcast_single_chain_block(&mut self, block: Block, block_info: BlockInfo, action: BlockConnectAction) -> Result { let target = self .target .as_ref() .expect("the process is for single chain"); - if block_info.block_accumulator_info.num_leaves + let state = if block_info.block_accumulator_info.num_leaves == target.block_info.block_accumulator_info.num_leaves { if block_info != target.block_info { @@ -393,14 +468,16 @@ where } } else { Ok(CollectorState::Need) - } + }; + + self.notify_connected_block(block, block_info, action, state?, None) } fn collect_item( &mut self, item: SyncBlockData, next_tips: &mut Option>, - ) -> Result { + ) -> Result<(Block, BlockInfo, Option>, BlockConnectAction)> { let (block, block_info, peer_id, dag_parents, dag_transaction_header) = item.into(); let block_id = block.id(); let timestamp = block.header().timestamp(); @@ -433,34 +510,17 @@ where next_tips, )?; let block_info = self.chain.status().info; - let total_difficulty = block_info.get_total_difficulty(); - // only try connect block when sync chain total_difficulty > node's current chain. - if total_difficulty > self.current_block_info.total_difficulty { - async_std::task::block_on(self.block_chain_service.send(NewBlockChainRequest { new_head_block: block_id }))??; - } - Ok(block_info) + Ok((block, block_info, dag_parents, BlockConnectAction::ConnectExecutedBlock)) } None => { self.apply_block(block.clone(), peer_id, dag_transaction_header, next_tips)?; self.chain.time_service().adjust(timestamp); let block_info = self.chain.status().info; - let total_difficulty = block_info.get_total_difficulty(); - // only try connect block when sync chain total_difficulty > node's current chain. - if total_difficulty > self.current_block_info.total_difficulty { - async_std::task::block_on(self.block_chain_service.send(BlockConnectedRequest { block, dag_parents }))??; - // if let Err(e) = self - // .event_handle - // .handle(BlockConnectedEvent { block, dag_parents }) - // { - // error!( - // "Send BlockConnectedEvent error: {:?}, block_id: {}", - // e, block_id - // ); - // } - } - Ok(block_info) + Ok((block, block_info, dag_parents, BlockConnectAction::ConnectNewBlock)) } }; + + } } @@ -476,6 +536,7 @@ where if item.accumulator_root.is_some() { self.dag_block_pool.push(item.clone()); self.last_accumulator_root = item.accumulator_root.unwrap(); + if item.count_in_leaf != self.dag_block_pool.len() as u64 { return Ok(CollectorState::Need); } else { @@ -495,16 +556,19 @@ where assert!(!process_block_pool.is_empty()); - let mut block_info = None; let mut next_tips = Some(Vec::::new()); + let mut block_to_broadcast = vec![]; for item in process_block_pool { - block_info = Some(self.collect_item(item, &mut next_tips)?); + block_to_broadcast.push(self.collect_item(item, &mut next_tips)?); } //verify target match self.target { Some(_) => { - self.check_if_sync_complete(block_info.expect("block_info should not be None")) + assert_eq!(block_to_broadcast.len(), 1, "in single chain , block_info should exist!"); + let (block, block_info, _, action) = block_to_broadcast.pop().unwrap(); + // self.check_if_sync_complete(block_info) + self.broadcast_single_chain_block(block, block_info, action) } None => { // dag @@ -515,7 +579,7 @@ where self.chain.append_dag_accumulator_leaf( next_tips.expect("next_tips should not be None"), )?; - self.check_if_sync_complete_for_dag() + self.broadcast_dag_chain_block(block_to_broadcast) } } } diff --git a/sync/src/tasks/inner_sync_task.rs b/sync/src/tasks/inner_sync_task.rs index 7c54bc8b01..43b53c16e2 100644 --- a/sync/src/tasks/inner_sync_task.rs +++ b/sync/src/tasks/inner_sync_task.rs @@ -1,7 +1,10 @@ -use crate::{tasks::{ - AccumulatorCollector, BlockAccumulatorSyncTask, BlockCollector, BlockConnectedEventHandle, - BlockFetcher, BlockIdFetcher, BlockSyncTask, PeerOperator, -}, block_connector::BlockConnectorService}; +use crate::{ + block_connector::BlockConnectorService, + tasks::{ + AccumulatorCollector, BlockAccumulatorSyncTask, BlockCollector, BlockConnectedEventHandle, + BlockFetcher, BlockIdFetcher, BlockSyncTask, PeerOperator, + }, +}; use anyhow::format_err; use network_api::PeerProvider; use starcoin_accumulator::node::AccumulatorStoreType; @@ -145,7 +148,6 @@ where skip_pow_verify_when_sync, HashValue::zero(), None, - block_chain_service.clone(), ); Ok(TaskGenerator::new( block_sync_task, diff --git a/sync/src/tasks/mod.rs b/sync/src/tasks/mod.rs index 6b4c9e5a77..5b093977f2 100644 --- a/sync/src/tasks/mod.rs +++ b/sync/src/tasks/mod.rs @@ -423,7 +423,7 @@ impl BlockLocalStore for Arc { Some(block) => { let id = block.id(); let block_info = self.get_block_info(id)?; - + Ok(Some(SyncBlockData::new( block, block_info, None, None, 1, None, None, ))) @@ -434,12 +434,23 @@ impl BlockLocalStore for Arc { } } +#[derive(Clone, Debug)] +pub enum BlockConnectAction { + ConnectNewBlock, + ConnectExecutedBlock, +} + #[derive(Clone, Debug)] pub struct BlockConnectedEvent { pub block: Block, pub dag_parents: Option>, + pub feedback: Option>, + pub action: BlockConnectAction, } +#[derive(Clone, Debug)] +pub struct BlockConnectedFinishEvent; + #[derive(Clone, Debug)] pub struct BlockDiskCheckEvent {} diff --git a/sync/src/tasks/sync_dag_block_task.rs b/sync/src/tasks/sync_dag_block_task.rs index 0a32d6e332..7187f25e23 100644 --- a/sync/src/tasks/sync_dag_block_task.rs +++ b/sync/src/tasks/sync_dag_block_task.rs @@ -67,19 +67,16 @@ impl SyncDagBlockTask { // the order must be the same between snapshot.child_hashes and block_with_infos let mut absent_block = vec![]; let mut result = vec![]; - snapshot - .child_hashes - .iter() - .for_each(|block_id| { - absent_block.push(block_id.clone()); - result.push(SyncDagBlockInfo { - block_id: block_id.clone(), - block: None, - peer_id: None, - dag_parents: vec![], - dag_transaction_header: None, - }); + snapshot.child_hashes.iter().for_each(|block_id| { + absent_block.push(block_id.clone()); + result.push(SyncDagBlockInfo { + block_id: block_id.clone(), + block: None, + peer_id: None, + dag_parents: vec![], + dag_transaction_header: None, }); + }); let fetched_block_info = self .fetcher @@ -130,29 +127,24 @@ impl SyncDagBlockTask { }); result.sort_by_key(|item| item.block_id); - - let block_info = self.local_store.get_block_infos(result.iter().map(|item| { - item.block_id - }).collect())?; + let block_info = self + .local_store + .get_block_infos(result.iter().map(|item| item.block_id).collect())?; Ok(result .into_iter() .zip(block_info) - .map(|(item, block_info)| { - SyncBlockData { - block: item.block.expect("block should exists"), - info: block_info, - peer_id: item.peer_id, - accumulator_root: Some( - snapshot.accumulator_info.get_accumulator_root().clone(), - ), - count_in_leaf: snapshot.child_hashes.len() as u64, - dag_block_headers: Some(item.dag_parents), - dag_transaction_header: Some( - item.dag_transaction_header - .expect("dag transaction header should exists"), - ), - } + .map(|(item, block_info)| SyncBlockData { + block: item.block.expect("block should exists"), + info: block_info, + peer_id: item.peer_id, + accumulator_root: Some(snapshot.accumulator_info.get_accumulator_root().clone()), + count_in_leaf: snapshot.child_hashes.len() as u64, + dag_block_headers: Some(item.dag_parents), + dag_transaction_header: Some( + item.dag_transaction_header + .expect("dag transaction header should exists"), + ), }) .collect()) } diff --git a/sync/src/tasks/sync_dag_full_task.rs b/sync/src/tasks/sync_dag_full_task.rs index b3da3b43b0..fd21affaad 100644 --- a/sync/src/tasks/sync_dag_full_task.rs +++ b/sync/src/tasks/sync_dag_full_task.rs @@ -1,14 +1,14 @@ use std::sync::{Arc, Mutex}; -use anyhow::{format_err, Ok, anyhow}; +use anyhow::{anyhow, format_err, Ok}; use async_std::task::Task; -use futures::{FutureExt, future::BoxFuture}; +use futures::{future::BoxFuture, FutureExt}; use network_api::PeerProvider; use starcoin_accumulator::{ accumulator_info::AccumulatorInfo, Accumulator, AccumulatorTreeStore, MerkleAccumulator, }; use starcoin_chain::BlockChain; -use starcoin_chain_api::{ChainWriter, ChainReader}; +use starcoin_chain_api::{ChainReader, ChainWriter}; use starcoin_consensus::BlockDAG; use starcoin_crypto::HashValue; use starcoin_executor::VMMetrics; @@ -17,7 +17,9 @@ use starcoin_network::NetworkServiceRef; use starcoin_service_registry::ServiceRef; use starcoin_storage::{flexi_dag::SyncFlexiDagSnapshotStorage, storage::CodecKVStore, Store}; use starcoin_time_service::TimeService; -use stream_task::{Generator, TaskEventCounterHandle, TaskGenerator, TaskError, TaskFuture, TaskHandle}; +use stream_task::{ + Generator, TaskError, TaskEventCounterHandle, TaskFuture, TaskGenerator, TaskHandle, +}; use crate::{block_connector::BlockConnectorService, verified_rpc_client::VerifiedRpcClient}; @@ -41,36 +43,36 @@ pub async fn find_dag_ancestor_task( let ext_error_handle = Arc::new(ExtSyncTaskErrorHandle::new(fetcher.clone())); - // here should compare the dag's node not accumulator leaf node - let sync_task = TaskGenerator::new( - FindAncestorTask::new( - local_accumulator_info.num_leaves - 1, - target_accumulator_info.num_leaves, - fetcher, - ), - 2, - max_retry_times, - delay_milliseconds_on_error, - AncestorCollector::new( - Arc::new(MerkleAccumulator::new_with_info( - local_accumulator_info, - accumulator_store.clone(), - )), - accumulator_snapshot.clone(), - ), - event_handle.clone(), - ext_error_handle.clone(), - ) - .generate(); - let (fut, _handle) = sync_task.with_handle(); - match fut.await { - anyhow::Result::Ok(ancestor) => { - return Ok(ancestor); - } - Err(error) => { - return Err(anyhow!(error)); - } + // here should compare the dag's node not accumulator leaf node + let sync_task = TaskGenerator::new( + FindAncestorTask::new( + local_accumulator_info.num_leaves - 1, + target_accumulator_info.num_leaves, + fetcher, + ), + 2, + max_retry_times, + delay_milliseconds_on_error, + AncestorCollector::new( + Arc::new(MerkleAccumulator::new_with_info( + local_accumulator_info, + accumulator_store.clone(), + )), + accumulator_snapshot.clone(), + ), + event_handle.clone(), + ext_error_handle.clone(), + ) + .generate(); + let (fut, _handle) = sync_task.with_handle(); + match fut.await { + anyhow::Result::Ok(ancestor) => { + return Ok(ancestor); } + Err(error) => { + return Err(anyhow!(error)); + } + } } async fn sync_accumulator( @@ -89,53 +91,53 @@ async fn sync_accumulator( let ext_error_handle = Arc::new(ExtSyncTaskErrorHandle::new(fetcher.clone())); - let sync_task = TaskGenerator::new( - SyncDagAccumulatorTask::new( - start_index.saturating_add(1), - 3, - target_accumulator_info.num_leaves, - fetcher.clone(), - ), - 2, - max_retry_times, - delay_milliseconds_on_error, - SyncDagAccumulatorCollector::new( - MerkleAccumulator::new_with_info(local_accumulator_info, accumulator_store.clone()), - accumulator_snapshot.clone(), - target_accumulator_info, - start_index, - ), - event_handle.clone(), - ext_error_handle, - ) - .generate(); - let (fut, handle) = sync_task.with_handle(); - match fut.await { - anyhow::Result::Ok((start_index, full_accumulator)) => { - return anyhow::Result::Ok((start_index, full_accumulator)); - } - Err(error) => { - return Err(anyhow!(error)); - } + let sync_task = TaskGenerator::new( + SyncDagAccumulatorTask::new( + start_index.saturating_add(1), + 3, + target_accumulator_info.num_leaves, + fetcher.clone(), + ), + 2, + max_retry_times, + delay_milliseconds_on_error, + SyncDagAccumulatorCollector::new( + MerkleAccumulator::new_with_info(local_accumulator_info, accumulator_store.clone()), + accumulator_snapshot.clone(), + target_accumulator_info, + start_index, + ), + event_handle.clone(), + ext_error_handle, + ) + .generate(); + let (fut, handle) = sync_task.with_handle(); + match fut.await { + anyhow::Result::Ok((start_index, full_accumulator)) => { + return anyhow::Result::Ok((start_index, full_accumulator)); + } + Err(error) => { + return Err(anyhow!(error)); } + } - // TODO: we need to talk about this - // .and_then(|sync_accumulator_result, event_handle| { - // let sync_dag_accumulator_task = TaskGenerator::new( - // SyncDagBlockTask::new(), - // 2, - // max_retry_times, - // delay_milliseconds_on_error, - // SyncDagAccumulatorCollector::new(), - // event_handle.clone(), - // ext_error_handle, - // ); - // Ok(sync_dag_accumulator_task) - // }); + // TODO: we need to talk about this + // .and_then(|sync_accumulator_result, event_handle| { + // let sync_dag_accumulator_task = TaskGenerator::new( + // SyncDagBlockTask::new(), + // 2, + // max_retry_times, + // delay_milliseconds_on_error, + // SyncDagAccumulatorCollector::new(), + // event_handle.clone(), + // ext_error_handle, + // ); + // Ok(sync_dag_accumulator_task) + // }); // return Ok(async_std::task::block_on(sync)); // match async_std::task::block_on(sync) { // std::result::Result::Ok((index, accumulator)) => { - // debug!("sync accumulator success, target accumulator info's leaf count = {}, root hash = {}, begin index = {}", + // debug!("sync accumulator success, target accumulator info's leaf count = {}, root hash = {}, begin index = {}", // accumulator.get_info().get_num_leaves(), accumulator.get_info().get_accumulator_root(), index); // return Ok((index, accumulator)); // } @@ -180,7 +182,7 @@ async fn sync_dag_block( dag: Arc>, block_chain_service: ServiceRef, vm_metrics: Option, -) -> anyhow::Result +) -> anyhow::Result where H: BlockConnectedEventHandle + Sync + 'static, N: PeerProvider + Clone + 'static, @@ -190,13 +192,15 @@ where let event_handle = Arc::new(TaskEventCounterHandle::new()); let ext_error_handle = Arc::new(ExtSyncTaskErrorHandle::new(fetcher.clone())); - let start_block_id = get_start_block_id(&accumulator, start_index, local_store.clone()).map_err(|err| TaskError::BreakError(anyhow!(err))); + let start_block_id = get_start_block_id(&accumulator, start_index, local_store.clone()) + .map_err(|err| TaskError::BreakError(anyhow!(err))); let chain = BlockChain::new( time_service.clone(), start_block_id?, local_store.clone(), vm_metrics, - ).map_err(|err| TaskError::BreakError(anyhow!(err))); + ) + .map_err(|err| TaskError::BreakError(anyhow!(err))); let leaf = accumulator .get_leaf(start_index) @@ -224,47 +228,46 @@ where let current_block_info = local_store .get_block_info(last_chain_block)? - .ok_or_else(|| format_err!("Can not find block info by id: {}", last_chain_block)).map_err(|err| TaskError::BreakError(anyhow!(err))); + .ok_or_else(|| format_err!("Can not find block info by id: {}", last_chain_block)) + .map_err(|err| TaskError::BreakError(anyhow!(err))); - - let accumulator_info = accumulator.get_info(); - let accumulator_root = accumulator.root_hash(); - let sync_task = TaskGenerator::new( - SyncDagBlockTask::new( - accumulator, - start_index.saturating_add(1), - accumulator_info, - fetcher.clone(), - accumulator_snapshot.clone(), - local_store.clone(), - ), - 2, - max_retry_times, - delay_milliseconds_on_error, - BlockCollector::new_with_handle( - current_block_info?.clone(), - None, - chain?, - block_event_handle.clone(), - network.clone(), - skip_pow_verify_when_sync, - accumulator_root, - Some(dag.clone()), - block_chain_service.clone(), - ), - event_handle.clone(), - ext_error_handle, - ) - .generate(); - let (fut, handle) = sync_task.with_handle(); - match fut.await { - anyhow::Result::Ok(block_chain) => { - return anyhow::Result::Ok(block_chain); - } - Err(error) => { - return Err(anyhow!(error)); - } - }; + let accumulator_info = accumulator.get_info(); + let accumulator_root = accumulator.root_hash(); + let sync_task = TaskGenerator::new( + SyncDagBlockTask::new( + accumulator, + start_index.saturating_add(1), + accumulator_info, + fetcher.clone(), + accumulator_snapshot.clone(), + local_store.clone(), + ), + 2, + max_retry_times, + delay_milliseconds_on_error, + BlockCollector::new_with_handle( + current_block_info?.clone(), + None, + chain?, + block_event_handle.clone(), + network.clone(), + skip_pow_verify_when_sync, + accumulator_root, + Some(dag.clone()), + ), + event_handle.clone(), + ext_error_handle, + ) + .generate(); + let (fut, handle) = sync_task.with_handle(); + match fut.await { + anyhow::Result::Ok(block_chain) => { + return anyhow::Result::Ok(block_chain); + } + Err(error) => { + return Err(anyhow!(error)); + } + }; } pub fn sync_dag_full_task( @@ -285,8 +288,7 @@ pub fn sync_dag_full_task( BoxFuture<'static, anyhow::Result>, TaskHandle, Arc, -)> - { +)> { let event_handle = Arc::new(TaskEventCounterHandle::new()); let task_event_handle = event_handle.clone(); let all_fut = async move { @@ -297,7 +299,9 @@ pub fn sync_dag_full_task( accumulator_store.clone(), accumulator_snapshot.clone(), task_event_handle.clone(), - ).await.map_err(|err| TaskError::BreakError(anyhow!(err)))?; + ) + .await + .map_err(|err| TaskError::BreakError(anyhow!(err)))?; let (start_index, accumulator) = sync_accumulator( ancestor, @@ -305,22 +309,26 @@ pub fn sync_dag_full_task( fetcher.clone(), accumulator_store.clone(), accumulator_snapshot.clone(), - ).await.map_err(|err| TaskError::BreakError(anyhow!(err)))?; - + ) + .await + .map_err(|err| TaskError::BreakError(anyhow!(err)))?; + let block_chain = sync_dag_block( - start_index, - accumulator, - fetcher.clone(), - accumulator_snapshot.clone(), - local_store.clone(), - time_service.clone(), - connector_service.clone(), - network, - skip_pow_verify_when_sync, - dag.clone(), - block_chain_service.clone(), - vm_metrics, - ).await.map_err(|err| TaskError::BreakError(anyhow!(err)))?; + start_index, + accumulator, + fetcher.clone(), + accumulator_snapshot.clone(), + local_store.clone(), + time_service.clone(), + connector_service.clone(), + network, + skip_pow_verify_when_sync, + dag.clone(), + block_chain_service.clone(), + vm_metrics, + ) + .await + .map_err(|err| TaskError::BreakError(anyhow!(err)))?; return anyhow::Result::Ok(block_chain); };