Skip to content

Commit

Permalink
Fix/sync status (#1615)
Browse files Browse the repository at this point in the history
* [miner] Merge headblock pacemaker with ondemand pacemaker.
* [sync] Refactor SyncStatus, remove judgement about is_nearly_synced.
* [sync] Add a way to reuse blocks that have already been fetched in a previous sync task.
  • Loading branch information
jolestar authored Nov 10, 2020
1 parent 7e2a106 commit 01c7fcc
Show file tree
Hide file tree
Showing 16 changed files with 620 additions and 262 deletions.
27 changes: 17 additions & 10 deletions chain/chain-notify/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ impl ChainNotifyHandlerService {
sync_status: None,
}
}

pub fn is_synced(&self) -> bool {
match self.sync_status.as_ref() {
Some(sync_status) => sync_status.is_synced(),
None => false,
}
}
}

impl ServiceFactory<Self> for ChainNotifyHandlerService {
Expand Down Expand Up @@ -66,18 +73,18 @@ impl EventHandler<Self, NewHeadBlock> for ChainNotifyHandlerService {
item: NewHeadBlock,
ctx: &mut ServiceContext<ChainNotifyHandlerService>,
) {
if let Some(sync_status) = self.sync_status.as_ref() {
if sync_status.is_nearly_synced() {
let NewHeadBlock(block_detail) = item;
let block = block_detail.get_block();
// notify header.
self.notify_new_block(block, ctx);
if self.is_synced() {
let NewHeadBlock(block_detail) = item;
let block = block_detail.get_block();
// notify header.
self.notify_new_block(block, ctx);

// notify events
if let Err(e) = self.notify_events(block, self.store.clone(), ctx) {
error!(target: "pubsub", "fail to notify events to client, err: {}", &e);
}
// notify events
if let Err(e) = self.notify_events(block, self.store.clone(), ctx) {
error!(target: "pubsub", "fail to notify events to client, err: {}", &e);
}
} else {
debug!("[chain-notify] Ignore NewHeadBlock event because the node has not been synchronized yet.")
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,15 @@ impl BlockChain {
.storage
.get_block_info(block.id())?
.ok_or_else(|| format_err!("Can not find block info by hash {:?}", block.id()))?;
self.update_chain_head_with_info(block, block_info)
}

//TODO refactor update_chain_head and update_chain_head_with_info
pub fn update_chain_head_with_info(
&mut self,
block: Block,
block_info: BlockInfo,
) -> Result<()> {
let txn_accumulator_info = block_info.get_txn_accumulator_info();
let block_accumulator_info = block_info.get_block_accumulator_info();
let state_root = block.header().state_root();
Expand Down
4 changes: 2 additions & 2 deletions kube/manifest/starcoin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ spec:
spec:
containers:
- name: starcoin
image: starcoin/starcoin:v0.7.1
image: starcoin/starcoin:v0.7.1.7
imagePullPolicy: Always
command:
- bash
Expand Down Expand Up @@ -82,7 +82,7 @@ spec:
name: starcoin-config
key: start_disable_seed_node
- name: txfactory
image: starcoin/starcoin:v0.7.1
image: starcoin/starcoin:v0.7.1.7
imagePullPolicy: IfNotPresent
command:
- bash
Expand Down
80 changes: 80 additions & 0 deletions miner/src/generate_block_event_pacemaker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) The Starcoin Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::GenerateBlockEvent;
use anyhow::Result;
use logger::prelude::*;
use starcoin_service_registry::{ActorService, EventHandler, ServiceContext};
use tx_relay::PropagateNewTransactions;
use types::{
sync_status::SyncStatus,
system_events::{NewHeadBlock, SyncStatusChangeEvent},
};

#[derive(Default)]
pub struct GenerateBlockEventPacemaker {
sync_status: Option<SyncStatus>,
}

impl GenerateBlockEventPacemaker {
pub fn send_event(&mut self, force: bool, ctx: &mut ServiceContext<Self>) {
ctx.broadcast(GenerateBlockEvent::new(force));
}

pub fn is_synced(&self) -> bool {
match self.sync_status.as_ref() {
Some(sync_status) => sync_status.is_synced(),
None => false,
}
}
}

impl ActorService for GenerateBlockEventPacemaker {
fn started(&mut self, ctx: &mut ServiceContext<Self>) -> Result<()> {
ctx.subscribe::<SyncStatusChangeEvent>();
ctx.subscribe::<NewHeadBlock>();
ctx.subscribe::<PropagateNewTransactions>();
Ok(())
}

fn stopped(&mut self, ctx: &mut ServiceContext<Self>) -> Result<()> {
ctx.unsubscribe::<SyncStatusChangeEvent>();
ctx.unsubscribe::<NewHeadBlock>();
ctx.unsubscribe::<PropagateNewTransactions>();
Ok(())
}
}

impl EventHandler<Self, NewHeadBlock> for GenerateBlockEventPacemaker {
fn handle_event(
&mut self,
_msg: NewHeadBlock,
ctx: &mut ServiceContext<GenerateBlockEventPacemaker>,
) {
if self.is_synced() {
self.send_event(true, ctx)
} else {
debug!("[pacemaker] Ignore NewHeadBlock event because the node has not been synchronized yet.")
}
}
}

impl EventHandler<Self, PropagateNewTransactions> for GenerateBlockEventPacemaker {
fn handle_event(&mut self, _msg: PropagateNewTransactions, ctx: &mut ServiceContext<Self>) {
if self.is_synced() {
self.send_event(false, ctx)
} else {
debug!("[pacemaker] Ignore PropagateNewTransactions event because the node has not been synchronized yet.")
}
}
}

impl EventHandler<Self, SyncStatusChangeEvent> for GenerateBlockEventPacemaker {
fn handle_event(&mut self, msg: SyncStatusChangeEvent, ctx: &mut ServiceContext<Self>) {
let is_synced = msg.0.is_synced();
self.sync_status = Some(msg.0);
if is_synced {
self.send_event(false, ctx);
}
}
}
55 changes: 0 additions & 55 deletions miner/src/headblock_pacemaker.rs

This file was deleted.

3 changes: 1 addition & 2 deletions miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ use std::time::Duration;
use traits::ChainReader;

mod create_block_template;
pub mod headblock_pacemaker;
pub mod generate_block_event_pacemaker;
pub mod job_bus_client;
mod metrics;
pub mod ondemand_pacemaker;
pub mod task;

pub use create_block_template::{CreateBlockTemplateRequest, CreateBlockTemplateService};
Expand Down
57 changes: 0 additions & 57 deletions miner/src/ondemand_pacemaker.rs

This file was deleted.

18 changes: 4 additions & 14 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use starcoin_config::NodeConfig;
use starcoin_genesis::{Genesis, GenesisError};
use starcoin_logger::prelude::*;
use starcoin_logger::LoggerHandle;
use starcoin_miner::headblock_pacemaker::HeadBlockPacemaker;
use starcoin_miner::generate_block_event_pacemaker::GenerateBlockEventPacemaker;
use starcoin_miner::job_bus_client::JobBusClient;
use starcoin_miner::ondemand_pacemaker::OndemandPacemaker;
use starcoin_miner::{CreateBlockTemplateService, MinerClientService, MinerService};
use starcoin_network::{NetworkAsyncService, PeerMsgBroadcasterService};
use starcoin_network_rpc::NetworkRpcService;
Expand Down Expand Up @@ -90,19 +89,11 @@ impl ServiceHandler<Self, NodeRequest> for NodeService {
}
NodeRequest::StopPacemaker => NodeResponse::Result(
self.registry
.stop_service_sync(HeadBlockPacemaker::service_name())
.and_then(|_| {
self.registry
.stop_service_sync(OndemandPacemaker::service_name())
}),
.stop_service_sync(GenerateBlockEventPacemaker::service_name()),
),
NodeRequest::StartPacemaker => NodeResponse::Result(
self.registry
.start_service_sync(HeadBlockPacemaker::service_name())
.and_then(|_| {
self.registry
.start_service_sync(OndemandPacemaker::service_name())
}),
.start_service_sync(GenerateBlockEventPacemaker::service_name()),
),
}
}
Expand Down Expand Up @@ -262,8 +253,7 @@ impl NodeService {
info!("Config.miner.enable_miner_client is false, No in process MinerClient.");
}

registry.register::<OndemandPacemaker>().await?;
registry.register::<HeadBlockPacemaker>().await?;
registry.register::<GenerateBlockEventPacemaker>().await?;

// wait for service init.
Delay::new(Duration::from_millis(1000)).await;
Expand Down
2 changes: 1 addition & 1 deletion sync/src/block_connector/block_connector_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl EventHandler<Self, SyncStatusChangeEvent> for BlockConnectorService {
impl EventHandler<Self, PeerNewBlock> for BlockConnectorService {
fn handle_event(&mut self, msg: PeerNewBlock, ctx: &mut ServiceContext<Self>) {
if !self.is_synced() {
debug!("Ignore PeerNewBlock event, because node is not synced.");
debug!("[connector] Ignore PeerNewBlock event because the node has not been synchronized yet.");
return;
}
let peer_id = msg.get_peer_id();
Expand Down
2 changes: 1 addition & 1 deletion sync/src/sync2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl EventHandler<Self, PeerEvent> for SyncService2 {
.any(|peer| peer.peer_id == close_peer_id)
{
warn!(
"[sync] Current task handle may be failed because peer {} closed",
"[sync] Current sync task may be failed because peer {} closed",
close_peer_id
);
}
Expand Down
Loading

0 comments on commit 01c7fcc

Please sign in to comment.