Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize chain #283

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/go_lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Go lint

on:
push:
branches: [ master ]
branches: [ main ]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pierluca @jbsv - not sure if you want to switch to main branch instead of master. We switched all our repos to main: https://www.theserverside.com/feature/Why-GitHub-renamed-its-master-branch-to-main

pull_request:
types:
- opened
Expand All @@ -14,10 +14,10 @@ jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Set up Go ^1.19
- name: Set up Go 1.20
uses: actions/setup-go@v3
with:
go-version: ^1.19
go-version: '1.20'

- name: Check out code into the Go module directory
uses: actions/checkout@v3
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/go_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Go test

on:
push:
branches: [ master ]
branches: [ main ]
pull_request:
types:
- opened
Expand All @@ -19,10 +19,10 @@ jobs:
env:
LLVL: trace
steps:
- name: Set up Go ^1.19
- name: Set up Go '1.20'
uses: actions/setup-go@v3
with:
go-version: ^1.19
go-version: '1.20'

- name: Check out code into the Go module directory
uses: actions/checkout@v3
Expand Down
9 changes: 9 additions & 0 deletions cli/node/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"time"
Expand Down Expand Up @@ -98,6 +99,14 @@ type socketDaemon struct {
// Listen implements node.Daemon. It starts the daemon by creating the unix
// socket file to the path.
func (d *socketDaemon) Listen() error {
_, err := os.Stat(d.socketpath)
if err == nil {
d.logger.Warn().Msg("Cleaning existing socket file")
err := os.Remove(d.socketpath)
if err != nil {
return xerrors.Errorf("couldn't clear tangling socketpath: %v", err)
}
}
socket, err := d.listenFn("unix", d.socketpath)
if err != nil {
return xerrors.Errorf("couldn't bind socket: %v", err)
Expand Down
48 changes: 39 additions & 9 deletions cli/node/memcoin/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bytes"
"encoding/base64"
"fmt"
"io"
"net"
Expand All @@ -14,6 +15,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.dedis.ch/kyber/v3/pairing/bn256"
)

// This test creates a chain with initially 3 nodes. It then adds node 4 and 5
Expand Down Expand Up @@ -74,28 +76,49 @@ func TestMemcoin_Scenario_SetupAndTransactions(t *testing.T) {
args = append([]string{
os.Args[0],
"--config", node1, "ordering", "roster", "add",
"--wait", "60s"},
"--wait", "60s",
},
getExport(t, node4)...,
)

err = run(args)
require.NoError(t, err)

// Add the certificate and push two new blocks to make sure node4 is
// fully participating
shareCert(t, node4, node1, "//127.0.0.1:2111")
publicKey, err := bn256.NewSuiteG2().Point().MarshalBinary()
require.NoError(t, err)
publicKeyHex := base64.StdEncoding.EncodeToString(publicKey)
argsAccess := []string{
os.Args[0],
"--config", node1, "access", "add",
"--identity", publicKeyHex,
}
for i := 0; i < 2; i++ {
err = runWithCfg(argsAccess, config{})
require.NoError(t, err)
}

// Add node 5 which should be participating.
// This makes sure that node 4 is actually participating and caught up.
// If node 4 is not participating, there would be too many faulty nodes
// after adding node 5.
args = append([]string{
os.Args[0],
"--config", node1, "ordering", "roster", "add",
"--wait", "60s"},
"--wait", "60s",
},
getExport(t, node5)...,
)

err = run(args)
require.NoError(t, err)

// Run a few transactions.
for i := 0; i < 5; i++ {
err = runWithCfg(args, config{})
require.EqualError(t, err, "command error: transaction refused: duplicate in roster: grpcs://127.0.0.1:2115")
// Run 2 new transactions
for i := 0; i < 2; i++ {
err = runWithCfg(argsAccess, config{})
require.NoError(t, err)
}

// Test a timeout waiting for a transaction.
Expand Down Expand Up @@ -146,12 +169,14 @@ func TestMemcoin_Scenario_RestartNode(t *testing.T) {
args := append([]string{
os.Args[0],
"--config", node1, "ordering", "roster", "add",
"--wait", "60s"},
"--wait", "60s",
},
getExport(t, node1)...,
)

err = run(args)
require.EqualError(t, err, "command error: transaction refused: duplicate in roster: grpcs://127.0.0.1:2210")
require.EqualError(t, err,
"command error: transaction refused: duplicate in roster: grpcs://127.0.0.1:2210")
}

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -230,7 +255,12 @@ func waitDaemon(t *testing.T, daemons []string) bool {

func makeNodeArg(path string, port uint16) []string {
return []string{
os.Args[0], "--config", path, "start", "--listen", "tcp://127.0.0.1:" + strconv.Itoa(int(port)),
os.Args[0],
"--config",
path,
"start",
"--listen",
"tcp://127.0.0.1:" + strconv.Itoa(int(port)),
}
}

Expand Down
2 changes: 2 additions & 0 deletions core/execution/native/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func (ns *Service) Execute(snap store.Snapshot, step execution.Step) (execution.
err := contract.Execute(snap, step)
if err != nil {
res.Accepted = false
// LG: DEBUG - I'd like to keep this commented line, as it helps debugging.
// res.Message = fmt.Sprintf("%+v", err)
Comment on lines +87 to +88
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ineiti TODO: need to remove these lines...

res.Message = err.Error()
}

Expand Down
4 changes: 4 additions & 0 deletions core/ordering/cosipbft/blockstore/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"encoding/binary"
"sync"

"go.dedis.ch/dela"
"go.dedis.ch/dela/core"
"go.dedis.ch/dela/core/ordering/cosipbft/types"
"go.dedis.ch/dela/core/store"
Expand Down Expand Up @@ -87,6 +88,9 @@ func (s *InDisk) Load() error {
s.last = link
s.indices[link.GetBlock().GetHash()] = link.GetBlock().GetIndex()

if s.length%100 == 0 {
dela.Logger.Info().Msgf("Loaded %d blocks", s.length)
}
return nil
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestRegisterContract(t *testing.T) {
}

func TestNewTransaction(t *testing.T) {
mgr := NewManager(signed.NewManager(fake.NewSigner(), nil))
mgr := NewManager(signed.NewManager(fake.NewSigner(), fake.NewClient()))

tx, err := mgr.Make(authority.New(nil, nil))
require.NoError(t, err)
Expand Down
74 changes: 51 additions & 23 deletions core/ordering/cosipbft/cosipbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// protocol and the followers wait for incoming messages to update their own
// state machines and reply with signatures when the leader candidate is valid.
// If the leader fails to send a candidate, or finalize it, the followers will
// timeout after some time and move to a view change state.
// time out after some time and move to a view change state.
//
// The view change procedure is always waiting on the leader+1 confirmation
// before moving to leader+2, leader+3, etc. It means that if not enough nodes
Expand Down Expand Up @@ -43,6 +43,7 @@ import (
"go.dedis.ch/dela/core/ordering/cosipbft/blockstore"
"go.dedis.ch/dela/core/ordering/cosipbft/blocksync"
"go.dedis.ch/dela/core/ordering/cosipbft/contracts/viewchange"
"go.dedis.ch/dela/core/ordering/cosipbft/fastsync"
"go.dedis.ch/dela/core/ordering/cosipbft/pbft"
"go.dedis.ch/dela/core/ordering/cosipbft/types"
"go.dedis.ch/dela/core/store"
Expand All @@ -61,7 +62,7 @@ import (
const (
// DefaultRoundTimeout is the maximum round time the service waits
// for an event to happen.
DefaultRoundTimeout = 10 * time.Second
DefaultRoundTimeout = 10 * time.Minute

// DefaultFailedRoundTimeout is the maximum round time the service waits
// for an event to happen, after a round has failed, thus letting time
Expand All @@ -71,14 +72,17 @@ const (

// DefaultTransactionTimeout is the maximum allowed age of transactions
// before a view change is executed.
DefaultTransactionTimeout = 30 * time.Second
DefaultTransactionTimeout = 5 * time.Minute

// RoundWait is the constant value of the exponential backoff use between
// round failures.
RoundWait = 5 * time.Millisecond
RoundWait = 50 * time.Millisecond

// RoundMaxWait is the maximum amount for the backoff.
RoundMaxWait = 5 * time.Minute
RoundMaxWait = 10 * time.Minute

// DefaultFastSyncMessageSize defines when a fast sync message will be split.
DefaultFastSyncMessageSize = 1e6

rpcName = "cosipbft"
)
Expand Down Expand Up @@ -115,9 +119,10 @@ type Service struct {
}

type serviceTemplate struct {
hashFac crypto.HashFactory
blocks blockstore.BlockStore
genesis blockstore.GenesisStore
hashFac crypto.HashFactory
blocks blockstore.BlockStore
genesis blockstore.GenesisStore
syncMethod syncMethodType
}

// ServiceOption is the type of option to set some fields of the service.
Expand All @@ -144,8 +149,15 @@ func WithHashFactory(fac crypto.HashFactory) ServiceOption {
}
}

// WithBlockSync enables the old, slow syncing algorithm in the cosipbft module.
func WithBlockSync() ServiceOption {
return func(tmpl *serviceTemplate) {
tmpl.syncMethod = syncMethodBlock
}
}

// ServiceParam is the different components to provide to the service. All the
// fields are mandatory and it will panic if any is nil.
// fields are mandatory, and it will panic if any is nil.
type ServiceParam struct {
Mino mino.Mino
Cosi cosi.CollectiveSigning
Expand Down Expand Up @@ -220,10 +232,11 @@ func NewServiceStruct(param ServiceParam, opts ...ServiceOption) (*Service, erro
ChainFactory: chainFac,
VerifierFactory: param.Cosi.GetVerifierFactory(),
}

bs := blocksync.NewSynchronizer(syncparam)

proc.sync = bs
if tmpl.syncMethod == syncMethodBlock {
proc.bsync = blocksync.NewSynchronizer(syncparam)
} else {
proc.fsync = fastsync.NewSynchronizer(syncparam)
}

fac := types.NewMessageFactory(
types.NewGenesisFactory(proc.rosterFac),
Expand Down Expand Up @@ -275,9 +288,20 @@ func NewServiceStart(s *Service) {
go s.watchBlocks()

if s.genesis.Exists() {
// If the genesis already exists, the service can start right away to
// participate in the chain.
// If the genesis already exists, and all blocks are loaded,
// the service can start right away to participate in the chain.
close(s.started)
if s.syncMethod() == syncMethodFast {
go func() {
roster, err := s.getCurrentRoster()
if err != nil {
s.logger.Err(err).Msg("Couldn't get roster")
} else {
s.logger.Info().Msg("Triggering catchup")
s.catchup <- roster
}
}()
}
}
}

Expand Down Expand Up @@ -541,17 +565,21 @@ func (s *Service) doLeaderRound(

s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("round has started")

// Send a synchronization to the roster so that they can learn about the
// latest block of the chain.
err := s.sync.Sync(ctx, roster,
blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())})
if err != nil {
return xerrors.Errorf("sync failed: %v", err)
// When using blocksync, the updates are sent before every new block, which
// uses a lot of bandwidth if there are more than just a few blocks.
if s.syncMethod() == syncMethodBlock {
// Send a synchronization to the roster so that they can learn about the
// latest block of the chain.
err := s.bsync.Sync(ctx, roster,
blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())})
if err != nil {
return xerrors.Errorf("sync failed: %v", err)
}
}

s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("pbft has started")

err = s.doPBFT(ctx)
err := s.doPBFT(ctx)
if err != nil {
return xerrors.Errorf("pbft failed: %v", err)
}
Expand Down Expand Up @@ -677,7 +705,7 @@ func (s *Service) doPBFT(ctx context.Context) error {
block, err = types.NewBlock(
data,
types.WithTreeRoot(root),
types.WithIndex(uint64(s.blocks.Len())),
types.WithIndex(s.blocks.Len()),
types.WithHashFactory(s.hashFactory))

if err != nil {
Expand Down
Loading
Loading