From adcaa8802f659b17edf12a8194dd00a16e162393 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 29 Oct 2024 18:44:51 +0200 Subject: [PATCH] multi: remove kvdb.Tx from ChannelGraphSource.ForAllOutgoingChannels and the same for ChannelStateDB.FetchChannel. Most of the calls to these methods provide a `nil` Tx anyways. The only place that currently provides a non-nil tx is in the `localchans.Manager`. It takes the transaction provided to the `ForAllOutgoingChannels` callback and passes it to it's `updateEdge` method. Note, however, that the `ForAllOutgoingChannels` call is a call to the graph db and the call to `updateEdge` is a call to the `ChannelStateDB`. There is no reason that these two calls need to happen under the same transaction as they are reading from two completely disjoint databases. And so in the effort to completely split untangle the relationship between the two databases, we now dont use the same transaction for these two calls. --- chanbackup/backup.go | 6 ++---- chanbackup/backup_test.go | 3 +-- channeldb/db.go | 9 ++++----- channeldb/db_test.go | 8 ++++---- channelnotifier/channelnotifier.go | 2 +- contractcourt/chain_arbitrator.go | 8 ++------ discovery/gossiper.go | 2 -- discovery/gossiper_test.go | 9 +++------ graph/builder.go | 8 ++++---- graph/interfaces.go | 3 +-- pilot.go | 2 +- routing/localchans/manager.go | 26 ++++++++++---------------- routing/localchans/manager_test.go | 10 ++++------ rpcserver.go | 4 ++-- server.go | 8 ++++---- 15 files changed, 43 insertions(+), 65 deletions(-) diff --git a/chanbackup/backup.go b/chanbackup/backup.go index 183d77e7d6..5853b37e45 100644 --- a/chanbackup/backup.go +++ b/chanbackup/backup.go @@ -6,7 +6,6 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/fn" - "github.com/lightningnetwork/lnd/kvdb" ) // LiveChannelSource is an interface that allows us to query for the set of @@ -18,8 +17,7 @@ type LiveChannelSource interface { // FetchChannel attempts to locate a live channel identified by the // passed chanPoint. Optionally an existing db tx can be supplied. - FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) ( - *channeldb.OpenChannel, error) + FetchChannel(chanPoint wire.OutPoint) (*channeldb.OpenChannel, error) } // assembleChanBackup attempts to assemble a static channel backup for the @@ -97,7 +95,7 @@ func FetchBackupForChan(chanPoint wire.OutPoint, chanSource LiveChannelSource, // First, we'll query the channel source to see if the channel is known // and open within the database. - targetChan, err := chanSource.FetchChannel(nil, chanPoint) + targetChan, err := chanSource.FetchChannel(chanPoint) if err != nil { // If we can't find the channel, then we return with an error, // as we have nothing to backup. diff --git a/chanbackup/backup_test.go b/chanbackup/backup_test.go index 0593b97599..46ccf4c244 100644 --- a/chanbackup/backup_test.go +++ b/chanbackup/backup_test.go @@ -8,7 +8,6 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/kvdb" "github.com/stretchr/testify/require" ) @@ -40,7 +39,7 @@ func (m *mockChannelSource) FetchAllChannels() ([]*channeldb.OpenChannel, error) return chans, nil } -func (m *mockChannelSource) FetchChannel(_ kvdb.RTx, chanPoint wire.OutPoint) ( +func (m *mockChannelSource) FetchChannel(chanPoint wire.OutPoint) ( *channeldb.OpenChannel, error) { if m.failQuery { diff --git a/channeldb/db.go b/channeldb/db.go index 3773b5e7b7..70de0aaf74 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -657,9 +657,8 @@ func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) ( // FetchChannel attempts to locate a channel specified by the passed channel // point. If the channel cannot be found, then an error will be returned. -// Optionally an existing db tx can be supplied. -func (c *ChannelStateDB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) ( - *OpenChannel, error) { +func (c *ChannelStateDB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel, + error) { var targetChanPoint bytes.Buffer err := graphdb.WriteOutpoint(&targetChanPoint, &chanPoint) @@ -674,7 +673,7 @@ func (c *ChannelStateDB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) ( return targetChanPointBytes, &chanPoint, nil } - return c.channelScanner(tx, selector) + return c.channelScanner(nil, selector) } // FetchChannelByID attempts to locate a channel specified by the passed channel @@ -1366,7 +1365,7 @@ func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint, // With the chanPoint constructed, we'll attempt to find the target // channel in the database. If we can't find the channel, then we'll // return the error back to the caller. - dbChan, err := c.FetchChannel(nil, *chanPoint) + dbChan, err := c.FetchChannel(*chanPoint) switch { // If the channel wasn't found, then it's possible that it was already // abandoned from the database. diff --git a/channeldb/db_test.go b/channeldb/db_test.go index 7d48af942c..f5b525a5bb 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -246,7 +246,7 @@ func TestFetchChannel(t *testing.T) { channelState := createTestChannel(t, cdb, openChannelOption()) // Next, attempt to fetch the channel by its chan point. - dbChannel, err := cdb.FetchChannel(nil, channelState.FundingOutpoint) + dbChannel, err := cdb.FetchChannel(channelState.FundingOutpoint) require.NoError(t, err, "unable to fetch channel") // The decoded channel state should be identical to what we stored @@ -270,7 +270,7 @@ func TestFetchChannel(t *testing.T) { uniqueOutputIndex.Add(1) channelState2.FundingOutpoint.Index = uniqueOutputIndex.Load() - _, err = cdb.FetchChannel(nil, channelState2.FundingOutpoint) + _, err = cdb.FetchChannel(channelState2.FundingOutpoint) require.ErrorIs(t, err, ErrChannelNotFound) chanID2 := lnwire.NewChanIDFromOutPoint(channelState2.FundingOutpoint) @@ -410,7 +410,7 @@ func TestRestoreChannelShells(t *testing.T) { // We should also be able to find the channel if we query for it // directly. - _, err = cdb.FetchChannel(nil, channelShell.Chan.FundingOutpoint) + _, err = cdb.FetchChannel(channelShell.Chan.FundingOutpoint) require.NoError(t, err, "unable to fetch channel") // We should also be able to find the link node that was inserted by @@ -459,7 +459,7 @@ func TestAbandonChannel(t *testing.T) { // At this point, the channel should no longer be found in the set of // open channels. - _, err = cdb.FetchChannel(nil, chanState.FundingOutpoint) + _, err = cdb.FetchChannel(chanState.FundingOutpoint) if err != ErrChannelNotFound { t.Fatalf("channel should not have been found: %v", err) } diff --git a/channelnotifier/channelnotifier.go b/channelnotifier/channelnotifier.go index 2b39396df8..b03d913130 100644 --- a/channelnotifier/channelnotifier.go +++ b/channelnotifier/channelnotifier.go @@ -144,7 +144,7 @@ func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint, // channel has gone from pending open to open. func (c *ChannelNotifier) NotifyOpenChannelEvent(chanPoint wire.OutPoint) { // Fetch the relevant channel from the database. - channel, err := c.chanDB.FetchChannel(nil, chanPoint) + channel, err := c.chanDB.FetchChannel(chanPoint) if err != nil { log.Warnf("Unable to fetch open channel from the db: %v", err) } diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 0004883c97..78a79a3c2f 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -304,9 +304,7 @@ func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions, // same instance that is used by the link. chanPoint := a.channel.FundingOutpoint - channel, err := a.c.chanSource.ChannelStateDB().FetchChannel( - nil, chanPoint, - ) + channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint) if err != nil { return nil, err } @@ -359,9 +357,7 @@ func (a *arbChannel) ForceCloseChan() (*wire.MsgTx, error) { // Now that we know the link can't mutate the channel // state, we'll read the channel from disk the target // channel according to its channel point. - channel, err := a.c.chanSource.ChannelStateDB().FetchChannel( - nil, chanPoint, - ) + channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint) if err != nil { return nil, err } diff --git a/discovery/gossiper.go b/discovery/gossiper.go index ec00e63c7b..951c4a98d9 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -24,7 +24,6 @@ import ( graphdb "github.com/lightningnetwork/lnd/graph/db" "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/keychain" - "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnutils" "github.com/lightningnetwork/lnd/lnwallet" @@ -1637,7 +1636,6 @@ func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error { edgesToUpdate []updateTuple ) err := d.cfg.Graph.ForAllOutgoingChannels(func( - _ kvdb.RTx, info *models.ChannelEdgeInfo, edge *models.ChannelEdgePolicy) error { diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 9fd916d0c9..0eb33f8c6c 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -29,7 +29,6 @@ import ( graphdb "github.com/lightningnetwork/lnd/graph/db" "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/keychain" - "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lntest/mock" "github.com/lightningnetwork/lnd/lntest/wait" @@ -207,9 +206,8 @@ func (r *mockGraphSource) ForEachNode(func(node *models.LightningNode) error) er return nil } -func (r *mockGraphSource) ForAllOutgoingChannels(cb func(tx kvdb.RTx, - i *models.ChannelEdgeInfo, - c *models.ChannelEdgePolicy) error) error { +func (r *mockGraphSource) ForAllOutgoingChannels(cb func( + i *models.ChannelEdgeInfo, c *models.ChannelEdgePolicy) error) error { r.mu.Lock() defer r.mu.Unlock() @@ -231,7 +229,7 @@ func (r *mockGraphSource) ForAllOutgoingChannels(cb func(tx kvdb.RTx, } for _, channel := range chans { - if err := cb(nil, channel.Info, channel.Policy1); err != nil { + if err := cb(channel.Info, channel.Policy1); err != nil { return err } } @@ -3483,7 +3481,6 @@ out: const newTimeLockDelta = 100 var edgesToUpdate []EdgeWithInfo err = ctx.router.ForAllOutgoingChannels(func( - _ kvdb.RTx, info *models.ChannelEdgeInfo, edge *models.ChannelEdgePolicy) error { diff --git a/graph/builder.go b/graph/builder.go index ea2361dd35..8c2ba2e3b8 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -1645,11 +1645,11 @@ func (b *Builder) ForEachNode( // the router. // // NOTE: This method is part of the ChannelGraphSource interface. -func (b *Builder) ForAllOutgoingChannels(cb func(kvdb.RTx, - *models.ChannelEdgeInfo, *models.ChannelEdgePolicy) error) error { +func (b *Builder) ForAllOutgoingChannels(cb func(*models.ChannelEdgeInfo, + *models.ChannelEdgePolicy) error) error { return b.cfg.Graph.ForEachNodeChannel(b.cfg.SelfNode, - func(tx kvdb.RTx, c *models.ChannelEdgeInfo, + func(_ kvdb.RTx, c *models.ChannelEdgeInfo, e *models.ChannelEdgePolicy, _ *models.ChannelEdgePolicy) error { @@ -1658,7 +1658,7 @@ func (b *Builder) ForAllOutgoingChannels(cb func(kvdb.RTx, "has no policy") } - return cb(tx, c, e) + return cb(c, e) }, ) } diff --git a/graph/interfaces.go b/graph/interfaces.go index 0dd24e52b1..eb7f56603a 100644 --- a/graph/interfaces.go +++ b/graph/interfaces.go @@ -69,8 +69,7 @@ type ChannelGraphSource interface { // ForAllOutgoingChannels is used to iterate over all channels // emanating from the "source" node which is the center of the // star-graph. - ForAllOutgoingChannels(cb func(tx kvdb.RTx, - c *models.ChannelEdgeInfo, + ForAllOutgoingChannels(cb func(c *models.ChannelEdgeInfo, e *models.ChannelEdgePolicy) error) error // CurrentBlockHeight returns the block height from POV of the router diff --git a/pilot.go b/pilot.go index 2a37b080d0..11333a0722 100644 --- a/pilot.go +++ b/pilot.go @@ -282,7 +282,7 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot, ChannelInfo: func(chanPoint wire.OutPoint) ( *autopilot.LocalChannel, error) { - channel, err := svr.chanStateDB.FetchChannel(nil, chanPoint) + channel, err := svr.chanStateDB.FetchChannel(chanPoint) if err != nil { return nil, err } diff --git a/routing/localchans/manager.go b/routing/localchans/manager.go index af1986a298..d7380439ac 100644 --- a/routing/localchans/manager.go +++ b/routing/localchans/manager.go @@ -13,7 +13,6 @@ import ( "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/graph/db/models" - "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" @@ -40,14 +39,13 @@ type Manager struct { // ForAllOutgoingChannels is required to iterate over all our local // channels. The ChannelEdgePolicy parameter may be nil. - ForAllOutgoingChannels func(cb func(kvdb.RTx, - *models.ChannelEdgeInfo, + ForAllOutgoingChannels func(cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy) error) error // FetchChannel is used to query local channel parameters. Optionally an // existing db tx can be supplied. - FetchChannel func(tx kvdb.RTx, chanPoint wire.OutPoint) ( - *channeldb.OpenChannel, error) + FetchChannel func(chanPoint wire.OutPoint) (*channeldb.OpenChannel, + error) // AddEdge is used to add edge/channel to the topology of the router. AddEdge func(edge *models.ChannelEdgeInfo) error @@ -83,9 +81,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy) // NOTE: edge may be nil when this function is called. - processChan := func( - tx kvdb.RTx, - info *models.ChannelEdgeInfo, + processChan := func(info *models.ChannelEdgeInfo, edge *models.ChannelEdgePolicy) error { // If we have a channel filter, and this channel isn't a part @@ -114,9 +110,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, } // Apply the new policy to the edge. - err := r.updateEdge( - tx, info.ChannelPoint, edge, newSchema, - ) + err := r.updateEdge(info.ChannelPoint, edge, newSchema) if err != nil { failedUpdates = append(failedUpdates, makeFailureItem(info.ChannelPoint, @@ -164,7 +158,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, // Construct a list of failed policy updates. for chanPoint := range unprocessedChans { - channel, err := r.FetchChannel(nil, chanPoint) + channel, err := r.FetchChannel(chanPoint) switch { case errors.Is(err, channeldb.ErrChannelNotFound): failedUpdates = append(failedUpdates, @@ -203,7 +197,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, channel, newSchema, ) if failedUpdate == nil { - err = processChan(nil, info, edge) + err = processChan(info, edge) if err != nil { return nil, err } @@ -261,7 +255,7 @@ func (r *Manager) createMissingEdge(channel *channeldb.OpenChannel, // Validate the newly created edge policy with the user defined new // schema before adding the edge to the database. - err = r.updateEdge(nil, channel.FundingOutpoint, edge, newSchema) + err = r.updateEdge(channel.FundingOutpoint, edge, newSchema) if err != nil { return nil, nil, makeFailureItem( info.ChannelPoint, @@ -351,11 +345,11 @@ func (r *Manager) createEdge(channel *channeldb.OpenChannel, } // updateEdge updates the given edge with the new schema. -func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint, +func (r *Manager) updateEdge(chanPoint wire.OutPoint, edge *models.ChannelEdgePolicy, newSchema routing.ChannelPolicy) error { - channel, err := r.FetchChannel(tx, chanPoint) + channel, err := r.FetchChannel(chanPoint) if err != nil { return err } diff --git a/routing/localchans/manager_test.go b/routing/localchans/manager_test.go index 863b616875..d428a846be 100644 --- a/routing/localchans/manager_test.go +++ b/routing/localchans/manager_test.go @@ -14,7 +14,6 @@ import ( "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/keychain" - "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" @@ -123,20 +122,19 @@ func TestManager(t *testing.T) { return nil } - forAllOutgoingChannels := func(cb func(kvdb.RTx, - *models.ChannelEdgeInfo, + forAllOutgoingChannels := func(cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy) error) error { for _, c := range channelSet { - if err := cb(nil, c.edgeInfo, ¤tPolicy); err != nil { + if err := cb(c.edgeInfo, ¤tPolicy); err != nil { return err } } return nil } - fetchChannel := func(tx kvdb.RTx, chanPoint wire.OutPoint) ( - *channeldb.OpenChannel, error) { + fetchChannel := func(chanPoint wire.OutPoint) (*channeldb.OpenChannel, + error) { if chanPoint == chanPointMissing { return &channeldb.OpenChannel{}, channeldb.ErrChannelNotFound diff --git a/rpcserver.go b/rpcserver.go index 62facc6d95..738b46a40b 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2692,7 +2692,7 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, // First, we'll fetch the channel as is, as we'll need to examine it // regardless of if this is a force close or not. - channel, err := r.server.chanStateDB.FetchChannel(nil, *chanPoint) + channel, err := r.server.chanStateDB.FetchChannel(*chanPoint) if err != nil { return err } @@ -3140,7 +3140,7 @@ func (r *rpcServer) AbandonChannel(_ context.Context, return nil, err } - dbChan, err := r.server.chanStateDB.FetchChannel(nil, *chanPoint) + dbChan, err := r.server.chanStateDB.FetchChannel(*chanPoint) switch { // If the channel isn't found in the set of open channels, then we can // continue on as it can't be loaded into the link/peer. diff --git a/server.go b/server.go index 7efe99a8b6..af66442741 100644 --- a/server.go +++ b/server.go @@ -1130,17 +1130,17 @@ func newServer(cfg *Config, listenAddrs []net.Addr, s.localChanMgr = &localchans.Manager{ SelfPub: nodeKeyDesc.PubKey, DefaultRoutingPolicy: cc.RoutingPolicy, - ForAllOutgoingChannels: func(cb func(kvdb.RTx, - *models.ChannelEdgeInfo, *models.ChannelEdgePolicy) error) error { + ForAllOutgoingChannels: func(cb func(*models.ChannelEdgeInfo, + *models.ChannelEdgePolicy) error) error { return s.graphDB.ForEachNodeChannel(selfVertex, - func(tx kvdb.RTx, c *models.ChannelEdgeInfo, + func(_ kvdb.RTx, c *models.ChannelEdgeInfo, e *models.ChannelEdgePolicy, _ *models.ChannelEdgePolicy) error { // NOTE: The invoked callback here may // receive a nil channel policy. - return cb(tx, c, e) + return cb(c, e) }, ) },