Skip to content

Commit

Permalink
✨ [persistence] Add PeersV to Channel
Browse files Browse the repository at this point in the history
When fixing the bug it became apparent, that the restorer was
missing a method to retrieve the peers of a specific channel.

Added the `PeersV` field to `persistence.Channel`. Now restoring
a channel will also allow to query the related peers.

Improved `channel/persistence/test/persistrestorertest.go` with
additional peer-restorer testlogic.

Signed-off-by: Norbert Dzikowski <[email protected]>
  • Loading branch information
Norbert Dzikowski committed Sep 24, 2020
1 parent 6f5fb27 commit fa62299
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 36 deletions.
16 changes: 2 additions & 14 deletions channel/persistence/keyvalue/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func sigKey(idx, numParts int) string {
}

// ChannelRemoved deletes a channel from the database.
func (pr *PersistRestorer) ChannelRemoved(_ context.Context, id channel.ID) error {
func (pr *PersistRestorer) ChannelRemoved(ctx context.Context, id channel.ID) error {
db := pr.channelDB(id).NewBatch()
peerdb := sortedkv.NewTable(pr.db, prefix.PeerDB).NewBatch()
// All keys a channel has.
Expand All @@ -90,7 +90,7 @@ func (pr *PersistRestorer) ChannelRemoved(_ context.Context, id channel.ID) erro
}
}

peers, err := pr.peersForChan(id)
peers, err := pr.channelPeers(id)
if err != nil {
return errors.WithMessage(err, "retrieving peers for channel")
}
Expand All @@ -110,18 +110,6 @@ func (pr *PersistRestorer) ChannelRemoved(_ context.Context, id channel.ID) erro
return errors.WithMessage(peerdb.Apply(), "applying peer batch")
}

// peersForChan returns a slice of peer addresses for a given channel id from
// the db of PersistRestorer.
func (pr *PersistRestorer) peersForChan(id channel.ID) ([]wire.Address, error) {
var ps wire.AddressesWithLen
peers, err := pr.channelDB(id).Get(prefix.Peers)
if err != nil {
return nil, errors.WithMessage(err, "unable to get peerlist from db")
}
return []wire.Address(ps), errors.WithMessage(perunio.Decode(bytes.NewBuffer([]byte(peers)), &ps),
"decoding peerlist")
}

