Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wtclient startup perf #100

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 26 additions & 61 deletions watchtower/wtclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwallet"
Expand Down Expand Up @@ -295,9 +296,8 @@ type TowerClient struct {

closableSessionQueue *sessionCloseMinHeap

backupMu sync.Mutex
summaries wtdb.ChannelSummaries
chanCommitHeights map[lnwire.ChannelID]uint64
backupMu sync.Mutex
chanInfos wtdb.ChannelInfos

statTicker *time.Ticker
stats *ClientStats
Expand Down Expand Up @@ -339,9 +339,7 @@ func New(config *Config) (*TowerClient, error) {

plog := build.NewPrefixLog(prefix, log)

// Load the sweep pkscripts that have been generated for all previously
// registered channels.
chanSummaries, err := cfg.DB.FetchChanSummaries()
chanInfos, err := cfg.DB.FetchChanInfos()
if err != nil {
return nil, err
}
Expand All @@ -358,9 +356,8 @@ func New(config *Config) (*TowerClient, error) {
cfg: cfg,
log: plog,
pipeline: queue,
chanCommitHeights: make(map[lnwire.ChannelID]uint64),
activeSessions: newSessionQueueSet(),
summaries: chanSummaries,
chanInfos: chanInfos,
closableSessionQueue: newSessionCloseMinHeap(),
statTicker: time.NewTicker(DefaultStatInterval),
stats: new(ClientStats),
Expand All @@ -369,44 +366,6 @@ func New(config *Config) (*TowerClient, error) {
quit: make(chan struct{}),
}

// perUpdate is a callback function that will be used to inspect the
// full set of candidate client sessions loaded from disk, and to
// determine the highest known commit height for each channel. This
// allows the client to reject backups that it has already processed for
// its active policy.
perUpdate := func(policy wtpolicy.Policy, chanID lnwire.ChannelID,
commitHeight uint64) {

// We only want to consider accepted updates that have been
// accepted under an identical policy to the client's current
// policy.
if policy != c.cfg.Policy {
return
}

c.backupMu.Lock()
defer c.backupMu.Unlock()

// Take the highest commit height found in the session's acked
// updates.
height, ok := c.chanCommitHeights[chanID]
if !ok || commitHeight > height {
c.chanCommitHeights[chanID] = commitHeight
}
}

perMaxHeight := func(s *wtdb.ClientSession, chanID lnwire.ChannelID,
height uint64) {

perUpdate(s.Policy, chanID, height)
}

perCommittedUpdate := func(s *wtdb.ClientSession,
u *wtdb.CommittedUpdate) {

perUpdate(s.Policy, u.BackupID.ChanID, u.BackupID.CommitHeight)
}

candidateTowers := newTowerListIterator()
perActiveTower := func(tower *Tower) {
// If the tower has already been marked as active, then there is
Expand All @@ -429,8 +388,6 @@ func New(config *Config) (*TowerClient, error) {
candidateSessions, err := getTowerAndSessionCandidates(
cfg.DB, cfg.SecretKeyRing, perActiveTower,
wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)),
wtdb.WithPerMaxHeight(perMaxHeight),
wtdb.WithPerCommittedUpdate(perCommittedUpdate),
wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()),
)
if err != nil {
Expand Down Expand Up @@ -594,7 +551,7 @@ func (c *TowerClient) Start() error {

// Iterate over the list of registered channels and check if
// any of them can be marked as closed.
for id := range c.summaries {
for id := range c.chanInfos {
isClosed, closedHeight, err := c.isChannelClosed(id)
if err != nil {
returnErr = err
Expand All @@ -615,7 +572,7 @@ func (c *TowerClient) Start() error {

// Since the channel has been marked as closed, we can
// also remove it from the channel summaries map.
delete(c.summaries, id)
delete(c.chanInfos, id)
}

// Load all closable sessions.
Expand Down Expand Up @@ -732,7 +689,7 @@ func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error {

// If a pkscript for this channel already exists, the channel has been
// previously registered.
if _, ok := c.summaries[chanID]; ok {
if _, ok := c.chanInfos[chanID]; ok {
return nil
}

Expand All @@ -752,8 +709,10 @@ func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error {

// Finally, cache the pkscript in our in-memory cache to avoid db
// lookups for the remainder of the daemon's execution.
c.summaries[chanID] = wtdb.ClientChanSummary{
SweepPkScript: pkScript,
c.chanInfos[chanID] = &wtdb.ChannelInfo{
ClientChanSummary: wtdb.ClientChanSummary{
SweepPkScript: pkScript,
},
}

return nil
Expand All @@ -770,16 +729,23 @@ func (c *TowerClient) BackupState(chanID *lnwire.ChannelID,

// Make sure that this channel is registered with the tower client.
c.backupMu.Lock()
if _, ok := c.summaries[*chanID]; !ok {
info, ok := c.chanInfos[*chanID]
if !ok {
c.backupMu.Unlock()

return ErrUnregisteredChannel
}

// Ignore backups that have already been presented to the client.
height, ok := c.chanCommitHeights[*chanID]
if ok && stateNum <= height {
var duplicate bool
info.MaxHeight.WhenSome(func(maxHeight uint64) {
if stateNum <= maxHeight {
duplicate = true
}
})
if duplicate {
c.backupMu.Unlock()

c.log.Debugf("Ignoring duplicate backup for chanid=%v at "+
"height=%d", chanID, stateNum)

Expand All @@ -789,7 +755,7 @@ func (c *TowerClient) BackupState(chanID *lnwire.ChannelID,
// This backup has a higher commit height than any known backup for this
// channel. We'll update our tip so that we won't accept it again if the
// link flaps.
c.chanCommitHeights[*chanID] = stateNum
c.chanInfos[*chanID].MaxHeight = fn.Some(stateNum)
c.backupMu.Unlock()

id := &wtdb.BackupID{
Expand Down Expand Up @@ -899,7 +865,7 @@ func (c *TowerClient) handleClosedChannel(chanID lnwire.ChannelID,
defer c.backupMu.Unlock()

// We only care about channels registered with the tower client.
if _, ok := c.summaries[chanID]; !ok {
if _, ok := c.chanInfos[chanID]; !ok {
return nil
}

Expand All @@ -924,8 +890,7 @@ func (c *TowerClient) handleClosedChannel(chanID lnwire.ChannelID,
return fmt.Errorf("could not track closable sessions: %w", err)
}

delete(c.summaries, chanID)
delete(c.chanCommitHeights, chanID)
delete(c.chanInfos, chanID)

return nil
}
Expand Down Expand Up @@ -1332,7 +1297,7 @@ func (c *TowerClient) backupDispatcher() {
// the prevTask, and should be reprocessed after obtaining a new sessionQueue.
func (c *TowerClient) processTask(task *wtdb.BackupID) {
c.backupMu.Lock()
summary, ok := c.summaries[task.ChanID]
summary, ok := c.chanInfos[task.ChanID]
if !ok {
c.backupMu.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions watchtower/wtclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ type DB interface {
// successfully backed up using the given session.
NumAckedUpdates(id *wtdb.SessionID) (uint64, error)

// FetchChanSummaries loads a mapping from all registered channels to
// their channel summaries. Only the channels that have not yet been
// FetchChanInfos loads a mapping from all registered channels to
// their wtdb.ChannelInfo. Only the channels that have not yet been
// marked as closed will be loaded.
FetchChanSummaries() (wtdb.ChannelSummaries, error)
FetchChanInfos() (wtdb.ChannelInfos, error)

// MarkChannelClosed will mark a registered channel as closed by setting
// its closed-height as the given block height. It returns a list of
Expand Down
15 changes: 15 additions & 0 deletions watchtower/wtdb/client_chan_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,24 @@ package wtdb
import (
"io"

"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/lnwire"
)

// ChannelInfos is a map for a given channel id to it's ChannelInfo.
type ChannelInfos map[lnwire.ChannelID]*ChannelInfo

// ChannelInfo contains various useful things about a registered channel.
type ChannelInfo struct {
ClientChanSummary

// MaxHeight is the highest commitment height that the tower has been
// handed for this channel. An Option type is used to store this since
// a commitment height of zero is valid, and we need a way of knowing if
// we have seen a new height yet or not.
MaxHeight fn.Option[uint64]
}

// ChannelSummaries is a map for a given channel id to it's ClientChanSummary.
type ChannelSummaries map[lnwire.ChannelID]ClientChanSummary

Expand Down
Loading