Skip to content

Commit

Permalink
Merge pull request lightningnetwork#8222 from ellemouton/wtclientStar…
Browse files Browse the repository at this point in the history
…tupPerf

wtclient+migration: start storing chan max height in channel details bucket
  • Loading branch information
yyforyongyu authored Nov 28, 2023
2 parents 9ce543f + 3642cb6 commit 80684ec
Show file tree
Hide file tree
Showing 15 changed files with 1,552 additions and 84 deletions.
3 changes: 3 additions & 0 deletions docs/release-notes/release-notes-0.18.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@
## Breaking Changes
## Performance Improvements

* Watchtower client DB migration to massively [improve the start-up
performance](https://github.com/lightningnetwork/lnd/pull/8222) of a client.

# Technical and Architectural Updates
## BOLT Spec Updates

Expand Down
87 changes: 26 additions & 61 deletions watchtower/wtclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwallet"
Expand Down Expand Up @@ -295,9 +296,8 @@ type TowerClient struct {

closableSessionQueue *sessionCloseMinHeap

backupMu sync.Mutex
summaries wtdb.ChannelSummaries
chanCommitHeights map[lnwire.ChannelID]uint64
backupMu sync.Mutex
chanInfos wtdb.ChannelInfos

statTicker *time.Ticker
stats *ClientStats
Expand Down Expand Up @@ -339,9 +339,7 @@ func New(config *Config) (*TowerClient, error) {

plog := build.NewPrefixLog(prefix, log)

// Load the sweep pkscripts that have been generated for all previously
// registered channels.
chanSummaries, err := cfg.DB.FetchChanSummaries()
chanInfos, err := cfg.DB.FetchChanInfos()
if err != nil {
return nil, err
}
Expand All @@ -358,9 +356,8 @@ func New(config *Config) (*TowerClient, error) {
cfg: cfg,
log: plog,
pipeline: queue,
chanCommitHeights: make(map[lnwire.ChannelID]uint64),
activeSessions: newSessionQueueSet(),
summaries: chanSummaries,
chanInfos: chanInfos,
closableSessionQueue: newSessionCloseMinHeap(),
statTicker: time.NewTicker(DefaultStatInterval),
stats: new(ClientStats),
Expand All @@ -369,44 +366,6 @@ func New(config *Config) (*TowerClient, error) {
quit: make(chan struct{}),
}

// perUpdate is a callback function that will be used to inspect the
// full set of candidate client sessions loaded from disk, and to
// determine the highest known commit height for each channel. This
// allows the client to reject backups that it has already processed for
// its active policy.
perUpdate := func(policy wtpolicy.Policy, chanID lnwire.ChannelID,
commitHeight uint64) {

// We only want to consider accepted updates that have been
// accepted under an identical policy to the client's current
// policy.
if policy != c.cfg.Policy {
return
}

c.backupMu.Lock()
defer c.backupMu.Unlock()

// Take the highest commit height found in the session's acked
// updates.
height, ok := c.chanCommitHeights[chanID]
if !ok || commitHeight > height {
c.chanCommitHeights[chanID] = commitHeight
}
}

perMaxHeight := func(s *wtdb.ClientSession, chanID lnwire.ChannelID,
height uint64) {

perUpdate(s.Policy, chanID, height)
}

perCommittedUpdate := func(s *wtdb.ClientSession,
u *wtdb.CommittedUpdate) {

perUpdate(s.Policy, u.BackupID.ChanID, u.BackupID.CommitHeight)
}

candidateTowers := newTowerListIterator()
perActiveTower := func(tower *Tower) {
// If the tower has already been marked as active, then there is
Expand All @@ -429,8 +388,6 @@ func New(config *Config) (*TowerClient, error) {
candidateSessions, err := getTowerAndSessionCandidates(
cfg.DB, cfg.SecretKeyRing, perActiveTower,
wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
wtdb.WithPerMaxHeight(perMaxHeight),
wtdb.WithPerCommittedUpdate(perCommittedUpdate),
wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
)
if err != nil {
Expand Down Expand Up @@ -594,7 +551,7 @@ func (c *TowerClient) Start() error {

// Iterate over the list of registered channels and check if
// any of them can be marked as closed.
for id := range c.summaries {
for id := range c.chanInfos {
isClosed, closedHeight, err := c.isChannelClosed(id)
if err != nil {
returnErr = err
Expand All @@ -615,7 +572,7 @@ func (c *TowerClient) Start() error {

// Since the channel has been marked as closed, we can
// also remove it from the channel summaries map.
delete(c.summaries, id)
delete(c.chanInfos, id)
}

// Load all closable sessions.
Expand Down Expand Up @@ -732,7 +689,7 @@ func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error {

// If a pkscript for this channel already exists, the channel has been
// previously registered.
if _, ok := c.summaries[chanID]; ok {
if _, ok := c.chanInfos[chanID]; ok {
return nil
}

Expand All @@ -752,8 +709,10 @@ func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error {

// Finally, cache the pkscript in our in-memory cache to avoid db
// lookups for the remainder of the daemon's execution.
c.summaries[chanID] = wtdb.ClientChanSummary{
SweepPkScript: pkScript,
c.chanInfos[chanID] = &wtdb.ChannelInfo{
ClientChanSummary: wtdb.ClientChanSummary{
SweepPkScript: pkScript,
},
}

return nil
Expand All @@ -770,16 +729,23 @@ func (c *TowerClient) BackupState(chanID *lnwire.ChannelID,

// Make sure that this channel is registered with the tower client.
c.backupMu.Lock()
if _, ok := c.summaries[*chanID]; !ok {
info, ok := c.chanInfos[*chanID]
if !ok {
c.backupMu.Unlock()

return ErrUnregisteredChannel
}

// Ignore backups that have already been presented to the client.
height, ok := c.chanCommitHeights[*chanID]
if ok && stateNum <= height {
var duplicate bool
info.MaxHeight.WhenSome(func(maxHeight uint64) {
if stateNum <= maxHeight {
duplicate = true
}
})
if duplicate {
c.backupMu.Unlock()

c.log.Debugf("Ignoring duplicate backup for chanid=%v at "+
"height=%d", chanID, stateNum)

Expand All @@ -789,7 +755,7 @@ func (c *TowerClient) BackupState(chanID *lnwire.ChannelID,
// This backup has a higher commit height than any known backup for this
// channel. We'll update our tip so that we won't accept it again if the
// link flaps.
c.chanCommitHeights[*chanID] = stateNum
c.chanInfos[*chanID].MaxHeight = fn.Some(stateNum)
c.backupMu.Unlock()

id := &wtdb.BackupID{
Expand Down Expand Up @@ -899,7 +865,7 @@ func (c *TowerClient) handleClosedChannel(chanID lnwire.ChannelID,
defer c.backupMu.Unlock()

// We only care about channels registered with the tower client.
if _, ok := c.summaries[chanID]; !ok {
if _, ok := c.chanInfos[chanID]; !ok {
return nil
}

Expand All @@ -924,8 +890,7 @@ func (c *TowerClient) handleClosedChannel(chanID lnwire.ChannelID,
return fmt.Errorf("could not track closable sessions: %w", err)
}

delete(c.summaries, chanID)
delete(c.chanCommitHeights, chanID)
delete(c.chanInfos, chanID)

return nil
}
Expand Down Expand Up @@ -1332,7 +1297,7 @@ func (c *TowerClient) backupDispatcher() {
// the prevTask, and should be reprocessed after obtaining a new sessionQueue.
func (c *TowerClient) processTask(task *wtdb.BackupID) {
c.backupMu.Lock()
summary, ok := c.summaries[task.ChanID]
summary, ok := c.chanInfos[task.ChanID]
if !ok {
c.backupMu.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions watchtower/wtclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ type DB interface {
// successfully backed up using the given session.
NumAckedUpdates(id *wtdb.SessionID) (uint64, error)

// FetchChanSummaries loads a mapping from all registered channels to
// their channel summaries. Only the channels that have not yet been
// FetchChanInfos loads a mapping from all registered channels to
// their wtdb.ChannelInfo. Only the channels that have not yet been
// marked as closed will be loaded.
FetchChanSummaries() (wtdb.ChannelSummaries, error)
FetchChanInfos() (wtdb.ChannelInfos, error)

// MarkChannelClosed will mark a registered channel as closed by setting
// its closed-height as the given block height. It returns a list of
Expand Down
22 changes: 20 additions & 2 deletions watchtower/wtdb/client_chan_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,29 @@ package wtdb
import (
"io"

"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/lnwire"
)

// ChannelSummaries is a map for a given channel id to it's ClientChanSummary.
type ChannelSummaries map[lnwire.ChannelID]ClientChanSummary
// ChannelInfos is a map for a given channel id to it's ChannelInfo.
type ChannelInfos map[lnwire.ChannelID]*ChannelInfo

// ChannelInfo contains various useful things about a registered channel.
//
// NOTE: the reason for adding this struct which wraps ClientChanSummary
// instead of extending ClientChanSummary is for faster look-up of added fields.
// If we were to extend ClientChanSummary instead then we would need to decode
// the entire struct each time we want to read the new fields and then re-encode
// the struct each time we want to write to a new field.
type ChannelInfo struct {
ClientChanSummary

// MaxHeight is the highest commitment height that the tower has been
// handed for this channel. An Option type is used to store this since
// a commitment height of zero is valid, and we need a way of knowing if
// we have seen a new height yet or not.
MaxHeight fn.Option[uint64]
}

// ClientChanSummary tracks channel-specific information. A new
// ClientChanSummary is inserted in the database the first time the client
Expand Down
Loading

0 comments on commit 80684ec

Please sign in to comment.