Skip to content

Commit

Permalink
Merge pull request hyperledger-labs#276 from perun-network/275-enable…
Browse files Browse the repository at this point in the history
…-context-check-linter

Enable context check linter
  • Loading branch information
Manoranjith authored Dec 8, 2021
2 parents 9cb3da3 + 029e577 commit 39553f9
Show file tree
Hide file tree
Showing 15 changed files with 94 additions and 84 deletions.
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ linters:
# These could be enabled in the future:
- ifshort # we often don't use `if err := …` for readability.
- tparallel # We don't always use parallel tests.
- contextcheck # Wants contexts to be always passed down.

# Deprecated:
- maligned
Expand Down
11 changes: 5 additions & 6 deletions backend/ethereum/channel/test/adjudicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (t *SimTimeout) IsElapsed(ctx context.Context) bool {
}
defer t.sb.clockMu.Unlock()

return t.timeLeft() <= 0
return t.timeLeft(ctx) <= 0
}

// Wait advances the clock of the simulated blockchain past the timeout.
Expand All @@ -132,7 +132,7 @@ func (t *SimTimeout) Wait(ctx context.Context) error {
}
defer t.sb.clockMu.Unlock()

if d := t.timeLeft(); d > 0 {
if d := t.timeLeft(ctx); d > 0 {
if err := t.sb.AdjustTime(time.Duration(d) * time.Second); err != nil {
return errors.Wrap(err, "adjusting time")
}
Expand All @@ -141,10 +141,9 @@ func (t *SimTimeout) Wait(ctx context.Context) error {
return nil
}

func (t *SimTimeout) timeLeft() int64 {
// context is ignored by sim blockchain anyways
h, err := t.sb.HeaderByNumber(nil, nil) //nolint:staticcheck
if err != nil { // should never happen with a sim blockchain
func (t *SimTimeout) timeLeft(ctx context.Context) int64 {
h, err := t.sb.HeaderByNumber(ctx, nil)
if err != nil { // should never happen with a sim blockchain
panic(fmt.Sprint("Error getting latest block: ", err))
}
return int64(t.Time) - int64(h.Time)
Expand Down
4 changes: 2 additions & 2 deletions backend/ethereum/channel/test/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (s *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa

// FundAddress funds a given address with `test.MaxBalance` eth from a faucet.
func (s *SimulatedBackend) FundAddress(ctx context.Context, addr common.Address) {
nonce, err := s.PendingNonceAt(context.Background(), s.faucetAddr)
nonce, err := s.PendingNonceAt(ctx, s.faucetAddr)
if err != nil {
panic(err)
}
Expand All @@ -132,7 +132,7 @@ func (s *SimulatedBackend) FundAddress(ctx context.Context, addr common.Address)
panic(err)
}
s.Commit()
if _, err := bind.WaitMined(context.Background(), s, signedTX); err != nil {
if _, err := bind.WaitMined(ctx, s, signedTX); err != nil {
panic(err)
}
}
Expand Down
16 changes: 8 additions & 8 deletions channel/persistence/test/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,12 @@ func isNilSigs(s []wallet.Sig) bool {
}

// Init calls Init on the state machine and then checks the persistence.
func (c *Channel) Init(t require.TestingT, rng *rand.Rand) {
func (c *Channel) Init(ctx context.Context, t require.TestingT, rng *rand.Rand) {
initAlloc := *ctest.NewRandomAllocation(rng, ctest.WithNumParts(len(c.accounts)))
initData := channel.NewMockOp(channel.OpValid)
err := c.StateMachine.Init(nil, initAlloc, initData) //nolint:staticcheck
err := c.StateMachine.Init(ctx, initAlloc, initData)
require.NoError(t, err)
c.AssertPersisted(c.ctx, t)
c.AssertPersisted(ctx, t)
}

// EnableInit calls EnableInit on the state machine and then checks the persistence.
Expand All @@ -167,16 +167,16 @@ func (c *Channel) EnableInit(t require.TestingT) {
}

// SignAll signs the current staged state by all parties.
func (c *Channel) SignAll(t require.TestingT) {
func (c *Channel) SignAll(ctx context.Context, t require.TestingT) {
// trigger local signing
c.Sig(nil) //nolint:errcheck,staticcheck
c.AssertPersisted(c.ctx, t)
c.Sig(ctx) //nolint:errcheck
c.AssertPersisted(ctx, t)
// remote signers
for i := range c.accounts {
sig, err := channel.Sign(c.accounts[i], c.StagingState())
require.NoError(t, err)
c.AddSig(nil, channel.Index(i), sig) //nolint:errcheck,staticcheck
c.AssertPersisted(c.ctx, t)
c.AddSig(ctx, channel.Index(i), sig) //nolint:errcheck
c.AssertPersisted(ctx, t)
}
}

Expand Down
8 changes: 4 additions & 4 deletions channel/persistence/test/persistrestorertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ func GenericPersistRestorerTest(
seed := pkgtest.Seed("", subSeed, numChans, numPeers, chIndex, ch.ID())
rng := rand.New(rand.NewSource(seed)) //nolint:gosec

ch.Init(t, rng)
ch.SignAll(t)
ch.Init(ctx, t, rng)
ch.SignAll(ctx, t)
ch.EnableInit(t)

ch.SetFunded(t)
Expand All @@ -140,7 +140,7 @@ func GenericPersistRestorerTest(
ch.DiscardUpdate(t)
err = ch.Update(t, state1, ch.Idx())
require.NoError(t, err)
ch.SignAll(t)
ch.SignAll(ctx, t)
ch.EnableUpdate(t)

// Final state
Expand All @@ -149,7 +149,7 @@ func GenericPersistRestorerTest(
statef.IsFinal = true
err = ch.Update(t, statef, ch.Idx()^1)
require.NoError(t, err)
ch.SignAll(t)
ch.SignAll(ctx, t)
ch.EnableFinal(t)

ch.SetRegistering(t)
Expand Down
3 changes: 2 additions & 1 deletion client/channelconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func newChannelConn(id channel.ID, peers []wire.Address, idx channel.Index, sub
// relay to receive all update responses
relay := wire.NewRelay()
// we cache all responses for the lifetime of the relay
relay.Cache(context.Background(), func(*wire.Envelope) bool { return true })
cacheAll := func(*wire.Envelope) bool { return true }
relay.Cache(&cacheAll)
// Close the relay if anything goes wrong in the following.
// We could have a leaky subscription otherwise.
defer func() {
Expand Down
28 changes: 18 additions & 10 deletions client/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,10 @@ func (c *Client) ProposeChannel(ctx context.Context, prop ChannelProposal) (*Cha
}

// 2. send proposal, wait for response, create channel object
c.enableVer1Cache() // cache version 1 updates until channel is opened
defer c.releaseVer1Cache() // replay cached version 1 updates
// cache version 1 updates until channel is opened
c.enableVer1Cache()
// replay cached version 1 updates
defer c.releaseVer1Cache() //nolint:contextcheck
ch, err := c.proposeTwoPartyChannel(ctx, prop)
if err != nil {
return nil, errors.WithMessage(err, "channel proposal")
Expand Down Expand Up @@ -236,8 +238,10 @@ func (c *Client) handleChannelProposalAcc(
return ch, errors.WithMessage(err, "validating channel proposal acceptance")
}

c.enableVer1Cache() // cache version 1 updates
defer c.releaseVer1Cache() // replay cached version 1 updates
// cache version 1 updates
c.enableVer1Cache()
// replay cached version 1 updates
defer c.releaseVer1Cache() //nolint:contextcheck

if ch, err = c.acceptChannelProposal(ctx, prop, p, acc); err != nil {
return ch, errors.WithMessage(err, "accept channel proposal")
Expand All @@ -261,7 +265,8 @@ func (c *Client) acceptChannelProposal(
// enables caching of incoming version 0 signatures before sending any message
// that might trigger a fast peer to send those. We don't know the channel id
// yet so the cache predicate is coarser than the later subscription.
enableVer0Cache(ctx, c.conn)
pred := enableVer0Cache(c.conn)
defer c.conn.ReleaseCache(pred)

if err := c.conn.pubMsg(ctx, acc, p); err != nil {
c.logPeer(p).Errorf("error sending proposal acceptance: %v", err)
Expand Down Expand Up @@ -298,7 +303,8 @@ func (c *Client) proposeTwoPartyChannel(
// enables caching of incoming version 0 signatures before sending any message
// that might trigger a fast peer to send those. We don't know the channel id
// yet so the cache predicate is coarser than the later subscription.
enableVer0Cache(ctx, c.conn)
pred := enableVer0Cache(c.conn)
defer c.conn.ReleaseCache(pred)

proposalID := proposal.ProposalID()
isResponse := func(e *wire.Envelope) bool {
Expand Down Expand Up @@ -676,11 +682,13 @@ func (c *Client) fundSubchannel(ctx context.Context, prop *SubChannelProposal, s
}

// enableVer0Cache enables caching of incoming version 0 signatures.
func enableVer0Cache(ctx context.Context, c wire.Cacher) {
c.Cache(ctx, func(m *wire.Envelope) bool {
func enableVer0Cache(c wire.Cacher) *wire.Predicate {
p := func(m *wire.Envelope) bool {
return m.Msg.Type() == wire.ChannelUpdateAcc &&
m.Msg.(*msgChannelUpdateAcc).Version == 0
})
}
c.Cache(&p)
return &p
}

func (c *Client) enableVer1Cache() {
Expand All @@ -700,7 +708,7 @@ func (c *Client) releaseVer1Cache() {

c.version1Cache.enabled--
for _, u := range c.version1Cache.cache {
go c.handleChannelUpdate(u.uh, u.p, u.m)
go c.handleChannelUpdate(u.uh, u.p, u.m) //nolint:contextcheck
}
c.version1Cache.cache = nil
}
Expand Down
6 changes: 3 additions & 3 deletions client/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (c *Client) handleChannelUpdate(uh UpdateHandler, p wire.Address, m Channel
return
}
pidx := ch.Idx() ^ 1
ch.handleUpdateReq(pidx, m, uh)
ch.handleUpdateReq(pidx, m, uh) //nolint:contextcheck
}

func (c *Client) cacheVersion1Update(uh UpdateHandler, p wire.Address, m ChannelUpdateProposal) bool {
Expand Down Expand Up @@ -288,12 +288,12 @@ func (c *Channel) handleUpdateReq(
client := c.client

if prop, ok := req.(*virtualChannelFundingProposal); ok {
client.handleVirtualChannelFundingProposal(c, prop, responder)
client.handleVirtualChannelFundingProposal(c, prop, responder) //nolint:contextcheck
return
}

if prop, ok := req.(*virtualChannelSettlementProposal); ok {
client.handleVirtualChannelSettlementProposal(c, prop, responder)
client.handleVirtualChannelSettlementProposal(c, prop, responder) //nolint:contextcheck
return
}

Expand Down
8 changes: 4 additions & 4 deletions client/virtual_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,18 @@ func (c *Client) handleVirtualChannelFundingProposal(
) {
err := c.validateVirtualChannelFundingProposal(ch, prop)
if err != nil {
c.rejectProposal(responder, err.Error())
c.rejectProposal(responder, err.Error()) //nolint:contextcheck
}

ctx, cancel := context.WithTimeout(c.Ctx(), virtualFundingTimeout)
defer cancel()

err = c.fundingWatcher.Await(ctx, prop)
if err != nil {
c.rejectProposal(responder, err.Error())
c.rejectProposal(responder, err.Error()) //nolint:contextcheck
}

c.acceptProposal(responder)
c.acceptProposal(responder) //nolint:contextcheck
}

func (c *Channel) watchVirtual() error {
Expand Down Expand Up @@ -348,7 +348,7 @@ func (c *Client) matchFundingProposal(ctx context.Context, a, b interface{}) boo
}

go func() {
err := virtual.watchVirtual()
err := virtual.watchVirtual() //nolint:contextcheck // The context will be derived from the channel context.
c.log.Debugf("channel %v: watcher stopped: %v", virtual.ID(), err)
}()
return true
Expand Down
4 changes: 2 additions & 2 deletions client/virtual_channel_settlement.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *Client) handleVirtualChannelSettlementProposal(
) {
err := c.validateVirtualChannelSettlementProposal(parent, prop)
if err != nil {
c.rejectProposal(responder, err.Error())
c.rejectProposal(responder, err.Error()) //nolint:contextcheck
}

ctx, cancel := context.WithTimeout(c.Ctx(), virtualSettlementTimeout)
Expand All @@ -90,7 +90,7 @@ func (c *Client) handleVirtualChannelSettlementProposal(
resp: responder,
})
if err != nil {
c.rejectProposal(responder, err.Error())
c.rejectProposal(responder, err.Error()) //nolint:contextcheck
}
}

Expand Down
10 changes: 4 additions & 6 deletions watcher/local/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (w *Watcher) startWatching(
Sigs: signedState.Sigs,
}
ch.Go(func() { ch.handleStatesFromClient(initialTx) })
ch.Go(func() { ch.handleEventsFromChain(w.rs, w.registry) })
ch.Go(func() { ch.handleEventsFromChain(w.rs, w.registry) }) //nolint:contextcheck

return statesPubSub, eventsToClientPubSub, nil
}
Expand Down Expand Up @@ -329,7 +329,7 @@ func (ch *ch) handleEventsFromChain(registerer channel.Registerer, chRegistry *r
}

log.Debugf("Registering latest version (%d)", latestTx.Version)
err := registerDispute(chRegistry, registerer, parent)
err := registerDispute(chRegistry, registerer, parent) //nolint:contextcheck
if err != nil {
log.Error("Error registering dispute")
return
Expand All @@ -355,15 +355,13 @@ func (ch *ch) handleEventsFromChain(registerer channel.Registerer, chRegistry *r
}

// registerDispute collects the latest transaction for the parent channel and
// each of its children. It then registers dispute for the channel tree.
// each of its children. It then registers a dispute for the channel tree.
//
// This function assumes the callers has locked the parent channel.
func registerDispute(r *registry, registerer channel.Registerer, parentCh *ch) error {
parentTx, subStates := retreiveLatestSubStates(r, parentCh)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := registerer.Register(ctx, makeAdjudicatorReq(parentCh.params, parentTx), subStates)
err := registerer.Register(context.TODO(), makeAdjudicatorReq(parentCh.params, parentTx), subStates)
if err != nil {
return err
}
Expand Down
42 changes: 19 additions & 23 deletions wire/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,53 @@

package wire

import "context"

type (
// Cache is a message cache. The default value is a valid empty cache.
// Cache is a message cache.
Cache struct {
msgs []*Envelope
preds []ctxPredicate
preds map[*Predicate]struct{}
}

// A Predicate defines a message filter.
Predicate = func(*Envelope) bool

ctxPredicate struct {
ctx context.Context
p Predicate
}

// A Cacher has the Cache method to enable caching of messages.
Cacher interface {
// Cache should enable the caching of messages
Cache(context.Context, Predicate)
Cache(*Predicate)
}
)

// MakeCache creates a new cache.
func MakeCache() Cache {
return Cache{
preds: make(map[*func(*Envelope) bool]struct{}),
}
}

// Cache is a message cache. The default value is a valid empty cache.
func (c *Cache) Cache(ctx context.Context, p Predicate) {
c.preds = append(c.preds, ctxPredicate{ctx, p})
func (c *Cache) Cache(p *Predicate) {
c.preds[p] = struct{}{}
}

// Release releases the cache predicate.
func (c *Cache) Release(p *Predicate) {
delete(c.preds, p)
}

// Put puts the message into the cache if it matches any active predicate.
// If it matches several predicates, it is still only added once to the cache.
func (c *Cache) Put(e *Envelope) bool {
// we filter the predicates for non-active and lazily remove them
preds := c.preds[:0]
any := false
for _, p := range c.preds {
select {
case <-p.ctx.Done():
continue // skip done predicate
default:
preds = append(preds, p)
}

any = any || p.p(e)
for p := range c.preds {
any = any || (*p)(e)
}

if any {
c.msgs = append(c.msgs, e)
}

c.preds = preds
return any
}

Expand Down
Loading

0 comments on commit 39553f9

Please sign in to comment.