Skip to content

Commit

Permalink
feat(consensus): confirm votes on vote bit set receipt
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Apr 23, 2024
1 parent e9e3aca commit 976f2ff
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 36 deletions.
8 changes: 1 addition & 7 deletions internal/consensus/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (g *msgGossiper) GossipVote(ctx context.Context, rs cstypes.RoundState, prs
"proto_vote_size", protoVote.Size(),
})
logger.Trace("syncing vote message")
err := g.sync(ctx, protoVote, updatePeerVote(g.ps, vote))
err := g.sync(ctx, protoVote, nil)
if err != nil {
logger.Error("failed to sync vote message to the peer", "error", err)
}
Expand Down Expand Up @@ -403,12 +403,6 @@ func updatePeerCommit(ps *PeerState, commit *types.Commit) func() error {
}
}

func updatePeerVote(ps *PeerState, vote *types.Vote) func() error {
return func() error {
return ps.SetHasVote(vote)
}
}

func updatePeerProposalBlockPart(ps *PeerState, height int64, round int32, index int) func() error {
return func() error {
ps.SetHasProposalBlockPart(height, round, index)
Expand Down
83 changes: 54 additions & 29 deletions internal/consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,38 +521,55 @@ func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope

// Respond with a VoteSetBitsMessage showing which votes we have and
// consequently shows which we don't have.
var ourVotes *bits.BitArray
switch vsmMsg.Type {
case tmproto.PrevoteType:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(vsmMsg.BlockID)
if err := r.sendVoteSetBits(ctx, envelope.From, vsmMsg.Height, vsmMsg.Round, vsmMsg.BlockID, vsmMsg.Type, voteSetCh); err != nil {
return fmt.Errorf("send vote set bits to %s failed: %w", envelope.From, err)
}
default:
return fmt.Errorf("received unknown message on StateChannel: %T", msg)
}

case tmproto.PrecommitType:
ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(vsmMsg.BlockID)
return nil
}

default:
panic("bad VoteSetBitsMessage field type; forgot to add a check in ValidateBasic?")
}
// sendVoteSetBits notifies the peer about votes we have for a given height, round, and blockID.
func (r *Reactor) sendVoteSetBits(ctx context.Context, to types.NodeID, height int64, round int32, blockID types.BlockID, voteType tmproto.SignedMsgType, voteSetBitsCh p2p.Channel) error {
stateData := r.state.GetStateData()
ourHeight, votes := stateData.HeightVoteSet()

eMsg := &tmcons.VoteSetBits{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
}
r.logger.Trace("sending VoteSetBits", "peer", to, "height", height, "round", round, "blockID", blockID, "voteType", voteType)

if votesProto := ourVotes.ToProto(); votesProto != nil {
eMsg.Votes = *votesProto
}
if height != ourHeight {
return fmt.Errorf("invalid height; we are at %d, but requestd vote set bits at %d", ourHeight, height)
}

if err := voteSetCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: eMsg,
}); err != nil {
return err
}
var ourVotes *bits.BitArray
switch voteType {
case tmproto.PrevoteType:
ourVotes = votes.Prevotes(round).BitArrayByBlockID(blockID)

case tmproto.PrecommitType:
ourVotes = votes.Precommits(round).BitArrayByBlockID(blockID)

default:
return fmt.Errorf("received unknown message on StateChannel: %T", msg)
panic("bad VoteSetBitsMessage field type; forgot to add a check in ValidateBasic?")
}

voteBitsMsg := &tmcons.VoteSetBits{
Height: height,
Round: round,
Type: voteType,
BlockID: blockID.ToProto(),
}

if votesProto := ourVotes.ToProto(); votesProto != nil {
voteBitsMsg.Votes = *votesProto
}

if err := voteSetBitsCh.Send(ctx, p2p.Envelope{
To: to,
Message: voteBitsMsg,
}); err != nil {
return err
}

return nil
Expand Down Expand Up @@ -603,7 +620,7 @@ func (r *Reactor) handleDataMessage(ctx context.Context, envelope *p2p.Envelope,
// fail to find the peer state for the envelope sender, we perform a no-op and
// return. This can happen when we process the envelope after the peer is
// removed.
func (r *Reactor) handleVoteMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message) error {
func (r *Reactor) handleVoteMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh p2p.Channel) error {
logger := r.logger.With("peer", envelope.From, "ch_id", "VoteChannel")

ps, ok := r.GetPeerState(envelope.From)
Expand Down Expand Up @@ -640,16 +657,23 @@ func (r *Reactor) handleVoteMessage(ctx context.Context, envelope *p2p.Envelope,

if isValidator { // ignore votes on non-validator nodes; TODO don't even send it
vMsg := msgI.(*VoteMessage)
vote := vMsg.Vote

if err := vMsg.Vote.ValidateBasic(); err != nil {
if err := vote.ValidateBasic(); err != nil {
return fmt.Errorf("invalid vote received from %s: %w", envelope.From, err)
}

ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastValSize)
if err := ps.SetHasVote(vMsg.Vote); err != nil {
if err := ps.SetHasVote(vote); err != nil {
return err
}

// let's respond with our current votes
if err := r.sendVoteSetBits(ctx, envelope.From, vote.Height, vote.Round, vote.BlockID, vote.Type, voteSetCh); err != nil {
return fmt.Errorf("send vote set bits to %s failed: %w", envelope.From, err)
}

return r.state.sendMessage(ctx, vMsg, envelope.From)
}
default:
Expand All @@ -667,6 +691,7 @@ func (r *Reactor) handleVoteSetBitsMessage(_ context.Context, envelope *p2p.Enve
logger := r.logger.With("peer", envelope.From, "ch_id", "VoteSetBitsChannel")

ps, ok := r.GetPeerState(envelope.From)
logger.Trace("received vote set bits message", "peer", envelope.From, "msg", envelope.Message)
if !ok || ps == nil {
r.logger.Debug("failed to find peer state")
return nil
Expand Down Expand Up @@ -743,7 +768,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha
case p2p.ConsensusDataChannel:
err = r.handleDataMessage(ctx, envelope, msg)
case p2p.ConsensusVoteChannel:
err = r.handleVoteMessage(ctx, envelope, msg)
err = r.handleVoteMessage(ctx, envelope, msg, chans.voteSet)
case p2p.VoteSetBitsChannel:
err = r.handleVoteSetBitsMessage(ctx, envelope, msg)
default:
Expand Down

0 comments on commit 976f2ff

Please sign in to comment.