Skip to content

Commit

Permalink
Merge pull request #6762 from multiversx/cleanup_delayed_broadcast_of…
Browse files Browse the repository at this point in the history
…_proofs

Cleanup delayed broadcast of proofs
  • Loading branch information
sstanculeanu authored Jan 31, 2025
2 parents 3e24cfe + 030809f commit daeb170
Show file tree
Hide file tree
Showing 19 changed files with 24 additions and 489 deletions.
7 changes: 0 additions & 7 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -956,13 +956,6 @@
# the current machine will take over and propose/sign blocks. Used in both single-key and multi-key modes.
MaxRoundsOfInactivityAccepted = 3

# ConsensusGradualBroadcast defines how validators will broadcast the aggregated final info, based on their consensus index
[ConsensusGradualBroadcast]
GradualIndexBroadcastDelay = [
# All validators will broadcast the message right away
{ EndIndex = 0, DelayInMilliseconds = 0 },
]

[InterceptedDataVerifier]
CacheSpanInSec = 30
CacheExpiryInSec = 30
12 changes: 3 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,9 @@ type Config struct {
Requesters RequesterConfig
VMOutputCacher CacheConfig

PeersRatingConfig PeersRatingConfig
PoolsCleanersConfig PoolsCleanersConfig
Redundancy RedundancyConfig
ConsensusGradualBroadcast ConsensusGradualBroadcastConfig
PeersRatingConfig PeersRatingConfig
PoolsCleanersConfig PoolsCleanersConfig
Redundancy RedundancyConfig

InterceptedDataVerifier InterceptedDataVerifierConfig
}
Expand Down Expand Up @@ -683,11 +682,6 @@ type IndexBroadcastDelay struct {
DelayInMilliseconds uint64
}

// ConsensusGradualBroadcastConfig holds the configuration for the consensus final info gradual broadcast
type ConsensusGradualBroadcastConfig struct {
GradualIndexBroadcastDelay []IndexBroadcastDelay
}

// InterceptedDataVerifierConfig holds the configuration for the intercepted data verifier
type InterceptedDataVerifierConfig struct {
CacheSpanInSec uint64
Expand Down
15 changes: 0 additions & 15 deletions config/tomlConfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,6 @@ func TestTomlParser(t *testing.T) {
Redundancy: RedundancyConfig{
MaxRoundsOfInactivityAccepted: 3,
},
ConsensusGradualBroadcast: ConsensusGradualBroadcastConfig{
GradualIndexBroadcastDelay: []IndexBroadcastDelay{
{
EndIndex: 0,
DelayInMilliseconds: 0,
},
},
},
}
testString := `
[GeneralSettings]
Expand Down Expand Up @@ -276,13 +268,6 @@ func TestTomlParser(t *testing.T) {
# MaxRoundsOfInactivityAccepted defines the number of rounds missed by a main or higher level backup machine before
# the current machine will take over and propose/sign blocks. Used in both single-key and multi-key modes.
MaxRoundsOfInactivityAccepted = 3
# ConsensusGradualBroadcast defines how validators will broadcast the aggregated final info, based on their consensus index
[ConsensusGradualBroadcast]
GradualIndexBroadcastDelay = [
# All validators will broadcast the message right away
{ EndIndex = 0, DelayInMilliseconds = 0 },
]
`
cfg := Config{}

Expand Down
12 changes: 0 additions & 12 deletions consensus/broadcast/commonMessenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,18 +187,6 @@ func (cm *commonMessenger) BroadcastBlockData(
}
}

// PrepareBroadcastEquivalentProof sets the proof into the delayed block broadcaster
func (cm *commonMessenger) PrepareBroadcastEquivalentProof(
proof *block.HeaderProof,
consensusIndex int,
pkBytes []byte,
) {
err := cm.delayedBlockBroadcaster.SetFinalProofForValidator(proof, consensusIndex, pkBytes)
if err != nil {
log.Error("commonMessenger.PrepareBroadcastEquivalentProof", "error", err)
}
}