// getParamsForChan returns the channel parameters for a given channel id from
// the db.
func (pr *PersistRestorer) getParamsForChan(id channel.ID) (channel.Params, error) {
Expand Down
24 changes: 15 additions & 9 deletions channel/persistence/keyvalue/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ChannelIterator struct {
}

// ActivePeers returns a list of all peers with which a channel is persisted.
func (pr *PersistRestorer) ActivePeers(context.Context) ([]wire.Address, error) {
func (pr *PersistRestorer) ActivePeers(ctx context.Context) ([]wire.Address, error) {
it := sortedkv.NewTable(pr.db, prefix.PeerDB).NewIterator()

peermap := make(map[wallet.AddrKey]wire.Address)
Expand All @@ -61,6 +61,18 @@ func (pr *PersistRestorer) ActivePeers(context.Context) ([]wire.Address, error)
return peers, errors.WithMessage(it.Close(), "closing iterator")
}

// channelPeers returns a slice of peer addresses for a given channel id from
// the db of PersistRestorer.
func (pr *PersistRestorer) channelPeers(id channel.ID) ([]wire.Address, error) {
var ps wire.AddressesWithLen
peers, err := pr.channelDB(id).Get(prefix.Peers)
if err != nil {
return nil, errors.WithMessage(err, "unable to get peerlist from db")
}
return []wire.Address(ps), errors.WithMessage(perunio.Decode(bytes.NewBuffer([]byte(peers)), &ps),
"decoding peerlist")
}

// RestoreAll should return an iterator over all persisted channels.
func (pr *PersistRestorer) RestoreAll() (persistence.ChannelIterator, error) {
return &ChannelIterator{
Expand Down Expand Up @@ -158,7 +170,6 @@ const (
noOpts decOpts = 0
allowEnd decOpts = 1 << iota
allowEmpty
skip
)

// isSetIn masks the given opts with the current opt and returns
Expand All @@ -173,12 +184,11 @@ func (i *ChannelIterator) Next(context.Context) bool {
return false
}

i.ch = &persistence.Channel{ParamsV: new(channel.Params)}

i.ch = persistence.NewChannel()
if !i.decodeNext("current", &i.ch.CurrentTXV, allowEnd) ||
!i.decodeNext("index", &i.ch.IdxV, noOpts) ||
!i.decodeNext("params", i.ch.ParamsV, noOpts) ||
!i.decodeNext("peers", nil, skip) ||
!i.decodeNext("peers", (*wire.AddressesWithLen)(&i.ch.PeersV), noOpts) ||
!i.decodeNext("phase", &i.ch.PhaseV, noOpts) {
return false
}
Expand Down Expand Up @@ -219,10 +229,6 @@ func (i *ChannelIterator) decodeNext(key string, v interface{}, opts decOpts) bo
}
}

if skip.isSetIn(opts) {
return true
}

buf := bytes.NewBuffer(i.its[0].ValueBytes())
if buf.Len() == 0 {
if allowEmpty.isSetIn(opts) {
Expand Down
51 changes: 40 additions & 11 deletions channel/persistence/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,30 @@ type (
io.Closer
}

// A Channel holds all data that is necessary for restoring a channel
// A chSource holds all data that is necessary for restoring a channel
// controller.
Channel struct {
chSource struct {
IdxV channel.Index // IdxV is the own index in the channel.
ParamsV *channel.Params // ParamsV are the channel parameters.
StagingTXV channel.Transaction // StagingTxV is the staging transaction.
CurrentTXV channel.Transaction // CurrentTXV is the current transaction.
PhaseV channel.Phase // PhaseV is the current channel phase.
}

// Channel holds all data that is necessary to restore a channel controller
// and additionally the related peers for this channel.
Channel struct {
chSource
PeersV []wire.Address
}
)

var _ channel.Source = (*Channel)(nil)

// CloneSource creates a new Channel object whose fields are clones of the data
// coming from Source s.
func CloneSource(s channel.Source) *Channel {
return &Channel{
// coming from Source s. Returned `PeersV` are `nil`.
func CloneSource(s channel.Source) channel.Source {
return &chSource{
IdxV: s.Idx(),
ParamsV: s.Params().Clone(),
StagingTXV: s.StagingTX().Clone(),
Expand All @@ -134,20 +141,42 @@ func CloneSource(s channel.Source) *Channel {
}
}

// NewChannel creates a new Channel object whose fields are initialized.
func NewChannel() *Channel {
return &Channel{
chSource{ParamsV: new(channel.Params)},
nil,
}
}

// FromSource creates a new Channel object from given `channel.Source` and peers.
func FromSource(s channel.Source, ps []wire.Address) *Channel {
return &Channel{
chSource{
IdxV: s.Idx(),
ParamsV: s.Params().Clone(),
StagingTXV: s.StagingTX().Clone(),
CurrentTXV: s.CurrentTX().Clone(),
PhaseV: s.Phase(),
},
ps,
}
}

// ID is the channel ID of this source. It is the same as Params().ID().
func (c *Channel) ID() channel.ID { return c.ParamsV.ID() }
func (c *chSource) ID() channel.ID { return c.ParamsV.ID() }

// Idx is the own index in the channel.
func (c *Channel) Idx() channel.Index { return c.IdxV }
func (c *chSource) Idx() channel.Index { return c.IdxV }

// Params are the channel parameters.
func (c *Channel) Params() *channel.Params { return c.ParamsV }
func (c *chSource) Params() *channel.Params { return c.ParamsV }

// StagingTX is the staged transaction (State+incomplete list of sigs).
func (c *Channel) StagingTX() channel.Transaction { return c.StagingTXV }
func (c *chSource) StagingTX() channel.Transaction { return c.StagingTXV }

// CurrentTX is the current transaction (State+complete list of sigs).
func (c *Channel) CurrentTX() channel.Transaction { return c.CurrentTXV }
func (c *chSource) CurrentTX() channel.Transaction { return c.CurrentTXV }

// Phase is the phase in which the channel is currently in.
func (c *Channel) Phase() channel.Phase { return c.PhaseV }
func (c *chSource) Phase() channel.Phase { return c.PhaseV }
14 changes: 13 additions & 1 deletion channel/persistence/test/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,21 @@ func NewRandomChannel(
}

require.NoError(t, pr.ChannelCreated(ctx, c.StateMachine, c.peers))

c.AssertPersisted(ctx, t)
return
}

func requireEqualPeers(t require.TestingT, expected, actual []wire.Address) {
require.Equal(t, len(expected), len(actual))
for i, p := range expected {
if !p.Equals(actual[i]) {
t.Errorf("restored peers for channel do not match\nexpected: %v\nactual: %v",
actual, expected)
t.FailNow()
}
}
}

// AssertPersisted reads the channel state from the restorer and compares it
// to the actual channel state. If an error occurs while restoring the channel
// or if the restored channel does not match the actual channel state, then the
Expand All @@ -81,6 +92,7 @@ func (c *Channel) AssertPersisted(ctx context.Context, t require.TestingT) {
require.NoError(t, err)
require.NotNil(t, ch)
c.RequireEqual(t, ch)
requireEqualPeers(t, c.peers, ch.PeersV)
}

// RequireEqual asserts that the channel is equal to the provided channel state.
Expand Down
2 changes: 1 addition & 1 deletion channel/persistence/test/persistrestorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (pr *PersistRestorer) ChannelCreated(
return errors.Errorf("channel already persisted: %x", id)
}

pr.chans[id] = persistence.CloneSource(source)
pr.chans[id] = persistence.FromSource(source, peers)
pr.pcs.Add(id, peers...)
return nil
}
Expand Down

0 comments on commit fa62299

Please sign in to comment.