Skip to content

Commit

Permalink
contractcourt: start channel arbitrator in errGroup.
Browse files Browse the repository at this point in the history
There are cases where the startup of the channel arbitrators block
waiting for other subsystems e.g. taproot asset channels to collect
resolution information. Hence we make sure we start them concurrently.
  • Loading branch information
ziggie1984 committed Nov 9, 2024
1 parent ef17354 commit 9278afc
Showing 1 changed file with 55 additions and 4 deletions.
59 changes: 55 additions & 4 deletions contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package contractcourt

import (
"context"
"errors"
"fmt"
"sync"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
"golang.org/x/sync/errgroup"
)

// ErrChainArbExiting signals that the chain arbitrator is shutting down.
Expand Down Expand Up @@ -763,6 +765,14 @@ func (c *ChainArbitrator) Start() error {

// Launch all the goroutines for each arbitrator so they can carry out
// their duties.
// Set a timeout for the group of goroutines. Maybe have different
// timeouts for itests and normal operations.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel() // Ensure resources are cleaned up

// Create an errgroup with the context
errGroup, ctx := errgroup.WithContext(ctx)

for _, arbitrator := range c.activeChannels {
startState, ok := startStates[arbitrator.cfg.ChanPoint]
if !ok {
Expand All @@ -771,12 +781,53 @@ func (c *ChainArbitrator) Start() error {
arbitrator.cfg.ChanPoint)
}

if err := arbitrator.Start(startState); err != nil {
stopAndLog()
return err
}
errGroup.Go(func() error {
// Create buffered error channel for this specific
// arbitrator
errChan := make(chan error, 1)

// Start arbitrator in a separate goroutine
go func() {
errChan <- arbitrator.Start(startState)
}()

// Wait for either the arbitrator to complete or the
// context to be cancelled/timing out.
select {
case err := <-errChan:
return err
case <-ctx.Done():
return ctx.Err()
}
})
}

// Wait for all arbitrators to start in a separate goroutine. We don't
// have to wait here for the chain arbitrator to start, because there
// might be situations where other subsystems will block the start up
// while fetching resolve information (e.g. custom channels.)
//
// NOTE: We do not add this collector to the waitGroup because we want
// to stop the chain arbitrator if there occurs an error.
go func() {
select {
// As soon as the context cancels we can be sure the
// errGroup has finished waiting.
case <-ctx.Done():
if err := errGroup.Wait(); err != nil {
log.Criticalf("error starting arbitrators: %v",
err)

stopAndLog()
}

case <-c.quit:
// Chain arbitrator is shutting down so we close the
// goroutine.
return
}
}()

// Subscribe to a single stream of block epoch notifications that we
// will dispatch to all active arbitrators.
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
Expand Down

0 comments on commit 9278afc

Please sign in to comment.