diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 473ed99d73..df5142a242 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -824,14 +824,11 @@ func (d *AuthenticatedGossiper) stop() { // then added to a queue for batched trickled announcement to all connected // peers. Remote channel announcements should contain the announcement proof // and be fully validated. -func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, - peer lnpeer.Peer) chan error { +func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context, + msg lnwire.Message, peer lnpeer.Peer) chan error { errChan := make(chan error, 1) - // TODO(elle): let ProcessRemoteAnnouncement take a context. - ctx := context.TODO() - // If gossip syncing has been disabled, we expect not to receive any // gossip queries from our peer. if d.cfg.NoGossipSync { diff --git a/peer/brontide.go b/peer/brontide.go index 1f356367d1..bf10bdde53 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -1595,7 +1595,7 @@ type msgStream struct { peer *Brontide - apply func(lnwire.Message) + apply func(context.Context, lnwire.Message) startMsg string stopMsg string @@ -1607,8 +1607,9 @@ type msgStream struct { producerSema chan struct{} - wg sync.WaitGroup - quit chan struct{} + wg sync.WaitGroup + quit chan struct{} + cancel func() } // newMsgStream creates a new instance of a chanMsgStream for a particular @@ -1617,7 +1618,7 @@ type msgStream struct { // sane value that avoids blocking unnecessarily, but doesn't allow an // unbounded amount of memory to be allocated to buffer incoming messages. func newMsgStream(p *Brontide, startMsg, stopMsg string, bufSize uint32, - apply func(lnwire.Message)) *msgStream { + apply func(context.Context, lnwire.Message)) *msgStream { stream := &msgStream{ peer: p, @@ -1642,14 +1643,21 @@ func newMsgStream(p *Brontide, startMsg, stopMsg string, bufSize uint32, // Start starts the chanMsgStream. func (ms *msgStream) Start() { + ctx, cancel := context.WithCancel(context.Background()) + ms.cancel = cancel + ms.wg.Add(1) - go ms.msgConsumer() + go ms.msgConsumer(ctx) } // Stop stops the chanMsgStream. func (ms *msgStream) Stop() { // TODO(roasbeef): signal too? + if ms.cancel != nil { + ms.cancel() + } + close(ms.quit) // Now that we've closed the channel, we'll repeatedly signal the msg @@ -1664,7 +1672,7 @@ func (ms *msgStream) Stop() { // msgConsumer is the main goroutine that streams messages from the peer's // readHandler directly to the target channel. -func (ms *msgStream) msgConsumer() { +func (ms *msgStream) msgConsumer(ctx context.Context) { defer ms.wg.Done() defer peerLog.Tracef(ms.stopMsg) defer atomic.StoreInt32(&ms.streamShutdown, 1) @@ -1701,7 +1709,7 @@ func (ms *msgStream) msgConsumer() { ms.msgCond.L.Unlock() - ms.apply(msg) + ms.apply(ctx, msg) // We've just successfully processed an item, so we'll signal // to the producer that a new slot in the buffer. We'll use @@ -1819,7 +1827,7 @@ func waitUntilLinkActive(p *Brontide, func newChanMsgStream(p *Brontide, cid lnwire.ChannelID) *msgStream { var chanLink htlcswitch.ChannelUpdateHandler - apply := func(msg lnwire.Message) { + apply := func(_ context.Context, msg lnwire.Message) { // This check is fine because if the link no longer exists, it will // be removed from the activeChannels map and subsequent messages // shouldn't reach the chan msg stream. @@ -1858,10 +1866,10 @@ func newChanMsgStream(p *Brontide, cid lnwire.ChannelID) *msgStream { // authenticated gossiper. This stream should be used to forward all remote // channel announcements. func newDiscMsgStream(p *Brontide) *msgStream { - apply := func(msg lnwire.Message) { + apply := func(ctx context.Context, msg lnwire.Message) { // TODO(yy): `ProcessRemoteAnnouncement` returns an error chan // and we need to process it. - p.cfg.AuthGossiper.ProcessRemoteAnnouncement(msg, p) + p.cfg.AuthGossiper.ProcessRemoteAnnouncement(ctx, msg, p) } return newMsgStream(