Skip to content

Commit

Permalink
Merge branch 'EN-5624-Improve-BLS-consensus-process-commit-block' int…
Browse files Browse the repository at this point in the history
…o test-week4-deploy
  • Loading branch information
iulianpascalau committed Jan 20, 2020
2 parents b34dde3 + 53fb827 commit a45f73f
Show file tree
Hide file tree
Showing 19 changed files with 410 additions and 116 deletions.
21 changes: 21 additions & 0 deletions consensus/mock/sposWorkerMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type SposWorkerMock struct {
messageType consensus.MessageType,
receivedMessageCall func(cnsDta *consensus.Message) bool,
)
AddReceivedHeaderHandlerCalled func(handler func(data.HeaderHandler))
RemoveAllReceivedMessagesCallsCalled func()
ProcessReceivedMessageCalled func(message p2p.MessageP2P) error
SendConsensusMessageCalled func(cnsDta *consensus.Message) bool
Expand All @@ -19,13 +20,21 @@ type SposWorkerMock struct {
GetBroadcastBlockCalled func(data.BodyHandler, data.HeaderHandler) error
GetBroadcastHeaderCalled func(data.HeaderHandler) error
ExecuteStoredMessagesCalled func()
DisplayStatisticsCalled func()
ReceivedHeaderCalled func(headerHandler data.HeaderHandler, headerHash []byte)
}

func (sposWorkerMock *SposWorkerMock) AddReceivedMessageCall(messageType consensus.MessageType,
receivedMessageCall func(cnsDta *consensus.Message) bool) {
sposWorkerMock.AddReceivedMessageCallCalled(messageType, receivedMessageCall)
}

func (sposWorkerMock *SposWorkerMock) AddReceivedHeaderHandler(handler func(data.HeaderHandler)) {
if sposWorkerMock.AddReceivedHeaderHandlerCalled != nil {
sposWorkerMock.AddReceivedHeaderHandlerCalled(handler)
}
}

func (sposWorkerMock *SposWorkerMock) RemoveAllReceivedMessagesCalls() {
sposWorkerMock.RemoveAllReceivedMessagesCallsCalled()
}
Expand Down Expand Up @@ -54,6 +63,18 @@ func (sposWorkerMock *SposWorkerMock) ExecuteStoredMessages() {
sposWorkerMock.ExecuteStoredMessagesCalled()
}

func (sposWorkerMock *SposWorkerMock) DisplayStatistics() {
if sposWorkerMock.DisplayStatisticsCalled != nil {
sposWorkerMock.DisplayStatisticsCalled()
}
}

func (sposWorkerMock *SposWorkerMock) ReceivedHeader(headerHandler data.HeaderHandler, headerHash []byte) {
if sposWorkerMock.ReceivedHeaderCalled != nil {
sposWorkerMock.ReceivedHeaderCalled(headerHandler, headerHash)
}
}

