Skip to content

Commit

Permalink
discovery+graph: track job set dependencies in ValidationBarrier
Browse files Browse the repository at this point in the history
This commit does two things:
- removes the concept of allow / deny. Having this in place was a
  minor optimization and removing it makes the solution simpler.
- changes the job dependency tracking to track sets of abstact
  parent jobs rather than individual parent jobs.

As a note, the purpose of the ValidationBarrier is that it allows us
to launch gossip validation jobs in goroutines while still ensuring
that the validation order of these goroutines is adhered to when it
comes to validating ChannelAnnouncement _before_ ChannelUpdate and
_before_ NodeAnnouncement.
  • Loading branch information
Crypt-iQ committed Dec 20, 2024
1 parent 605ba38 commit c1e6765
Show file tree
Hide file tree
Showing 5 changed files with 495 additions and 135 deletions.
36 changes: 26 additions & 10 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,9 @@ type AuthenticatedGossiper struct {
// AuthenticatedGossiper lock.
chanUpdateRateLimiter map[uint64][2]*rate.Limiter

// vb is used to enforce job dependency ordering of gossip messages.
vb *graph.ValidationBarrier

sync.Mutex
}

Expand All @@ -537,6 +540,8 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
banman: newBanman(),
}

gossiper.vb = graph.NewValidationBarrier(1000, gossiper.quit)

gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries,
Expand Down Expand Up @@ -1398,10 +1403,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
log.Errorf("Unable to rebroadcast stale announcements: %v", err)
}

// We'll use this validation to ensure that we process jobs in their
// dependency order during parallel validation.
validationBarrier := graph.NewValidationBarrier(1000, d.quit)

for {
select {
// A new policy update has arrived. We'll commit it to the
Expand Down Expand Up @@ -1470,11 +1471,17 @@ func (d *AuthenticatedGossiper) networkHandler() {
// We'll set up any dependent, and wait until a free
// slot for this job opens up, this allow us to not
// have thousands of goroutines active.
validationBarrier.InitJobDependencies(announcement.msg)
annJobID, err := d.vb.InitJobDependencies(
announcement.msg,
)
if err != nil {
announcement.err <- err
continue
}

d.wg.Add(1)
go d.handleNetworkMessages(
announcement, &announcements, validationBarrier,
announcement, &announcements, annJobID,
)

// The trickle timer has ticked, which indicates we should
Expand Down Expand Up @@ -1525,18 +1532,18 @@ func (d *AuthenticatedGossiper) networkHandler() {
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) {
deDuped *deDupedAnnouncements, jobID graph.JobID) {

defer d.wg.Done()
defer vb.CompleteJob()
defer d.vb.CompleteJob()

// We should only broadcast this message forward if it originated from
// us or it wasn't received as part of our initial historical sync.
shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()

// If this message has an existing dependency, then we'll wait until
// that has been fully validated before we proceed.
err := vb.WaitForDependants(nMsg.msg)
err := d.vb.WaitForParents(jobID, nMsg.msg)
if err != nil {
log.Debugf("Validating network message %s got err: %v",
nMsg.msg.MsgType(), err)
Expand Down Expand Up @@ -1566,7 +1573,16 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,

// If this message had any dependencies, then we can now signal them to
// continue.
vb.SignalDependants(nMsg.msg, allow)
err = d.vb.SignalDependents(nMsg.msg, jobID)
if err != nil {
// Something is wrong if SignalDependents returns an error.
log.Errorf("SignalDependents returned error for msg=%v with "+
"JobID=%v", spew.Sdump(nMsg.msg), jobID)

nMsg.err <- err

return
}

// If the announcement was accepted, then add the emitted announcements
// to our announce batch to be broadcast once the trickle timer ticks
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb
github.com/lightningnetwork/lnd/cert v1.2.2
github.com/lightningnetwork/lnd/clock v1.1.1
github.com/lightningnetwork/lnd/fn/v2 v2.0.2
github.com/lightningnetwork/lnd/fn/v2 v2.0.7
github.com/lightningnetwork/lnd/healthcheck v1.2.6
github.com/lightningnetwork/lnd/kvdb v1.4.12
github.com/lightningnetwork/lnd/queue v1.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf
github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U=
github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0=
github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ=
github.com/lightningnetwork/lnd/fn/v2 v2.0.2 h1:M7o2lYrh/zCp+lntPB3WP/rWTu5U+4ssyHW+kqNJ0fs=
github.com/lightningnetwork/lnd/fn/v2 v2.0.2/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s=
github.com/lightningnetwork/lnd/fn/v2 v2.0.7 h1:2LkgcGk20vXcUJyrlYLWMptnEouOBnCixskMsQW+GxU=
github.com/lightningnetwork/lnd/fn/v2 v2.0.7/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s=
github.com/lightningnetwork/lnd/healthcheck v1.2.6 h1:1sWhqr93GdkWy4+6U7JxBfcyZIE78MhIHTJZfPx7qqI=
github.com/lightningnetwork/lnd/healthcheck v1.2.6/go.mod h1:Mu02um4CWY/zdTOvFje7WJgJcHyX2zq/FG3MhOAiGaQ=
github.com/lightningnetwork/lnd/kvdb v1.4.12 h1:Y0WY5Tbjyjn6eCYh068qkWur5oFtioJlfxc8w5SlJeQ=
Expand Down
Loading

0 comments on commit c1e6765

Please sign in to comment.