Skip to content

Commit

Permalink
Merge pull request #8497 from ziggie1984/shutdown-bugfix
Browse files Browse the repository at this point in the history
routing: shutdown chanrouter correctly.
  • Loading branch information
Roasbeef authored Aug 1, 2024
2 parents e3dd886 + 0adcb5c commit 4a3c4e4
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 82 deletions.
7 changes: 6 additions & 1 deletion chainntnfs/bitcoindnotify/bitcoind.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ func (b *BitcoindNotifier) Stop() error {

close(epochClient.epochChan)
}
b.txNotifier.TearDown()

// The txNotifier is only initialized in the start method, therefore we
// need to make sure we don't access a nil pointer here.
if b.txNotifier != nil {
b.txNotifier.TearDown()
}

// Stop the mempool notifier.
b.memNotifier.TearDown()
Expand Down
7 changes: 6 additions & 1 deletion chainntnfs/neutrinonotify/neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ func (n *NeutrinoNotifier) Stop() error {

close(epochClient.epochChan)
}
n.txNotifier.TearDown()

// The txNotifier is only initialized in the start method, therefore we
// need to make sure we don't access a nil pointer here.
if n.txNotifier != nil {
n.txNotifier.TearDown()
}

return nil
}
Expand Down
32 changes: 28 additions & 4 deletions chanfitness/chaneventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ package chanfitness

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/btcsuite/btcd/wire"
Expand Down Expand Up @@ -48,6 +50,9 @@ var (
// ChannelEventStore maintains a set of event logs for the node's channels to
// provide insight into the performance and health of channels.
type ChannelEventStore struct {
started atomic.Bool
stopped atomic.Bool

cfg *Config

// peers tracks all of our currently monitored peers and their channels.
Expand Down Expand Up @@ -142,7 +147,11 @@ func NewChannelEventStore(config *Config) *ChannelEventStore {
// information from the store. If this function fails, it cancels its existing
// subscriptions and returns an error.
func (c *ChannelEventStore) Start() error {
log.Info("ChannelEventStore starting")
log.Info("ChannelEventStore starting...")

if c.started.Swap(true) {
return fmt.Errorf("ChannelEventStore started more than once")
}

// Create a subscription to channel events.
channelClient, err := c.cfg.SubscribeChannelEvents()
Expand Down Expand Up @@ -198,21 +207,36 @@ func (c *ChannelEventStore) Start() error {
cancel: cancel,
})

log.Debug("ChannelEventStore started")

return nil
}

// Stop terminates all goroutines started by the event store.
func (c *ChannelEventStore) Stop() {
func (c *ChannelEventStore) Stop() error {
log.Info("ChannelEventStore shutting down...")
defer log.Debug("ChannelEventStore shutdown complete")

if c.stopped.Swap(true) {
return fmt.Errorf("ChannelEventStore stopped more than once")
}

// Stop the consume goroutine.
close(c.quit)
c.wg.Wait()

// Stop the ticker after the goroutine reading from it has exited, to
// avoid a race.
c.cfg.FlapCountTicker.Stop()
var err error
if c.cfg.FlapCountTicker == nil {
err = fmt.Errorf("ChannelEventStore FlapCountTicker not " +
"initialized")
} else {
c.cfg.FlapCountTicker.Stop()
}

log.Debugf("ChannelEventStore shutdown complete")

return err
}

// addChannel checks whether we are already tracking a channel's peer, creates a
Expand Down
7 changes: 6 additions & 1 deletion discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,12 @@ func (d *AuthenticatedGossiper) stop() {
log.Debug("Authenticated Gossiper is stopping")
defer log.Debug("Authenticated Gossiper stopped")

d.blockEpochs.Cancel()
// `blockEpochs` is only initialized in the start routine so we make
// sure we don't panic here in the case where the `Stop` method is
// called when the `Start` method does not complete.
if d.blockEpochs != nil {
d.blockEpochs.Cancel()
}

d.syncMgr.Stop()

Expand Down
4 changes: 4 additions & 0 deletions docs/release-notes/release-notes-0.18.3.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@

* [Fixed a bug](https://github.com/lightningnetwork/lnd/pull/8896) that caused
LND to use a default fee rate for the batch channel opening flow.

* [Fixed](https://github.com/lightningnetwork/lnd/pull/8497) a case where LND
would not shut down properly when interrupted via e.g. SIGTERM. Moreover, LND
now shuts down correctly in case one subsystem fails to start up.

* The fee limit for payments [was made
compatible](https://github.com/lightningnetwork/lnd/pull/8941) with inbound
Expand Down
5 changes: 4 additions & 1 deletion graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ func (b *Builder) Start() error {
b.wg.Add(1)
go b.networkHandler()

log.Debug("Builder started")

return nil
}

Expand All @@ -312,7 +314,6 @@ func (b *Builder) Stop() error {
}

log.Info("Builder shutting down...")
defer log.Debug("Builder shutdown complete")

// Our filtered chain view could've only been started if
// AssumeChannelValid isn't present.
Expand All @@ -325,6 +326,8 @@ func (b *Builder) Stop() error {
close(b.quit)
b.wg.Wait()

log.Debug("Builder shutdown complete")

return nil
}

Expand Down
26 changes: 25 additions & 1 deletion htlcswitch/interceptable_switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/sha256"
"fmt"
"sync"
"sync/atomic"

"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
Expand Down Expand Up @@ -33,6 +34,9 @@ var (
// Settle - routes UpdateFulfillHTLC to the originating link.
// Fail - routes UpdateFailHTLC to the originating link.
type InterceptableSwitch struct {
started atomic.Bool
stopped atomic.Bool

// htlcSwitch is the underlying switch
htlcSwitch *Switch

Expand Down Expand Up @@ -201,6 +205,12 @@ func (s *InterceptableSwitch) SetInterceptor(
}

func (s *InterceptableSwitch) Start() error {
log.Info("InterceptableSwitch starting...")

if s.started.Swap(true) {
return fmt.Errorf("InterceptableSwitch started more than once")
}

blockEpochStream, err := s.notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return err
Expand All @@ -217,14 +227,28 @@ func (s *InterceptableSwitch) Start() error {
}
}()

log.Debug("InterceptableSwitch started")

return nil
}

func (s *InterceptableSwitch) Stop() error {
log.Info("InterceptableSwitch shutting down...")

if s.stopped.Swap(true) {
return fmt.Errorf("InterceptableSwitch stopped more than once")
}

close(s.quit)
s.wg.Wait()

s.blockEpochStream.Cancel()
// We need to check whether the start routine ran and initialized the
// `blockEpochStream`.
if s.blockEpochStream != nil {
s.blockEpochStream.Cancel()
}

log.Debug("InterceptableSwitch shutdown complete")

return nil
}
Expand Down
14 changes: 9 additions & 5 deletions htlcswitch/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,14 +572,18 @@ func (l *channelLink) Stop() {
}

// Ensure the channel for the timer is drained.
if !l.updateFeeTimer.Stop() {
select {
case <-l.updateFeeTimer.C:
default:
if l.updateFeeTimer != nil {
if !l.updateFeeTimer.Stop() {
select {
case <-l.updateFeeTimer.C:
default:
}
}
}

l.hodlQueue.Stop()
if l.hodlQueue != nil {
l.hodlQueue.Stop()
}

close(l.quit)
l.wg.Wait()
Expand Down
53 changes: 40 additions & 13 deletions invoices/invoiceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
// created by the daemon. The registry is a thin wrapper around a map in order
// to ensure that all updates/reads are thread safe.
type InvoiceRegistry struct {
started atomic.Bool
stopped atomic.Bool

sync.RWMutex

nextClientID uint32 // must be used atomically
Expand Down Expand Up @@ -213,42 +216,66 @@ func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {

// Start starts the registry and all goroutines it needs to carry out its task.
func (i *InvoiceRegistry) Start() error {
// Start InvoiceExpiryWatcher and prepopulate it with existing active
// invoices.
err := i.expiryWatcher.Start(func(hash lntypes.Hash, force bool) error {
return i.cancelInvoiceImpl(context.Background(), hash, force)
})
var err error

log.Info("InvoiceRegistry starting...")

if i.started.Swap(true) {
return fmt.Errorf("InvoiceRegistry started more than once")
}
// Start InvoiceExpiryWatcher and prepopulate it with existing
// active invoices.
err = i.expiryWatcher.Start(
func(hash lntypes.Hash, force bool) error {
return i.cancelInvoiceImpl(
context.Background(), hash, force,
)
})
if err != nil {
return err
}

log.Info("InvoiceRegistry starting")

i.wg.Add(1)
go i.invoiceEventLoop()

// Now scan all pending and removable invoices to the expiry watcher or
// delete them.
// Now scan all pending and removable invoices to the expiry
// watcher or delete them.
err = i.scanInvoicesOnStart(context.Background())
if err != nil {
_ = i.Stop()
return err
}

return nil
log.Debug("InvoiceRegistry started")

return err
}

// Stop signals the registry for a graceful shutdown.
func (i *InvoiceRegistry) Stop() error {
log.Info("InvoiceRegistry shutting down...")

if i.stopped.Swap(true) {
return fmt.Errorf("InvoiceRegistry stopped more than once")
}

log.Info("InvoiceRegistry shutting down...")
defer log.Debug("InvoiceRegistry shutdown complete")

i.expiryWatcher.Stop()
var err error
if i.expiryWatcher == nil {
err = fmt.Errorf("InvoiceRegistry expiryWatcher is not " +
"initialized")
} else {
i.expiryWatcher.Stop()
}

close(i.quit)

i.wg.Wait()
return nil

log.Debug("InvoiceRegistry shutdown complete")

return err
}

// invoiceEvent represents a new event that has modified an invoice on disk.
Expand Down
28 changes: 25 additions & 3 deletions lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,11 +694,33 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
bestHeight)

// With all the relevant chains initialized, we can finally start the
// server itself.
if err := server.Start(); err != nil {
// server itself. We start the server in an asynchronous goroutine so
// that we are able to interrupt and shut down the daemon gracefully in
// case the startup of the subservers does not behave as expected.
errChan := make(chan error)
go func() {
errChan <- server.Start()
}()

defer func() {
err := server.Stop()
if err != nil {
ltndLog.Warnf("Stopping the server including all "+
"its subsystems failed with %v", err)
}
}()

select {
case err := <-errChan:
if err == nil {
break
}

return mkErr("unable to start server: %v", err)

case <-interceptor.ShutdownChannel():
return nil
}
defer server.Stop()

// We transition the server state to Active, as the server is up.
interceptorChain.SetServerActive()
Expand Down
4 changes: 3 additions & 1 deletion lnwallet/chainfee/estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,9 @@ func (w *WebAPIEstimator) Stop() error {
return nil
}

w.updateFeeTicker.Stop()
if w.updateFeeTicker != nil {
w.updateFeeTicker.Stop()
}

close(w.quit)
w.wg.Wait()
Expand Down
Loading

0 comments on commit 4a3c4e4

Please sign in to comment.