func (cm *commonMessenger) extractMetaMiniBlocksAndTransactions(
miniBlocks map[uint32][]byte,
transactions map[string][][]byte,
Expand Down
127 changes: 0 additions & 127 deletions consensus/broadcast/delayedBroadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/multiversx/mx-chain-core-go/data/block"

"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/config"
"github.com/multiversx/mx-chain-go/consensus"
"github.com/multiversx/mx-chain-go/consensus/broadcast/shared"
"github.com/multiversx/mx-chain-go/consensus/spos"
Expand All @@ -26,7 +25,6 @@ import (

const prefixHeaderAlarm = "header_"
const prefixDelayDataAlarm = "delay_"
const prefixConsensusMessageAlarm = "message_"
const sizeHeadersCache = 1000 // 1000 hashes in cache

// ArgsDelayedBlockBroadcaster holds the arguments to create a delayed block broadcaster
Expand All @@ -37,7 +35,6 @@ type ArgsDelayedBlockBroadcaster struct {
LeaderCacheSize uint32
ValidatorCacheSize uint32
AlarmScheduler timersScheduler
Config config.ConsensusGradualBroadcastConfig
}

// timersScheduler exposes functionality for scheduling multiple timers
Expand All @@ -53,11 +50,6 @@ type headerDataForValidator struct {
prevRandSeed []byte
}

type validatorProof struct {
proof *block.HeaderProof
pkBytes []byte
}

type delayedBlockBroadcaster struct {
alarm timersScheduler
interceptorsContainer process.InterceptorsContainer
Expand All @@ -72,14 +64,9 @@ type delayedBlockBroadcaster struct {
broadcastMiniblocksData func(mbData map[uint32][]byte, pkBytes []byte) error
broadcastTxsData func(txData map[string][][]byte, pkBytes []byte) error
broadcastHeader func(header data.HeaderHandler, pkBytes []byte) error
broadcastEquivalentProof func(proof *block.HeaderProof, pkBytes []byte) error
broadcastConsensusMessage func(message *consensus.Message) error
cacheHeaders storage.Cacher
mutHeadersCache sync.RWMutex
config config.ConsensusGradualBroadcastConfig
mutBroadcastFinalProof sync.RWMutex
valBroadcastFinalProof map[string]*validatorProof
cacheConsensusMessages storage.Cacher
}

// NewDelayedBlockBroadcaster create a new instance of a delayed block data broadcaster
Expand All @@ -102,11 +89,6 @@ func NewDelayedBlockBroadcaster(args *ArgsDelayedBlockBroadcaster) (*delayedBloc
return nil, err
}

cacheConsensusMessages, err := cache.NewLRUCache(sizeHeadersCache)
if err != nil {
return nil, err
}

dbb := &delayedBlockBroadcaster{
alarm: args.AlarmScheduler,
shardCoordinator: args.ShardCoordinator,
Expand All @@ -115,14 +97,11 @@ func NewDelayedBlockBroadcaster(args *ArgsDelayedBlockBroadcaster) (*delayedBloc
valHeaderBroadcastData: make([]*shared.ValidatorHeaderBroadcastData, 0),
valBroadcastData: make([]*shared.DelayedBroadcastData, 0),
delayedBroadcastData: make([]*shared.DelayedBroadcastData, 0),
valBroadcastFinalProof: make(map[string]*validatorProof, 0),
maxDelayCacheSize: args.LeaderCacheSize,
maxValidatorDelayCacheSize: args.ValidatorCacheSize,
mutDataForBroadcast: sync.RWMutex{},
cacheHeaders: cacheHeaders,
mutHeadersCache: sync.RWMutex{},
config: args.Config,
cacheConsensusMessages: cacheConsensusMessages,
}

dbb.headersSubscriber.RegisterHandler(dbb.headerReceived)
Expand Down Expand Up @@ -254,60 +233,11 @@ func (dbb *delayedBlockBroadcaster) SetValidatorData(broadcastData *shared.Delay
return nil
}

// SetFinalProofForValidator sets the header proof to be broadcast by validator when its turn comes
func (dbb *delayedBlockBroadcaster) SetFinalProofForValidator(
proof *block.HeaderProof,
consensusIndex int,
pkBytes []byte,
) error {
if proof == nil {
return spos.ErrNilHeaderProof
}

// set alarm only for validators that are aware that the block was finalized
isProofValid := len(proof.AggregatedSignature) > 0 &&
len(proof.PubKeysBitmap) > 0 &&
len(proof.HeaderHash) > 0
if !isProofValid {
log.Trace("delayedBlockBroadcaster.SetFinalProofForValidator: consensus message alarm has not been set",
"validatorConsensusOrder", consensusIndex,
)

return nil
}

if dbb.cacheConsensusMessages.Has(proof.HeaderHash) {
return nil
}

duration := dbb.getBroadcastDelayForIndex(consensusIndex)
alarmID := prefixConsensusMessageAlarm + hex.EncodeToString(proof.HeaderHash)

vProof := &validatorProof{
proof: proof,
pkBytes: pkBytes,
}
dbb.mutBroadcastFinalProof.Lock()
dbb.valBroadcastFinalProof[alarmID] = vProof
dbb.mutBroadcastFinalProof.Unlock()

dbb.alarm.Add(dbb.finalProofAlarmExpired, duration, alarmID)
log.Trace("delayedBlockBroadcaster.SetFinalProofForValidator: final proof alarm has been set",
"validatorConsensusOrder", consensusIndex,
"headerHash", proof.HeaderHash,
"alarmID", alarmID,
"duration", duration,
)

return nil
}

// SetBroadcastHandlers sets the broadcast handlers for miniBlocks and transactions
func (dbb *delayedBlockBroadcaster) SetBroadcastHandlers(
mbBroadcast func(mbData map[uint32][]byte, pkBytes []byte) error,
txBroadcast func(txData map[string][][]byte, pkBytes []byte) error,
headerBroadcast func(header data.HeaderHandler, pkBytes []byte) error,
equivalentProofBroadcast func(proof *block.HeaderProof, pkBytes []byte) error,
consensusMessageBroadcast func(message *consensus.Message) error,
) error {
if mbBroadcast == nil || txBroadcast == nil || headerBroadcast == nil || consensusMessageBroadcast == nil {
Expand All @@ -320,7 +250,6 @@ func (dbb *delayedBlockBroadcaster) SetBroadcastHandlers(
dbb.broadcastMiniblocksData = mbBroadcast
dbb.broadcastTxsData = txBroadcast
dbb.broadcastHeader = headerBroadcast
dbb.broadcastEquivalentProof = equivalentProofBroadcast
dbb.broadcastConsensusMessage = consensusMessageBroadcast

return nil
Expand Down Expand Up @@ -697,19 +626,6 @@ func (dbb *delayedBlockBroadcaster) interceptedHeader(_ string, headerHash []byt
dbb.cacheHeaders.Put(headerHash, struct{}{}, 0)
dbb.mutHeadersCache.Unlock()

// TODO: should be handled from interceptor
proof := headerHandler.GetPreviousProof()
var aggSig, bitmap []byte
if !check.IfNilReflect(proof) {
aggSig, bitmap = proof.GetAggregatedSignature(), proof.GetPubKeysBitmap()
}

// TODO: add common check for verifying proof validity
isFinalInfo := len(aggSig) > 0 && len(bitmap) > 0
if isFinalInfo {
dbb.cacheConsensusMessages.Put(headerHash, struct{}{}, 0)
}

log.Trace("delayedBlockBroadcaster.interceptedHeader",
"headerHash", headerHash,
"round", headerHandler.GetRound(),
Expand Down Expand Up @@ -818,49 +734,6 @@ func (dbb *delayedBlockBroadcaster) extractMbsFromMeTo(header data.HeaderHandler
return mbHashesForShard
}

func (dbb *delayedBlockBroadcaster) getBroadcastDelayForIndex(index int) time.Duration {
for i := 0; i < len(dbb.config.GradualIndexBroadcastDelay); i++ {
entry := dbb.config.GradualIndexBroadcastDelay[i]
if index > entry.EndIndex {
continue
}

return time.Duration(entry.DelayInMilliseconds) * time.Millisecond
}

return 0
}

func (dbb *delayedBlockBroadcaster) finalProofAlarmExpired(alarmID string) {
headerHash, err := hex.DecodeString(strings.TrimPrefix(alarmID, prefixConsensusMessageAlarm))
if err != nil {
log.Error("delayedBlockBroadcaster.finalProofAlarmExpired", "error", err.Error(),
"headerHash", headerHash,
"alarmID", alarmID,
)
return
}

dbb.mutBroadcastFinalProof.Lock()
defer dbb.mutBroadcastFinalProof.Unlock()
if dbb.cacheConsensusMessages.Has(headerHash) {
delete(dbb.valBroadcastFinalProof, alarmID)
return
}

vProof, ok := dbb.valBroadcastFinalProof[alarmID]
if !ok {
return
}

err = dbb.broadcastEquivalentProof(vProof.proof, vProof.pkBytes)
if err != nil {
log.Error("finalProofAlarmExpired.broadcastEquivalentProof", "error", err)
}

delete(dbb.valBroadcastFinalProof, alarmID)
}

// IsInterfaceNil returns true if there is no value under the interface
func (dbb *delayedBlockBroadcaster) IsInterfaceNil() bool {
return dbb == nil
Expand Down
Loading

0 comments on commit daeb170

Please sign in to comment.