// IsInterfaceNil returns true if there is no value under the interface
func (sposWorkerMock *SposWorkerMock) IsInterfaceNil() bool {
if sposWorkerMock == nil {
Expand Down
4 changes: 4 additions & 0 deletions consensus/spos/bls/blsSubroundsFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ func (fct *factory) generateEndRoundSubround() error {
subroundEndRound, err := NewSubroundEndRound(
subround,
fct.worker.Extend,
spos.MaxThresholdPercent,
getSubroundName,
fct.worker.DisplayStatistics,
)
if err != nil {
return err
Expand All @@ -267,6 +270,7 @@ func (fct *factory) generateEndRoundSubround() error {
return err
}

fct.worker.AddReceivedHeaderHandler(subroundEndRound.receivedHeader)
fct.consensusCore.Chronology().AddSubround(subroundEndRound)

return nil
Expand Down
8 changes: 4 additions & 4 deletions consensus/spos/bls/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
)

// processingThresholdPercent specifies the max allocated time for processing the block as a percentage of the total time of the round
const processingThresholdPercent = 65
const processingThresholdPercent = 85

// srStartStartTime specifies the start time, from the total time of the round, of Subround Start
const srStartStartTime = 0.0
Expand All @@ -48,13 +48,13 @@ const srBlockEndTime = 0.25
const srSignatureStartTime = 0.25

// srSignatureEndTime specifies the end time, from the total time of the round, of Subround Signature
const srSignatureEndTime = 0.65
const srSignatureEndTime = 0.85

// srEndStartTime specifies the start time, from the total time of the round, of Subround End
const srEndStartTime = 0.65
const srEndStartTime = 0.85

// srEndEndTime specifies the end time, from the total time of the round, of Subround End
const srEndEndTime = 0.75
const srEndEndTime = 0.95

const (
BlockBodyStringValue = "(BLOCK_BODY)"
Expand Down
143 changes: 139 additions & 4 deletions consensus/spos/bls/subroundEndRound.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
package bls

import (
"bytes"
"fmt"
"sync"
"time"

"github.com/ElrondNetwork/elrond-go/consensus/spos"
"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/check"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/display"
"github.com/ElrondNetwork/elrond-go/statusHandler"
)

type subroundEndRound struct {
*spos.Subround
processingThresholdPercentage int
getSubroundName func(subroundId int) string
displayStatistics func()

appStatusHandler core.AppStatusHandler

mutProcessingEndRound sync.Mutex
}

// SetAppStatusHandler method set appStatusHandler
Expand All @@ -30,6 +39,9 @@ func (sr *subroundEndRound) SetAppStatusHandler(ash core.AppStatusHandler) error
func NewSubroundEndRound(
baseSubround *spos.Subround,
extend func(subroundId int),
processingThresholdPercentage int,
getSubroundName func(subroundId int) string,
displayStatistics func(),
) (*subroundEndRound, error) {
err := checkNewSubroundEndRoundParams(
baseSubround,
Expand All @@ -40,7 +52,11 @@ func NewSubroundEndRound(

srEndRound := subroundEndRound{
baseSubround,
processingThresholdPercentage,
getSubroundName,
displayStatistics,
statusHandler.NewNilStatusHandler(),
sync.Mutex{},
}
srEndRound.Job = srEndRound.doEndRoundJob
srEndRound.Check = srEndRound.doEndRoundConsensusCheck
Expand All @@ -64,12 +80,24 @@ func checkNewSubroundEndRoundParams(
return err
}

func (sr *subroundEndRound) receivedHeader(headerHandler data.HeaderHandler) {
sr.AddReceivedHeader(headerHandler)

if !sr.IsSelfLeaderInCurrentRound() {
sr.doEndRoundJobByParticipant()
}
}

// doEndRoundJob method does the job of the subround EndRound
func (sr *subroundEndRound) doEndRoundJob() bool {
if !sr.IsSelfLeaderInCurrentRound() { // is NOT self leader in this round?
return false
if !sr.IsSelfLeaderInCurrentRound() {
return sr.doEndRoundJobByParticipant()
}

return sr.doEndRoundJobByLeader()
}

func (sr *subroundEndRound) doEndRoundJobByLeader() bool {
bitmap := sr.GenerateBitmap(SrSignature)
err := sr.checkSignaturesValidity(bitmap)
if err != nil {
Expand Down Expand Up @@ -106,7 +134,7 @@ func (sr *subroundEndRound) doEndRoundJob() bool {
return false
}

sr.SetStatus(SrEndRound, spos.SsFinished)
sr.SetStatus(sr.Current(), spos.SsFinished)

// broadcast section

Expand All @@ -116,6 +144,8 @@ func (sr *subroundEndRound) doEndRoundJob() bool {
debugError("BroadcastBlock", err)
}

sr.displayStatistics()

log.Debug("step 3: BlockBody and Header has been committed and broadcast",
"type", "spos/bls",
"time [s]", sr.SyncTimer().FormattedCurrentTime())
Expand All @@ -133,6 +163,94 @@ func (sr *subroundEndRound) doEndRoundJob() bool {
return true
}

func (sr *subroundEndRound) doEndRoundJobByParticipant() bool {
sr.mutProcessingEndRound.Lock()
defer sr.mutProcessingEndRound.Unlock()

if sr.RoundCanceled {
return false
}
if !sr.IsConsensusDataSet() {
return false
}
if !sr.IsSubroundFinished(sr.Previous()) {
return false
}
if sr.IsSubroundFinished(sr.Current()) {
return false
}
isConsensusHeaderReceived, header := sr.isConsensusHeaderReceived()
if !isConsensusHeaderReceived {
return false
}

defer func() {
sr.SetProcessingBlock(false)
}()

sr.SetProcessingBlock(true)

if sr.isOutOfTime() {
return false
}

startTime := time.Now()
err := sr.BlockProcessor().CommitBlock(sr.Blockchain(), header, sr.BlockBody)
elapsedTime := time.Since(startTime)
log.Debug("elapsed time to commit block",
"time [s]", elapsedTime,
)
if err != nil {
debugError("CommitBlock", err)
return false
}

sr.SetStatus(sr.Current(), spos.SsFinished)

sr.displayStatistics()

log.Debug("step 3: BlockBody and Header has been committed",
"type", "spos/bls",
"time [s]", sr.SyncTimer().FormattedCurrentTime())

msg := fmt.Sprintf("Added received block with nonce %d in blockchain", header.GetNonce())
log.Debug(display.Headline(msg, sr.SyncTimer().FormattedCurrentTime(), "-"))
return true
}

func (sr *subroundEndRound) isConsensusHeaderReceived() (bool, data.HeaderHandler) {
if check.IfNil(sr.Header) {
return false, nil
}

consensusHeaderHash, err := core.CalculateHash(sr.Marshalizer(), sr.Hasher(), sr.Header)
if err != nil {
log.Debug("isConsensusHeaderReceived: calculate consensus header hash", "error", err.Error())
return false, nil
}

receivedHeaders := sr.GetReceivedHeaders()

for index := range receivedHeaders {
receivedHeader := receivedHeaders[index].Clone()
receivedHeader.SetLeaderSignature(nil)
receivedHeader.SetPubKeysBitmap(nil)
receivedHeader.SetSignature(nil)

receivedHeaderHash, err := core.CalculateHash(sr.Marshalizer(), sr.Hasher(), receivedHeader)
if err != nil {
log.Debug("isConsensusHeaderReceived: calculate received header hash", "error", err.Error())
return false, nil
}

if bytes.Equal(receivedHeaderHash, consensusHeaderHash) {
return true, receivedHeaders[index]
}
}

return false, nil
}

func (sr *subroundEndRound) signBlockHeader() ([]byte, error) {
headerClone := sr.Header.Clone()
headerClone.SetLeaderSignature(nil)
Expand Down Expand Up @@ -176,7 +294,7 @@ func (sr *subroundEndRound) doEndRoundConsensusCheck() bool {
return false
}

if sr.Status(SrEndRound) == spos.SsFinished {
if sr.IsSubroundFinished(sr.Current()) {
return true
}

Expand Down Expand Up @@ -222,3 +340,20 @@ func (sr *subroundEndRound) checkSignaturesValidity(bitmap []byte) error {

return nil
}

func (sr *subroundEndRound) isOutOfTime() bool {
startTime := sr.RoundTimeStamp
maxTime := sr.Rounder().TimeDuration() * time.Duration(sr.processingThresholdPercentage) / 100
if sr.Rounder().RemainingTime(startTime, maxTime) < 0 {
log.Debug("canceled round, time is out",
"time [s]", sr.SyncTimer().FormattedCurrentTime(),
"round", sr.SyncTimer().FormattedCurrentTime(), sr.Rounder().Index(),
"subround", sr.getSubroundName(sr.Current()))

sr.RoundCanceled = true

return true
}

return false
}
Loading

0 comments on commit a45f73f

Please sign in to comment.