diff --git a/go.mod b/go.mod index 8ed20be325..5f4219afa7 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb github.com/lightningnetwork/lnd/cert v1.2.2 github.com/lightningnetwork/lnd/clock v1.1.1 - github.com/lightningnetwork/lnd/fn/v2 v2.0.4 + github.com/lightningnetwork/lnd/fn/v2 v2.0.8 github.com/lightningnetwork/lnd/healthcheck v1.2.6 github.com/lightningnetwork/lnd/kvdb v1.4.12 github.com/lightningnetwork/lnd/queue v1.1.1 diff --git a/go.sum b/go.sum index 0433b87e57..6e744d6799 100644 --- a/go.sum +++ b/go.sum @@ -456,8 +456,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U= github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0= github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ= -github.com/lightningnetwork/lnd/fn/v2 v2.0.4 h1:DiC/AEa7DhnY4qOEQBISu1cp+1+51LjbVDzNLVBwNjI= -github.com/lightningnetwork/lnd/fn/v2 v2.0.4/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s= +github.com/lightningnetwork/lnd/fn/v2 v2.0.8 h1:r2SLz7gZYQPVc3IZhU82M66guz3Zk2oY+Rlj9QN5S3g= +github.com/lightningnetwork/lnd/fn/v2 v2.0.8/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s= github.com/lightningnetwork/lnd/healthcheck v1.2.6 h1:1sWhqr93GdkWy4+6U7JxBfcyZIE78MhIHTJZfPx7qqI= github.com/lightningnetwork/lnd/healthcheck v1.2.6/go.mod h1:Mu02um4CWY/zdTOvFje7WJgJcHyX2zq/FG3MhOAiGaQ= github.com/lightningnetwork/lnd/kvdb v1.4.12 h1:Y0WY5Tbjyjn6eCYh068qkWur5oFtioJlfxc8w5SlJeQ= diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 6e67e87def..3b705fc5cb 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -2,6 +2,7 @@ package htlcswitch import ( "bytes" + "context" crand "crypto/rand" "crypto/sha256" "errors" @@ -408,10 +409,10 @@ type channelLink struct { // the result. quiescenceReqs chan StfuReq - // ContextGuard is a helper that encapsulates a wait group and quit - // channel and allows contexts that either block or cancel on those - // depending on the use case. - *fn.ContextGuard + // cg is a helper that encapsulates a wait group and quit channel and + // allows contexts that either block or cancel on those depending on + // the use case. + cg *fn.ContextGuard } // hookMap is a data structure that is used to track the hooks that need to be @@ -517,7 +518,7 @@ func NewChannelLink(cfg ChannelLinkConfig, incomingCommitHooks: newHookMap(), quiescer: qsm, quiescenceReqs: quiescenceReqs, - ContextGuard: fn.NewContextGuard(), + cg: fn.NewContextGuard(), } } @@ -596,8 +597,8 @@ func (l *channelLink) Start() error { l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout()) - l.Wg.Add(1) - go l.htlcManager() + l.cg.WgAdd(1) + go l.htlcManager(context.TODO()) return nil } @@ -636,8 +637,8 @@ func (l *channelLink) Stop() { l.hodlQueue.Stop() } - close(l.Quit) - l.Wg.Wait() + l.cg.Quit() + l.cg.WgWait() // Now that the htlcManager has completely exited, reset the packet // courier. This allows the mailbox to revaluate any lingering Adds that @@ -662,7 +663,7 @@ func (l *channelLink) Stop() { // WaitForShutdown blocks until the link finishes shutting down, which includes // termination of all dependent goroutines. func (l *channelLink) WaitForShutdown() { - l.Wg.Wait() + l.cg.WgWait() } // EligibleToForward returns a bool indicating if the channel is able to @@ -740,7 +741,7 @@ func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool { func (l *channelLink) OnFlushedOnce(hook func()) { select { case l.flushHooks.newTransients <- hook: - case <-l.Quit: + case <-l.cg.Done(): } } @@ -759,7 +760,7 @@ func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) { select { case queue <- hook: - case <-l.Quit: + case <-l.cg.Done(): } } @@ -777,7 +778,7 @@ func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] { select { case l.quiescenceReqs <- req: - case <-l.Quit: + case <-l.cg.Done(): req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown)) } @@ -887,7 +888,7 @@ func (l *channelLink) createFailureWithUpdate(incoming bool, // This method is to be called upon reconnection after the initial funding // flow. We'll compare out commitment chains with the remote party, and re-send // either a danging commit signature, a revocation, or both. -func (l *channelLink) syncChanStates() error { +func (l *channelLink) syncChanStates(ctx context.Context) error { chanState := l.channel.State() l.log.Infof("Attempting to re-synchronize channel: %v", chanState) @@ -989,7 +990,7 @@ func (l *channelLink) syncChanStates() error { // We've just received a ChanSync message from the remote // party, so we'll process the message in order to determine // if we need to re-transmit any messages to the remote party. - ctx, cancel := l.WithCtxQuitNoTimeout() + ctx, cancel := l.cg.Create(ctx) defer cancel() msgsToReSend, openedCircuits, closedCircuits, err = l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg) @@ -1021,7 +1022,7 @@ func (l *channelLink) syncChanStates() error { l.cfg.Peer.SendMessage(false, msg) } - case <-l.Quit: + case <-l.cg.Done(): return ErrLinkShuttingDown } @@ -1033,7 +1034,7 @@ func (l *channelLink) syncChanStates() error { // we previously received are reinstated in memory, and forwarded to the switch // if necessary. After a restart, this will also delete any previously // completed packages. -func (l *channelLink) resolveFwdPkgs() error { +func (l *channelLink) resolveFwdPkgs(ctx context.Context) error { fwdPkgs, err := l.channel.LoadFwdPkgs() if err != nil { return err @@ -1050,7 +1051,7 @@ func (l *channelLink) resolveFwdPkgs() error { // If any of our reprocessing steps require an update to the commitment // txn, we initiate a state transition to capture all relevant changes. if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 { - return l.updateCommitTx() + return l.updateCommitTx(ctx) } return nil @@ -1111,7 +1112,7 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error { // // NOTE: This MUST be run as a goroutine. func (l *channelLink) fwdPkgGarbager() { - defer l.Wg.Done() + defer l.cg.WgDone() l.cfg.FwdPkgGCTicker.Resume() defer l.cfg.FwdPkgGCTicker.Stop() @@ -1128,7 +1129,7 @@ func (l *channelLink) fwdPkgGarbager() { err) continue } - case <-l.Quit: + case <-l.cg.Done(): return } } @@ -1248,10 +1249,12 @@ func (l *channelLink) handleChanSyncErr(err error) { // and also the htlc trickle queue+timer for this active channels. // // NOTE: This MUST be run as a goroutine. -func (l *channelLink) htlcManager() { +// +//nolint:funlen +func (l *channelLink) htlcManager(ctx context.Context) { defer func() { l.cfg.BatchTicker.Stop() - l.Wg.Done() + l.cg.WgDone() l.log.Infof("exited") }() @@ -1271,7 +1274,7 @@ func (l *channelLink) htlcManager() { // re-synchronize state with the remote peer. settledHtlcs is a map of // HTLC's that we re-settled as part of the channel state sync. if l.cfg.SyncStates { - err := l.syncChanStates() + err := l.syncChanStates(ctx) if err != nil { l.handleChanSyncErr(err) return @@ -1322,7 +1325,7 @@ func (l *channelLink) htlcManager() { // the channel is not pending, otherwise we should have no htlcs to // reforward. if l.ShortChanID() != hop.Source { - err := l.resolveFwdPkgs() + err := l.resolveFwdPkgs(ctx) switch err { // No error was encountered, success. case nil: @@ -1345,7 +1348,7 @@ func (l *channelLink) htlcManager() { // With our link's in-memory state fully reconstructed, spawn a // goroutine to manage the reclamation of disk space occupied by // completed forwarding packages. - l.Wg.Add(1) + l.cg.WgAdd(1) go l.fwdPkgGarbager() } @@ -1447,7 +1450,8 @@ func (l *channelLink) htlcManager() { // If we do, then we'll send a new UpdateFee message to // the remote party, to be locked in with a new update. - if err := l.updateChannelFee(newCommitFee); err != nil { + err = l.updateChannelFee(ctx, newCommitFee) + if err != nil { l.log.Errorf("unable to update fee rate: %v", err) continue @@ -1475,7 +1479,7 @@ func (l *channelLink) htlcManager() { // including all the currently pending entries. If the // send was unsuccessful, then abandon the update, // waiting for the revocation window to open up. - if !l.updateCommitTxOrFail() { + if !l.updateCommitTxOrFail(ctx) { return } @@ -1493,19 +1497,19 @@ func (l *channelLink) htlcManager() { // that the link is an intermediate hop in a multi-hop HTLC // circuit. case pkt := <-l.downstream: - l.handleDownstreamPkt(pkt) + l.handleDownstreamPkt(ctx, pkt) // A message from the connected peer was just received. This // indicates that we have a new incoming HTLC, either directly // for us, or part of a multi-hop HTLC circuit. case msg := <-l.upstream: - l.handleUpstreamMsg(msg) + l.handleUpstreamMsg(ctx, msg) // A htlc resolution is received. This means that we now have a // resolution for a previously accepted htlc. case hodlItem := <-l.hodlQueue.ChanOut(): htlcResolution := hodlItem.(invoices.HtlcResolution) - err := l.processHodlQueue(htlcResolution) + err := l.processHodlQueue(ctx, htlcResolution) switch err { // No error, success. case nil: @@ -1543,7 +1547,7 @@ func (l *channelLink) htlcManager() { } } - case <-l.Quit: + case <-l.cg.Done(): return } } @@ -1552,7 +1556,7 @@ func (l *channelLink) htlcManager() { // processHodlQueue processes a received htlc resolution and continues reading // from the hodl queue until no more resolutions remain. When this function // returns without an error, the commit tx should be updated. -func (l *channelLink) processHodlQueue( +func (l *channelLink) processHodlQueue(ctx context.Context, firstResolution invoices.HtlcResolution) error { // Try to read all waiting resolution messages, so that they can all be @@ -1584,7 +1588,7 @@ loop: } // Update the commitment tx. - if err := l.updateCommitTx(); err != nil { + if err := l.updateCommitTx(ctx); err != nil { return err } @@ -1671,7 +1675,9 @@ func (l *channelLink) randomFeeUpdateTimeout() time.Duration { // handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the // downstream HTLC Switch. -func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error { +func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context, + pkt *htlcPacket) error { + htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC) if !ok { return errors.New("not an UpdateAddHTLC packet") @@ -1775,7 +1781,7 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error { getEventType(pkt), ) - l.tryBatchUpdateCommitTx() + l.tryBatchUpdateCommitTx(ctx) return nil } @@ -1786,7 +1792,9 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error { // cleared HTLCs with the upstream peer. // // TODO(roasbeef): add sync ntfn to ensure switch always has consistent view? -func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { +func (l *channelLink) handleDownstreamPkt(ctx context.Context, + pkt *htlcPacket) { + if pkt.htlc.MsgType().IsChannelUpdate() && !l.quiescer.CanSendUpdates() { @@ -1800,7 +1808,7 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { case *lnwire.UpdateAddHTLC: // Handle add message. The returned error can be ignored, // because it is also sent through the mailbox. - _ = l.handleDownstreamUpdateAdd(pkt) + _ = l.handleDownstreamUpdateAdd(ctx, pkt) case *lnwire.UpdateFulfillHTLC: // If hodl.SettleOutgoing mode is active, we exit early to @@ -1867,7 +1875,7 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { ) // Immediately update the commitment tx to minimize latency. - l.updateCommitTxOrFail() + l.updateCommitTxOrFail(ctx) case *lnwire.UpdateFailHTLC: // If hodl.FailOutgoing mode is active, we exit early to @@ -1957,19 +1965,19 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { } // Immediately update the commitment tx to minimize latency. - l.updateCommitTxOrFail() + l.updateCommitTxOrFail(ctx) } } // tryBatchUpdateCommitTx updates the commitment transaction if the batch is // full. -func (l *channelLink) tryBatchUpdateCommitTx() { +func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) { pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) if pending < uint64(l.cfg.BatchSize) { return } - l.updateCommitTxOrFail() + l.updateCommitTxOrFail(ctx) } // cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef @@ -2039,7 +2047,9 @@ func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) { // handleUpstreamMsg processes wire messages related to commitment state // updates from the upstream peer. The upstream peer is the peer whom we have a // direct channel with, updating our respective commitment chains. -func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { +// +//nolint:funlen +func (l *channelLink) handleUpstreamMsg(ctx context.Context, msg lnwire.Message) { // First check if the message is an update and we are capable of // receiving updates right now. if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() { @@ -2418,7 +2428,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } select { - case <-l.Quit: + case <-l.cg.Done(): return default: } @@ -2430,7 +2440,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // reply with a signature as both sides already have a // commitment with the latest accepted. if l.channel.OweCommitment() { - if !l.updateCommitTxOrFail() { + if !l.updateCommitTxOrFail(ctx) { return } } @@ -2488,7 +2498,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } select { - case <-l.Quit: + case <-l.cg.Done(): return default: } @@ -2542,7 +2552,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // but there are still remote updates that are not in the remote // commit tx yet, send out an update. if l.channel.OweCommitment() { - if !l.updateCommitTxOrFail() { + if !l.updateCommitTxOrFail(ctx) { return } } @@ -2732,8 +2742,8 @@ func (l *channelLink) ackDownStreamPackets() error { // updateCommitTxOrFail updates the commitment tx and if that fails, it fails // the link. -func (l *channelLink) updateCommitTxOrFail() bool { - err := l.updateCommitTx() +func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool { + err := l.updateCommitTx(ctx) switch err { // No error encountered, success. case nil: @@ -2759,7 +2769,7 @@ func (l *channelLink) updateCommitTxOrFail() bool { // updateCommitTx signs, then sends an update to the remote peer adding a new // commitment to their commitment chain which includes all the latest updates // we've received+processed up to this point. -func (l *channelLink) updateCommitTx() error { +func (l *channelLink) updateCommitTx(ctx context.Context) error { // Preemptively write all pending keystones to disk, just in case the // HTLCs we have in memory are included in the subsequent attempt to // sign a commitment state. @@ -2782,7 +2792,7 @@ func (l *channelLink) updateCommitTx() error { return nil } - ctx, done := l.WithCtxQuitNoTimeout() + ctx, done := l.cg.Create(ctx) defer done() newCommit, err := l.channel.SignNextCommitment(ctx) @@ -2822,7 +2832,7 @@ func (l *channelLink) updateCommitTx() error { } select { - case <-l.Quit: + case <-l.cg.Done(): return ErrLinkShuttingDown default: } @@ -3529,7 +3539,7 @@ func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error { // NOTE: Part of the ChannelLink interface. func (l *channelLink) HandleChannelUpdate(message lnwire.Message) { select { - case <-l.Quit: + case <-l.cg.Done(): // Return early if the link is already in the process of // quitting. It doesn't make sense to hand the message to the // mailbox here. @@ -3545,7 +3555,9 @@ func (l *channelLink) HandleChannelUpdate(message lnwire.Message) { // updateChannelFee updates the commitment fee-per-kw on this channel by // committing to an update_fee message. -func (l *channelLink) updateChannelFee(feePerKw chainfee.SatPerKWeight) error { +func (l *channelLink) updateChannelFee(ctx context.Context, + feePerKw chainfee.SatPerKWeight) error { + l.log.Infof("updating commit fee to %v", feePerKw) // We skip sending the UpdateFee message if the channel is not @@ -3583,7 +3595,8 @@ func (l *channelLink) updateChannelFee(feePerKw chainfee.SatPerKWeight) error { if err := l.cfg.Peer.SendMessage(false, msg); err != nil { return err } - return l.updateCommitTx() + + return l.updateCommitTx(ctx) } // processRemoteSettleFails accepts a batch of settle/fail payment descriptors @@ -4290,7 +4303,7 @@ func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) { filteredPkts = append(filteredPkts, pkt) } - err := l.cfg.ForwardPackets(l.Quit, replay, filteredPkts...) + err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...) if err != nil { log.Errorf("Unhandled error while reforwarding htlc "+ "settle/fail over htlcswitch: %v", err) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 1747105597..cdeae6d812 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -2257,7 +2257,7 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt, for { select { case <-notifyUpdateChan: - case <-chanLink.Quit: + case <-chanLink.cg.Done(): close(doneChan) return } @@ -2326,7 +2326,7 @@ func handleStateUpdate(link *channelLink, } link.HandleChannelUpdate(remoteRev) - ctx, done := link.WithCtxQuitNoTimeout() + ctx, done := link.cg.Create(context.Background()) defer done() remoteSigs, err := remoteChannel.SignNextCommitment(ctx) @@ -2372,7 +2372,7 @@ func updateState(batchTick chan time.Time, link *channelLink, // Trigger update by ticking the batchTicker. select { case batchTick <- time.Now(): - case <-link.Quit: + case <-link.cg.Done(): return fmt.Errorf("link shutting down") } return handleStateUpdate(link, remoteChannel) @@ -2380,7 +2380,7 @@ func updateState(batchTick chan time.Time, link *channelLink, // The remote is triggering the state update, emulate this by // signing and sending CommitSig to the link. - ctx, done := link.WithCtxQuitNoTimeout() + ctx, done := link.cg.Create(context.Background()) defer done() remoteSigs, err := remoteChannel.SignNextCommitment(ctx) @@ -4946,7 +4946,7 @@ func (h *persistentLinkHarness) restartLink( for { select { case <-notifyUpdateChan: - case <-chanLink.Quit: + case <-chanLink.cg.Done(): close(doneChan) return } @@ -5932,7 +5932,9 @@ func TestChannelLinkFail(t *testing.T) { // Sign a commitment that will include // signature for the HTLC just sent. - quitCtx, done := c.WithCtxQuitNoTimeout() + quitCtx, done := c.cg.Create( + context.Background(), + ) defer done() sigs, err := remoteChannel.SignNextCommitment( @@ -5979,7 +5981,9 @@ func TestChannelLinkFail(t *testing.T) { // Sign a commitment that will include // signature for the HTLC just sent. - quitCtx, done := c.WithCtxQuitNoTimeout() + quitCtx, done := c.cg.Create( + context.Background(), + ) defer done() sigs, err := remoteChannel.SignNextCommitment( diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 2123465884..43902dc336 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -1190,7 +1190,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer, for { select { case <-notifyUpdateChan: - case <-chanLink.Quit: + case <-chanLink.cg.Done(): close(doneChan) return } diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index 45267e0966..d78563e95a 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -145,7 +145,7 @@ type StateMachine[Event any, Env Environment] struct { // query the internal state machine state. stateQuery chan stateQuery[Event, Env] - wg *fn.GoroutineManager + gm fn.GoroutineManager quit chan struct{} startOnce sync.Once @@ -206,7 +206,7 @@ func NewStateMachine[Event any, Env Environment]( ), events: make(chan Event, 1), stateQuery: make(chan stateQuery[Event, Env]), - wg: fn.NewGoroutineManager(), + gm: *fn.NewGoroutineManager(), newStateEvents: fn.NewEventDistributor[State[Event, Env]](), quit: make(chan struct{}), } @@ -216,7 +216,7 @@ func NewStateMachine[Event any, Env Environment]( // the state machine to completion. func (s *StateMachine[Event, Env]) Start(ctx context.Context) { s.startOnce.Do(func() { - _ = s.wg.Go(ctx, func(ctx context.Context) { + _ = s.gm.Go(ctx, func(ctx context.Context) { s.driveMachine(ctx) }) }) @@ -227,7 +227,7 @@ func (s *StateMachine[Event, Env]) Start(ctx context.Context) { func (s *StateMachine[Event, Env]) Stop() { s.stopOnce.Do(func() { close(s.quit) - s.wg.Stop() + s.gm.Stop() }) } @@ -335,8 +335,6 @@ func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[ // executeDaemonEvent executes a daemon event, which is a special type of event // that can be emitted as part of the state transition function of the state // machine. An error is returned if the type of event is unknown. -// -//nolint:funlen func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, event DaemonEvent) error { @@ -360,7 +358,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, // If a post-send event was specified, then we'll funnel // that back into the main state machine now as well. return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll - launched := s.wg.Go( + launched := s.gm.Go( ctx, func(ctx context.Context) { s.log.DebugS(ctx, "Sending post-send event", "event", lnutils.SpewLogClosure(event)) @@ -386,7 +384,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, // Otherwise, this has a SendWhen predicate, so we'll need // launch a goroutine to poll the SendWhen, then send only once // the predicate is true. - launched := s.wg.Go(ctx, func(ctx context.Context) { + launched := s.gm.Go(ctx, func(ctx context.Context) { predicateTicker := time.NewTicker( s.cfg.CustomPollInterval.UnwrapOr(pollInterval), ) @@ -456,7 +454,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, return fmt.Errorf("unable to register spend: %w", err) } - launched := s.wg.Go(ctx, func(ctx context.Context) { + launched := s.gm.Go(ctx, func(ctx context.Context) { for { select { case spend, ok := <-spendEvent.Spend: @@ -502,7 +500,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, return fmt.Errorf("unable to register conf: %w", err) } - launched := s.wg.Go(ctx, func(ctx context.Context) { + launched := s.gm.Go(ctx, func(ctx context.Context) { for { select { case <-confEvent.Confirmed: @@ -674,7 +672,7 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) { return } - case <-s.wg.Done(): + case <-s.gm.Done(): return } } diff --git a/protofsm/state_machine_test.go b/protofsm/state_machine_test.go index 1ff0217d9f..bda4986831 100644 --- a/protofsm/state_machine_test.go +++ b/protofsm/state_machine_test.go @@ -1,6 +1,7 @@ package protofsm import ( + "context" "encoding/hex" "fmt" "sync/atomic" @@ -14,7 +15,6 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "golang.org/x/net/context" ) type dummyEvents interface {