Skip to content

Commit

Permalink
address TODO
Browse files Browse the repository at this point in the history
  • Loading branch information
ellemouton committed Nov 8, 2024
1 parent f283058 commit 9ab9912
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
7 changes: 2 additions & 5 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,14 +824,11 @@ func (d *AuthenticatedGossiper) stop() {
// then added to a queue for batched trickled announcement to all connected
// peers. Remote channel announcements should contain the announcement proof
// and be fully validated.
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
peer lnpeer.Peer) chan error {
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context,
msg lnwire.Message, peer lnpeer.Peer) chan error {

errChan := make(chan error, 1)

// TODO(elle): let ProcessRemoteAnnouncement take a context.
ctx := context.TODO()

// If gossip syncing has been disabled, we expect not to receive any
// gossip queries from our peer.
if d.cfg.NoGossipSync {
Expand Down
28 changes: 18 additions & 10 deletions peer/brontide.go
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,7 @@ type msgStream struct {

peer *Brontide

apply func(lnwire.Message)
apply func(context.Context, lnwire.Message)

startMsg string
stopMsg string
Expand All @@ -1607,8 +1607,9 @@ type msgStream struct {

producerSema chan struct{}

wg sync.WaitGroup
quit chan struct{}
wg sync.WaitGroup
quit chan struct{}
cancel func()
}

// newMsgStream creates a new instance of a chanMsgStream for a particular
Expand All @@ -1617,7 +1618,7 @@ type msgStream struct {
// sane value that avoids blocking unnecessarily, but doesn't allow an
// unbounded amount of memory to be allocated to buffer incoming messages.
func newMsgStream(p *Brontide, startMsg, stopMsg string, bufSize uint32,
apply func(lnwire.Message)) *msgStream {
apply func(context.Context, lnwire.Message)) *msgStream {

stream := &msgStream{
peer: p,
Expand All @@ -1642,14 +1643,21 @@ func newMsgStream(p *Brontide, startMsg, stopMsg string, bufSize uint32,

// Start starts the chanMsgStream.
func (ms *msgStream) Start() {
ctx, cancel := context.WithCancel(context.Background())
ms.cancel = cancel

ms.wg.Add(1)
go ms.msgConsumer()
go ms.msgConsumer(ctx)
}

// Stop stops the chanMsgStream.
func (ms *msgStream) Stop() {
// TODO(roasbeef): signal too?

if ms.cancel != nil {
ms.cancel()
}

close(ms.quit)

// Now that we've closed the channel, we'll repeatedly signal the msg
Expand All @@ -1664,7 +1672,7 @@ func (ms *msgStream) Stop() {

// msgConsumer is the main goroutine that streams messages from the peer's
// readHandler directly to the target channel.
func (ms *msgStream) msgConsumer() {
func (ms *msgStream) msgConsumer(ctx context.Context) {
defer ms.wg.Done()
defer peerLog.Tracef(ms.stopMsg)
defer atomic.StoreInt32(&ms.streamShutdown, 1)
Expand Down Expand Up @@ -1701,7 +1709,7 @@ func (ms *msgStream) msgConsumer() {

ms.msgCond.L.Unlock()

ms.apply(msg)
ms.apply(ctx, msg)

// We've just successfully processed an item, so we'll signal
// to the producer that a new slot in the buffer. We'll use
Expand Down Expand Up @@ -1819,7 +1827,7 @@ func waitUntilLinkActive(p *Brontide,
func newChanMsgStream(p *Brontide, cid lnwire.ChannelID) *msgStream {
var chanLink htlcswitch.ChannelUpdateHandler

apply := func(msg lnwire.Message) {
apply := func(_ context.Context, msg lnwire.Message) {
// This check is fine because if the link no longer exists, it will
// be removed from the activeChannels map and subsequent messages
// shouldn't reach the chan msg stream.
Expand Down Expand Up @@ -1858,10 +1866,10 @@ func newChanMsgStream(p *Brontide, cid lnwire.ChannelID) *msgStream {
// authenticated gossiper. This stream should be used to forward all remote
// channel announcements.
func newDiscMsgStream(p *Brontide) *msgStream {
apply := func(msg lnwire.Message) {
apply := func(ctx context.Context, msg lnwire.Message) {
// TODO(yy): `ProcessRemoteAnnouncement` returns an error chan
// and we need to process it.
p.cfg.AuthGossiper.ProcessRemoteAnnouncement(msg, p)
p.cfg.AuthGossiper.ProcessRemoteAnnouncement(ctx, msg, p)
}

return newMsgStream(
Expand Down

0 comments on commit 9ab9912

Please sign in to comment.