Skip to content

Commit

Permalink
🚨 Fix 'cyclop'.
Browse files Browse the repository at this point in the history
Signed-off-by: Oliver Tale-Yazdi <[email protected]>
  • Loading branch information
ggwpez committed Nov 18, 2021
1 parent b6e04a9 commit d73e3ad
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 33 deletions.
27 changes: 16 additions & 11 deletions backend/ethereum/channel/conclude.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,11 @@ func (a *Adjudicator) ensureConcluded(ctx context.Context, req channel.Adjudicat
subErr <- sub.Read(ctx, events)
}()

// In final Register calls, as the non-initiator, we optimistically wait for
// the other party to send the transaction first for
// `secondaryWaitBlocks + TxFinalityDepth` many blocks.
if req.Tx.IsFinal && req.Secondary {
waitBlocks := secondaryWaitBlocks + int(a.txFinalityDepth)
isConcluded, err := waitConcludedForNBlocks(waitCtx, a, events, waitBlocks)
if err != nil {
return err
} else if isConcluded {
return nil
}
concluded, err := a.waitConcludedSecondary(waitCtx, req, events)
if err != nil {
return errors.WithMessage(err, "waiting for secondary conclude")
} else if concluded {
return nil
}

// No conclude event found in the past, send transaction.
Expand Down Expand Up @@ -100,6 +94,17 @@ func (a *Adjudicator) ensureConcluded(ctx context.Context, req channel.Adjudicat
}
}

func (a *Adjudicator) waitConcludedSecondary(ctx context.Context, req channel.AdjudicatorReq, events chan *subscription.Event) (concluded bool, err error) {
// In final Register calls, as the non-initiator, we optimistically wait for
// the other party to send the transaction first for
// `secondaryWaitBlocks + TxFinalityDepth` many blocks.
if req.Tx.IsFinal && req.Secondary {
waitBlocks := secondaryWaitBlocks + int(a.txFinalityDepth)
return waitConcludedForNBlocks(ctx, a, events, waitBlocks)
}
return false, nil
}

func (a *Adjudicator) conclude(ctx context.Context, req channel.AdjudicatorReq, subStates channel.StateMap) error {
// If the on-chain state resulted from forced execution, we do not have a fully-signed state and cannot call concludeFinal.
forceExecuted, err := a.isForceExecuted(ctx, req.Params.ID())
Expand Down
41 changes: 19 additions & 22 deletions backend/ethereum/subscription/eventsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,9 @@ read2:
}

for _, log := range logs {
event := s.eFact()
if err := s.contract.UnpackLog(event.Data, event.Name, log); err != nil {
if err := s.processLog(ctx, log, sink); err != nil {
return err
}
event.Log = log

select {
case <-ctx.Done():
return ctx.Err()
case sink <- event:
case <-s.closer.Closed():
return nil
}
}
return nil
}
Expand All @@ -195,19 +185,9 @@ func (s *EventSub) readFuture(ctx context.Context, sink chan<- *Event) error {
for {
select {
case log := <-s.watchLogs:
event := s.eFact()
if err := s.contract.UnpackLog(event.Data, event.Name, log); err != nil {
if err := s.processLog(ctx, log, sink); err != nil {
return err
}
event.Log = log

select {
case <-ctx.Done():
return ctx.Err()
case sink <- event:
case <-s.closer.Closed():
return nil
}
case err := <-s.watchSub.Err():
err = cherrors.CheckIsChainNotReachableError(err)
return err
Expand All @@ -219,6 +199,23 @@ func (s *EventSub) readFuture(ctx context.Context, sink chan<- *Event) error {
}
}

func (s *EventSub) processLog(ctx context.Context, log types.Log, sink chan<- *Event) error {
event := s.eFact()
if err := s.contract.UnpackLog(event.Data, event.Name, log); err != nil {
return err
}
event.Log = log

select {
case <-ctx.Done():
return ctx.Err()
case sink <- event:
return nil
case <-s.closer.Closed():
return nil
}
}

// Close closes the sub and frees associated resources.
// Can be called more than once. Is thread safe.
func (s *EventSub) Close() {
Expand Down

0 comments on commit d73e3ad

Please sign in to comment.