Skip to content

Commit

Permalink
Merge pull request #2481 from dedis/subleader_selection
Browse files Browse the repository at this point in the history
Fix subleader handling
  • Loading branch information
ineiti authored Dec 16, 2021
2 parents ac9c4ee + af3d7bf commit 0d1b33d
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 51 deletions.
11 changes: 5 additions & 6 deletions blscosi/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,12 @@ func DefaultThreshold(n int) int {
}

// DefaultSubLeaders returns the number of sub-leaders, which is the
// square root of the number of nodes - 1, rounded down. A single node
// (or any smaller count) still yields one sub-leader.
func DefaultSubLeaders(nodes int) int {
	// Guard the degenerate sizes: for nodes == 1 the formula would give 0,
	// and for nodes < 1 the square root is NaN, whose conversion to int is
	// implementation-specific.
	if nodes <= 1 {
		return 1
	}
	// math.Sqrt is correctly rounded, so perfect squares come out exact and
	// no epsilon is needed before truncating.
	return int(math.Sqrt(float64(nodes - 1)))
}

// NewBlsCosi method is used to define the blscosi protocol.
Expand Down
2 changes: 1 addition & 1 deletion blscosi/protocol/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestProtocol_FailingLeaves_25_9(t *testing.T) {
func TestDefaultSubLeaders(t *testing.T) {
require.Equal(t, DefaultSubLeaders(1), 1)
for subleaders := 2; subleaders < 58; subleaders++ {
nodes := int(math.Pow(float64(subleaders), 3.0))
nodes := int(math.Pow(float64(subleaders), 2.0)) + 1
require.Equal(t, DefaultSubLeaders(nodes-1), subleaders-1)
require.Equal(t, DefaultSubLeaders(nodes), subleaders)
}
Expand Down
78 changes: 45 additions & 33 deletions blscosi/protocol/sub_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/rand"
"errors"
"fmt"
"golang.org/x/xerrors"
"sync"
"time"

Expand Down Expand Up @@ -190,36 +191,42 @@ func (p *SubBlsCosi) dispatchRoot() error {
return nil
}

// Only one child anyway
err := p.SendToChildren(&Announcement{
Msg: p.Msg,
Data: p.Data,
Timeout: p.Timeout,
Threshold: p.Threshold,
})
if err != nil {
// Only log what happened so we can try to finish the protocol
// (e.g. one child is offline)
log.Warnf("Error when broadcasting to children: %s", err.Error())
}
subLeaderActive := make(chan error, 1)
// Because SendToChildren blocks on some firewalls instead of returning
// an error, this call is put in a go-routine.
go func() {
subLeaderActive <- p.SendToChildren(&Announcement{
Msg: p.Msg,
Data: p.Data,
Timeout: p.Timeout,
Threshold: p.Threshold,
})
}()

select {
case <-p.closeChan:
return nil
case reply := <-p.ChannelResponse:
if reply.Equal(p.Root().Children[0]) {
// Transfer the response to the parent protocol
p.subResponse <- reply
for {
select {
case err := <-subLeaderActive:
if err != nil {
p.subleaderNotResponding <- true
return xerrors.Errorf("Couldn't contact subleader: %v", err)
}
case <-p.closeChan:
return nil
case reply := <-p.ChannelResponse:
if reply.Equal(p.Root().Children[0]) {
// Transfer the response to the parent protocol
p.subResponse <- reply
}
return nil
case <-time.After(p.Timeout):
// It might be only the subleader then we send a notification
// to let the parent protocol take actions
log.Warnf("%s: timed out while waiting for subleader response while %s",
p.ServerIdentity(), p.Tree().Dump())
p.subleaderNotResponding <- true
return nil
}
case <-time.After(p.Timeout):
// It might be only the subleader then we send a notification
// to let the parent protocol take actions
log.Warnf("%s: timed out while waiting for subleader response while %s",
p.ServerIdentity(), p.Tree().Dump())
p.subleaderNotResponding <- true
}

return nil
}

// dispatchSubLeader takes care of synchronizing the children
Expand All @@ -238,9 +245,16 @@ func (p *SubBlsCosi) dispatchSubLeader() error {
return err
}

errs := p.SendToChildrenInParallel(a)
if len(errs) > 0 {
log.Error(errs)
if len(p.Children()) > 0 {
for _, node := range p.Children() {
go func(node *onet.TreeNode) {
err := p.SendTo(node, a)
if err != nil {
log.Warnf("Error while sending to leaf %s: %v",
node.Name(), err)
}
}(node)
}
}

responses := make(ResponseMap)
Expand All @@ -264,9 +278,7 @@ func (p *SubBlsCosi) dispatchSubLeader() error {
// we need to timeout the children faster than the root timeout to let it
// know the subleader is alive, but some children are failing
timeout := time.After(p.Timeout / 2)
// If an error happens when sending the announcement, we can assume there
// will be a timeout from this node
done := len(errs)
done := 0
for done < len(p.Children()) {
select {
case <-p.closeChan:
Expand Down
2 changes: 1 addition & 1 deletion byzcoinx/byzcoinx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func runProtocol(t *testing.T, nbrHosts int, nbrFault int, refuseIndex int, prot
log.Lvl3("Added counter", counters.size()-1, refuseIndex)

require.True(t, int(math.Pow(float64(bftCosiProto.nSubtrees),
3.0)) <= nbrHosts)
2.0)) <= nbrHosts)

// kill the leafs first
nbrFault = min(nbrFault, len(servers))
Expand Down
28 changes: 18 additions & 10 deletions messaging/propagate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"go.dedis.ch/cothority/v3/blscosi/protocol"
"reflect"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -173,6 +172,7 @@ func (p *Propagate) Dispatch() error {
var gotSendData bool
var errs []error
subtreeCount := p.TreeNode().SubtreeCount()
errsChan := make(chan error, subtreeCount)

for process {
p.Lock()
Expand Down Expand Up @@ -208,15 +208,18 @@ func (p *Propagate) Dispatch() error {
process = false
}
log.Lvl3(p.ServerIdentity(), "Sending to children", p.Children())
if errs = p.SendToChildrenInParallel(&msg.PropagateSendData); len(errs) != 0 {
errsStr := make([]string, len(errs))
for i, e := range errs {
errsStr[i] = e.Error()
}
log.Lvl2("Error while sending to children:", errsStr)
if len(errs) > p.allowedFailures {
return errors.New(strings.Join(errsStr, "\n"))
}

// Just blindly send to the children - we don't care if they receive it or
// not. If they don't receive it, they will complain later.
for _, c := range p.Children() {
go func(tn *onet.TreeNode) {
err := p.SendTo(tn, &msg.PropagateSendData)
if err != nil {
log.Warnf("Error while sending to child %s: %v",
tn.Name(), err)
errsChan <- err
}
}(c)
}
case <-p.ChannelReply:
if !gotSendData {
Expand All @@ -230,6 +233,11 @@ func (p *Propagate) Dispatch() error {
return err
}
}
errsChan <- nil
case err := <-errsChan:
if err != nil {
errs = append(errs, err)
}
// Only wait for the number of children that successfully received our message.
if received == subtreeCount-len(errs) && received >= subtreeCount-p.allowedFailures {
process = false
Expand Down

0 comments on commit 0d1b33d

Please sign in to comment.