Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

feat: gossiping the latest valset in P2P layer in case they get pruned #560

Merged
merged 28 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9270c89
feat: gossiping the latest valset in P2P layer in case they get pruned
rach-id Oct 25, 2023
96642c3
Update p2p/dht.go
rach-id Oct 26, 2023
7f5f74f
Update p2p/dht.go
rach-id Oct 26, 2023
bc15575
Update p2p/dht.go
rach-id Oct 26, 2023
479cab4
Update p2p/dht.go
rach-id Oct 26, 2023
620b0bb
Update p2p/dht.go
rach-id Oct 26, 2023
7b0f5fb
Update p2p/dht.go
rach-id Oct 26, 2023
2a0d5e5
Update p2p/errors.go
rach-id Oct 26, 2023
25fdfe4
Update p2p/keys.go
rach-id Oct 26, 2023
697e0c2
Update types/latest_valset.go
rach-id Oct 26, 2023
21e32ab
Update p2p/validators.go
rach-id Oct 26, 2023
16b712f
Update p2p/validators_test.go
rach-id Oct 26, 2023
a5b2e50
Update p2p/validators_test.go
rach-id Oct 26, 2023
09e6954
Update p2p/validators_test.go
rach-id Oct 26, 2023
e6b61d2
Update p2p/validators.go
rach-id Oct 26, 2023
d69f14c
Update p2p/validators.go
rach-id Oct 26, 2023
8124d1c
Update p2p/validators.go
rach-id Oct 26, 2023
42e1f02
Update p2p/validators.go
rach-id Oct 26, 2023
e13e23a
Update p2p/validators.go
rach-id Oct 26, 2023
24469ea
Update p2p/validators.go
rach-id Oct 26, 2023
cc536e3
Update p2p/validators.go
rach-id Oct 26, 2023
9b5baf8
Update p2p/validators.go
rach-id Oct 26, 2023
5e726f0
fix: nil pointer
rach-id Oct 26, 2023
545ccee
chore: type2 rename
rach-id Oct 26, 2023
4df2e5a
Update types/latest_valset_test.go
rach-id Oct 26, 2023
755ff25
fix: introduce LatestValset type to fix serialization
rach-id Oct 26, 2023
d4e6c41
feat: authenticate the valset to the Blobstream contract
rach-id Oct 27, 2023
e5a05a5
feat: add log message for deployer
rach-id Oct 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/blobstream/deploy/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func Command() *cobra.Command {

vs, err := getStartingValset(cmd.Context(), querier, config.startingNonce)
if err != nil {
logger.Error("couldn't get valset from state (probably pruned). connect to an archive node to be able to deploy the contract")
return errors.Wrap(
err,
"cannot initialize the Blobstream contract without having a valset request: %s",
Expand Down
8 changes: 8 additions & 0 deletions evm/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ func (ec *Client) StateLastEventNonce(opts *bind.CallOpts) (uint64, error) {
return nonce.Uint64(), nil
}

func (ec *Client) StateLastValidatorSetCheckpoint(opts *bind.CallOpts) ([32]byte, error) {
checkpoint, err := ec.Wrapper.StateLastValidatorSetCheckpoint(opts)
if err != nil {
return [32]byte{}, err
}
return checkpoint, nil
}

func (ec *Client) WaitForTransaction(
ctx context.Context,
backend bind.DeployBackend,
Expand Down
7 changes: 7 additions & 0 deletions orchestrator/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,10 @@ func (b Broadcaster) ProvideValsetConfirm(ctx context.Context, nonce uint64, con
}
return b.BlobstreamDHT.PutValsetConfirm(ctx, p2p.GetValsetConfirmKey(nonce, confirm.EthAddress, signBytes), confirm)
}

func (b Broadcaster) ProvideLatestValset(ctx context.Context, latestValset types.LatestValset) error {
if len(b.BlobstreamDHT.RoutingTable().ListPeers()) == 0 {
return ErrEmptyPeersTable
}
return b.BlobstreamDHT.PutLatestValset(ctx, latestValset)
}
36 changes: 36 additions & 0 deletions orchestrator/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"encoding/hex"
"math/big"
"testing"
"time"

celestiatypes "github.com/celestiaorg/celestia-app/x/qgb/types"

"github.com/celestiaorg/orchestrator-relayer/evm"
"github.com/ethereum/go-ethereum/accounts/keystore"
Expand Down Expand Up @@ -97,6 +100,39 @@ func TestBroadcastValsetConfirm(t *testing.T) {
assert.Equal(t, *expectedConfirm, actualConfirm)
}

func TestBroadcastLatestValset(t *testing.T) {
network := blobstreamtesting.NewDHTNetwork(context.Background(), 4)
defer network.Stop()

// create a test Valset
expectedValset := celestiatypes.Valset{
Time: time.UnixMicro(10),
Height: 5,
Members: []celestiatypes.BridgeValidator{
{
Power: 100,
EvmAddress: "evm_addr1",
},
{
Power: 200,
EvmAddress: "evm_addr2",
},
},
}

// Broadcast the valset
broadcaster := orchestrator.NewBroadcaster(network.DHTs[1])
err := broadcaster.ProvideLatestValset(context.Background(), *types.ToLatestValset(expectedValset))
assert.NoError(t, err)

// try to get the valset from another peer
actualValset, err := network.DHTs[3].GetLatestValset(context.Background())
assert.NoError(t, err)
assert.NotNil(t, actualValset)

assert.True(t, types.IsValsetEqualToLatestValset(expectedValset, actualValset))
}

// TestEmptyPeersTable tests that values are not broadcasted if the DHT peers
// table is empty.
func TestEmptyPeersTable(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ func (orch Orchestrator) Process(ctx context.Context, nonce uint64) error {
orch.Logger.Debug("validator not part of valset. won't sign", "nonce", nonce)
return nil
}

if err == nil && previousValset != nil {
// add the valset to the p2p network
// it's alright if this fails, we can expect other nodes to do it successfully
_ = orch.Broadcaster.ProvideLatestValset(ctx, *types.ToLatestValset(*previousValset))
}
}

switch castedAtt := att.(type) {
Expand Down Expand Up @@ -341,6 +347,10 @@ func (orch Orchestrator) Process(ctx context.Context, nonce uint64) error {
}

func (orch Orchestrator) ProcessValsetEvent(ctx context.Context, valset celestiatypes.Valset) error {
// add the valset to the p2p network
// it's alright if this fails, we can expect other nodes to do it successfully
_ = orch.Broadcaster.ProvideLatestValset(ctx, *types.ToLatestValset(valset))

signBytes, err := valset.SignBytes()
if err != nil {
return err
Expand Down
26 changes: 26 additions & 0 deletions orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,32 @@ func (s *OrchestratorTestSuite) TestProcessValsetEvent() {
assert.Equal(t, s.Orchestrator.EvmAccount.Address.Hex(), confirm.EthAddress)
}

func (s *OrchestratorTestSuite) TestProcessValsetEventAndProvideValset() {
t := s.T()
_, err := s.Node.CelestiaNetwork.WaitForHeight(50)
require.NoError(t, err)

vs, err := celestiatypes.NewValset(
2,
10,
[]*celestiatypes.InternalBridgeValidator{{
Power: 10,
EVMAddress: s.Orchestrator.EvmAccount.Address,
}},
time.Now(),
)
require.NoError(t, err)

// signing and submitting the signature and also providing the valset event
err = s.Orchestrator.ProcessValsetEvent(s.Node.Context, *vs)
require.NoError(t, err)

// retrieving the valset
actualVs, err := s.Node.DHTNetwork.DHTs[0].GetLatestValset(s.Node.Context)
require.NoError(t, err)
assert.Equal(t, vs.Nonce, actualVs.Nonce)
}

func TestValidatorPartOfValset(t *testing.T) {
tests := []struct {
name string
Expand Down
42 changes: 38 additions & 4 deletions p2p/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"time"

"github.com/libp2p/go-libp2p-kad-dht/providers"

"github.com/celestiaorg/orchestrator-relayer/types"
ds "github.com/ipfs/go-datastore"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
tmlog "github.com/tendermint/tendermint/libs/log"
Expand All @@ -17,6 +18,7 @@ const (
ProtocolPrefix = "/blobstream/0.1.0" // TODO "/blobstream/<version>" ?
DataCommitmentConfirmNamespace = "dcc"
ValsetConfirmNamespace = "vc"
LatestValsetNamespace = "lv"
)

// BlobstreamDHT wrapper around the `IpfsDHT` implementation.
Expand All @@ -29,9 +31,9 @@ type BlobstreamDHT struct {
// NewBlobstreamDHT create a new IPFS DHT using a suitable configuration for the Blobstream.
// If nil is passed for bootstrappers, the DHT will not try to connect to any existing peer.
func NewBlobstreamDHT(ctx context.Context, h host.Host, store ds.Batching, bootstrappers []peer.AddrInfo, logger tmlog.Logger) (*BlobstreamDHT, error) {
// this value is set to 23 days, which is the unbonding period.
// we want to have the signatures available for this whole period.
providers.ProvideValidity = time.Hour * 24 * 23
// this values is set to a year, so that even in super-stable networks, we have at least
// one valset in store for a year.
providers.ProvideValidity = time.Hour * 24 * 365

router, err := dht.New(
ctx,
Expand All @@ -41,6 +43,7 @@ func NewBlobstreamDHT(ctx context.Context, h host.Host, store ds.Batching, boots
dht.ProtocolPrefix(ProtocolPrefix),
dht.NamespacedValidator(DataCommitmentConfirmNamespace, DataCommitmentConfirmValidator{}),
dht.NamespacedValidator(ValsetConfirmNamespace, ValsetConfirmValidator{}),
dht.NamespacedValidator(LatestValsetNamespace, LatestValsetValidator{}),
dht.BootstrapPeers(bootstrappers...),
dht.DisableProviders(),
)
Expand Down Expand Up @@ -157,3 +160,34 @@ func (q BlobstreamDHT) GetValsetConfirm(ctx context.Context, key string) (types.
}
return confirm, nil
}

// PutLatestValset encodes a valset then puts its value to the DHT.
// The key will be returned by the `GetValsetKey` method.
// If the valset is not the latest, it will fail.
// Returns an error if it fails.
func (q BlobstreamDHT) PutLatestValset(ctx context.Context, v types.LatestValset) error {
encodedData, err := types.MarshalLatestValset(v)
if err != nil {
return err
}
err = q.PutValue(ctx, GetLatestValsetKey(), encodedData)
if err != nil {
return err
}
return nil
}

// GetLatestValset looks for the latest valset in the DHT.
// The key will be returned by the `GetValsetKey` method.
// Returns an error if it fails.
func (q BlobstreamDHT) GetLatestValset(ctx context.Context) (types.LatestValset, error) {
encoded, err := q.GetValue(ctx, GetLatestValsetKey()) // this is a blocking call, we should probably use timeout and channel
if err != nil {
return types.LatestValset{}, err
}
valset, err := types.UnmarshalLatestValset(encoded)
if err != nil {
return types.LatestValset{}, err
}
return valset, nil
}
105 changes: 105 additions & 0 deletions p2p/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

celestiatypes "github.com/celestiaorg/celestia-app/x/qgb/types"

"github.com/celestiaorg/orchestrator-relayer/evm"
"github.com/ethereum/go-ethereum/accounts/keystore"

Expand Down Expand Up @@ -95,6 +97,109 @@ func TestPutDataCommitmentConfirm(t *testing.T) {
assert.Equal(t, expectedConfirm, actualConfirm)
}

func TestPutLatestValset(t *testing.T) {
network := blobstreamtesting.NewDHTNetwork(context.Background(), 2)
defer network.Stop()

// create a test Valset
expectedValset := celestiatypes.Valset{
Nonce: 10,
Time: time.UnixMicro(10),
Height: 5,
Members: []celestiatypes.BridgeValidator{
{
Power: 100,
EvmAddress: "evm_addr1",
},
{
Power: 200,
EvmAddress: "evm_addr2",
},
},
}

// put the test Valset in the DHT
err := network.DHTs[0].PutLatestValset(context.Background(), *types.ToLatestValset(expectedValset))
assert.NoError(t, err)

// try to get the latest valset from the same peer
actualValset, err := network.DHTs[0].GetLatestValset(context.Background())
assert.NoError(t, err)
assert.NotNil(t, actualValset)

assert.True(t, types.IsValsetEqualToLatestValset(expectedValset, actualValset))
}

func TestPutMultipleLatestValset(t *testing.T) {
network := blobstreamtesting.NewDHTNetwork(context.Background(), 3)
defer network.Stop()

// create test Valsets
valset1 := celestiatypes.Valset{
Nonce: 10,
Time: time.UnixMicro(10),
Height: 5,
Members: []celestiatypes.BridgeValidator{
{
Power: 100,
EvmAddress: "evm_addr1",
},
{
Power: 200,
EvmAddress: "evm_addr2",
},
},
}
valset2 := celestiatypes.Valset{
Nonce: 11,
Time: time.UnixMicro(10),
Height: 5,
Members: []celestiatypes.BridgeValidator{
{
Power: 100,
EvmAddress: "evm_addr1",
},
{
Power: 200,
EvmAddress: "evm_addr2",
},
},
}
valset3 := celestiatypes.Valset{
Nonce: 9,
Time: time.UnixMicro(10),
Height: 5,
Members: []celestiatypes.BridgeValidator{
{
Power: 100,
EvmAddress: "evm_addr1",
},
{
Power: 200,
EvmAddress: "evm_addr2",
},
},
}

// put the valsets in the DHT
err := network.DHTs[0].PutLatestValset(context.Background(), *types.ToLatestValset(valset1))
assert.NoError(t, err)

err = network.DHTs[1].PutLatestValset(context.Background(), *types.ToLatestValset(valset2))
assert.NoError(t, err)

// this one should fail since it puts an older valset than ones in store
err = network.DHTs[2].PutLatestValset(context.Background(), *types.ToLatestValset(valset3))
assert.Error(t, err)

// try to get the valset from the same peer
actualValset, err := network.DHTs[0].GetLatestValset(context.Background())
assert.NoError(t, err)
assert.NotNil(t, actualValset)

assert.True(t, types.IsValsetEqualToLatestValset(valset2, actualValset))
}

func TestNetworkPutDataCommitmentConfirm(t *testing.T) {
network := blobstreamtesting.NewDHTNetwork(context.Background(), 10)
defer network.Stop()
Expand Down
6 changes: 5 additions & 1 deletion p2p/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package p2p

import "errors"
import (
"errors"
)

var (
ErrPeersTimeout = errors.New("timeout while waiting for peers")
Expand All @@ -17,4 +19,6 @@ var (
ErrEmptyNamespace = errors.New("empty namespace")
ErrEmptyEVMAddr = errors.New("empty evm address")
ErrEmptyDigest = errors.New("empty digest")
ErrEmptyValset = errors.New("empty valset")
ErrInvalidLatestValsetKey = errors.New("invalid latest valset key")
)
5 changes: 5 additions & 0 deletions p2p/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func GetValsetConfirmKey(nonce uint64, evmAddr string, signBytes string) string
evmAddr + ":" + signBytes
}

// GetLatestValsetKey creates the latest valset key.
func GetLatestValsetKey() string {
return "/" + LatestValsetNamespace + "/latest"
}

// ParseKey parses a key and returns its fields.
// Will return an error if the key is missing some fields, some fields are empty, or otherwise invalid.
func ParseKey(key string) (namespace string, nonce uint64, evmAddr string, digest string, err error) {
Expand Down
11 changes: 11 additions & 0 deletions p2p/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,14 @@ func (q Querier) QueryValsetConfirms(ctx context.Context, nonce uint64, valset c
}
return confirms, nil
}

// QueryLatestValset get the latest valset from the p2p network.
func (q Querier) QueryLatestValset(
ctx context.Context,
) (*types.LatestValset, error) {
latestValset, err := q.BlobstreamDHT.GetLatestValset(ctx)
if err != nil {
return nil, err
}
return &latestValset, nil
}
Loading