Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shuffle #157

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7164e58
multi: remove ChainHash from lnwire.NetAddress
ellemouton Nov 8, 2024
d3b76db
graph: remove unused ForEachNode method
ellemouton Nov 8, 2024
2d53fe1
graph: let FetchNodeFeatures take an optional read tx
ellemouton Nov 8, 2024
afb6d02
graph: create an abstract graphdb.RTx interface
ellemouton Nov 8, 2024
f3dc00a
graph: separate ForEachNode and ForEachNodeCBTx
ellemouton Nov 8, 2024
2502c1b
graph: remove read TX from ForEachNodeChannel callback
ellemouton Nov 8, 2024
97d1da6
graph: let NewPathFindTx take a context
ellemouton Nov 8, 2024
69c2110
multi: pass context to AddrSource
ellemouton Nov 8, 2024
0396ea0
graph: let IsPublicNode take a context
ellemouton Nov 8, 2024
71c8832
multi: Contexts for FetchChannelEdgesByID
ellemouton Nov 8, 2024
8eefb54
context for HasLnNode
ellemouton Nov 8, 2024
7c84a49
let FetchLightningNode take a context
ellemouton Nov 8, 2024
501a98a
let ForEachNodeChannel take a context
ellemouton Nov 8, 2024
36065ef
pass context to ForEachNodeDirectedChannel
ellemouton Nov 8, 2024
67601b2
pass context to FetchNodeFeatures
ellemouton Nov 8, 2024
baee817
let FetchChannelEdgesByOutpoint take a context
ellemouton Nov 8, 2024
0e9253c
let LookupAlias take a context
ellemouton Nov 8, 2024
c4cfcbb
let NumZombies take a context
ellemouton Nov 8, 2024
1032de0
let ForEachChannel take a context
ellemouton Nov 8, 2024
f60b645
let ForEachNode take a context
ellemouton Nov 8, 2024
5bd4a6e
add and make use of StatsCollector
ellemouton Nov 8, 2024
38b9dce
invoicesrpc: remove invoicerpc server's access to ChannelGraph pointer
ellemouton Nov 8, 2024
1886f87
multi: add GraphSource interface and GraphProvider to ImplementationC…
ellemouton Nov 3, 2024
d0ccc5c
lnrpc: add IsPublic to lnrpc.NodeInfo
ellemouton Nov 8, 2024
9387dab
graphrpc server
ellemouton Nov 8, 2024
05f7f89
add exclude lists to GetNetworkInfo calls
ellemouton Nov 8, 2024
2fe92e5
graph mux and remote client
ellemouton Nov 8, 2024
f283058
multi: add a `--gossip.no-sync` option
ellemouton Nov 4, 2024
9ab9912
address TODO
ellemouton Nov 8, 2024
cd8dc27
remote graph config options
ellemouton Nov 9, 2024
9acda1c
itest :tada:
ellemouton Nov 9, 2024
75afb5c
no not doing this.
ellemouton Nov 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions autopilot/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package autopilot

import (
"bytes"
"context"
"encoding/hex"
"errors"
"net"
Expand Down Expand Up @@ -135,7 +136,7 @@ func (d *dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
//
// NOTE: Part of the autopilot.ChannelGraph interface.
func (d *databaseChannelGraph) ForEachNode(cb func(Node) error) error {
return d.db.ForEachNode(func(tx kvdb.RTx,
return d.db.ForEachNodeWithCBTx(func(tx kvdb.RTx,
n *models.LightningNode) error {

// We'll skip over any node that doesn't have any advertised
Expand Down Expand Up @@ -169,7 +170,9 @@ func (d *databaseChannelGraph) addRandChannel(node1, node2 *btcec.PublicKey,
return nil, err
}

dbNode, err := d.db.FetchLightningNode(vertex)
dbNode, err := d.db.FetchLightningNode(
context.Background(), vertex,
)
switch {
case errors.Is(err, graphdb.ErrGraphNodeNotFound):
fallthrough
Expand Down Expand Up @@ -554,7 +557,7 @@ func (nc dbNodeCached) ForEachChannel(cb func(ChannelEdge) error) error {
//
// NOTE: Part of the autopilot.ChannelGraph interface.
func (dc *databaseChannelGraphCached) ForEachNode(cb func(Node) error) error {
return dc.db.ForEachNodeCached(func(n route.Vertex,
return dc.db.ForEachNodeCached(context.Background(), func(n route.Vertex,
channels map[uint64]*graphdb.DirectedChannel) error {

if len(channels) > 0 {
Expand Down
18 changes: 11 additions & 7 deletions chanbackup/backup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chanbackup

import (
"context"
"fmt"

"github.com/btcsuite/btcd/wire"
Expand All @@ -24,15 +25,17 @@ type LiveChannelSource interface {
// passed open channel. The backup includes all information required to restore
// the channel, as well as addressing information so we can find the peer and
// reconnect to them to initiate the protocol.
func assembleChanBackup(addrSource channeldb.AddrSource,
func assembleChanBackup(ctx context.Context, addrSource channeldb.AddrSource,
openChan *channeldb.OpenChannel) (*Single, error) {

log.Debugf("Crafting backup for ChannelPoint(%v)",
openChan.FundingOutpoint)

// First, we'll query the channel source to obtain all the addresses
// that are associated with the peer for this channel.
known, nodeAddrs, err := addrSource.AddrsForNode(openChan.IdentityPub)
known, nodeAddrs, err := addrSource.AddrsForNode(
ctx, openChan.IdentityPub,
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -90,8 +93,9 @@ func buildCloseTxInputs(
// FetchBackupForChan attempts to create a plaintext static channel backup for
// the target channel identified by its channel point. If we're unable to find
// the target channel, then an error will be returned.
func FetchBackupForChan(chanPoint wire.OutPoint, chanSource LiveChannelSource,
addrSource channeldb.AddrSource) (*Single, error) {
func FetchBackupForChan(ctx context.Context, chanPoint wire.OutPoint,
chanSource LiveChannelSource, addrSource channeldb.AddrSource) (*Single,
error) {

// First, we'll query the channel source to see if the channel is known
// and open within the database.
Expand All @@ -104,7 +108,7 @@ func FetchBackupForChan(chanPoint wire.OutPoint, chanSource LiveChannelSource,

// Once we have the target channel, we can assemble the backup using
// the source to obtain any extra information that we may need.
staticChanBackup, err := assembleChanBackup(addrSource, targetChan)
staticChanBackup, err := assembleChanBackup(ctx, addrSource, targetChan)
if err != nil {
return nil, fmt.Errorf("unable to create chan backup: %w", err)
}
Expand All @@ -114,7 +118,7 @@ func FetchBackupForChan(chanPoint wire.OutPoint, chanSource LiveChannelSource,

// FetchStaticChanBackups will return a plaintext static channel back up for
// all known active/open channels within the passed channel source.
func FetchStaticChanBackups(chanSource LiveChannelSource,
func FetchStaticChanBackups(ctx context.Context, chanSource LiveChannelSource,
addrSource channeldb.AddrSource) ([]Single, error) {

// First, we'll query the backup source for information concerning all
Expand All @@ -129,7 +133,7 @@ func FetchStaticChanBackups(chanSource LiveChannelSource,
// channel.
staticChanBackups := make([]Single, 0, len(openChans))
for _, openChan := range openChans {
chanBackup, err := assembleChanBackup(addrSource, openChan)
chanBackup, err := assembleChanBackup(ctx, addrSource, openChan)
if err != nil {
return nil, err
}
Expand Down
17 changes: 11 additions & 6 deletions chanbackup/backup_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chanbackup

import (
"context"
"fmt"
"net"
"testing"
Expand Down Expand Up @@ -61,8 +62,8 @@ func (m *mockChannelSource) addAddrsForNode(nodePub *btcec.PublicKey, addrs []ne
m.addrs[nodeKey] = addrs
}

func (m *mockChannelSource) AddrsForNode(nodePub *btcec.PublicKey) (bool,
[]net.Addr, error) {
func (m *mockChannelSource) AddrsForNode(_ context.Context,
nodePub *btcec.PublicKey) (bool, []net.Addr, error) {

if m.failQuery {
return false, nil, fmt.Errorf("fail")
Expand Down Expand Up @@ -120,7 +121,8 @@ func TestFetchBackupForChan(t *testing.T) {
}
for i, testCase := range testCases {
_, err := FetchBackupForChan(
testCase.chanPoint, chanSource, chanSource,
context.Background(), testCase.chanPoint, chanSource,
chanSource,
)
switch {
// If this is a valid test case, and we failed, then we'll
Expand Down Expand Up @@ -160,7 +162,9 @@ func TestFetchStaticChanBackups(t *testing.T) {
// With the channel source populated, we'll now attempt to create a set
// of backups for all the channels. This should succeed, as all items
// are populated within the channel source.
backups, err := FetchStaticChanBackups(chanSource, chanSource)
backups, err := FetchStaticChanBackups(
context.Background(), chanSource, chanSource,
)
require.NoError(t, err, "unable to create chan back ups")

if len(backups) != numChans {
Expand All @@ -175,7 +179,8 @@ func TestFetchStaticChanBackups(t *testing.T) {
copy(n[:], randomChan2.IdentityPub.SerializeCompressed())
delete(chanSource.addrs, n)

_, err = FetchStaticChanBackups(chanSource, chanSource)
ctx := context.Background()
_, err = FetchStaticChanBackups(ctx, chanSource, chanSource)
if err == nil {
t.Fatalf("query with incomplete information should fail")
}
Expand All @@ -184,7 +189,7 @@ func TestFetchStaticChanBackups(t *testing.T) {
// source at all, then we'll fail as well.
chanSource = newMockChannelSource()
chanSource.failQuery = true
_, err = FetchStaticChanBackups(chanSource, chanSource)
_, err = FetchStaticChanBackups(ctx, chanSource, chanSource)
if err == nil {
t.Fatalf("query should fail")
}
Expand Down
11 changes: 7 additions & 4 deletions chanbackup/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chanbackup

import (
"bytes"
"context"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -81,7 +82,8 @@ type ChannelNotifier interface {
// synchronization point to ensure that the chanbackup.SubSwapper does
// not miss any channel open or close events in the period between when
// it's created, and when it requests the channel subscription.
SubscribeChans(map[wire.OutPoint]struct{}) (*ChannelSubscription, error)
SubscribeChans(context.Context, map[wire.OutPoint]struct{}) (
*ChannelSubscription, error)
}

// SubSwapper subscribes to new updates to the open channel state, and then
Expand Down Expand Up @@ -119,16 +121,17 @@ type SubSwapper struct {
// set of channels, and the required interfaces to be notified of new channel
// updates, pack a multi backup, and swap the current best backup from its
// storage location.
func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier,
keyRing keychain.KeyRing, backupSwapper Swapper) (*SubSwapper, error) {
func NewSubSwapper(ctx context.Context, startingChans []Single,
chanNotifier ChannelNotifier, keyRing keychain.KeyRing,
backupSwapper Swapper) (*SubSwapper, error) {

// First, we'll subscribe to the latest set of channel updates given
// the set of channels we already know of.
knownChans := make(map[wire.OutPoint]struct{})
for _, chanBackup := range startingChans {
knownChans[chanBackup.FundingOutpoint] = struct{}{}
}
chanEvents, err := chanNotifier.SubscribeChans(knownChans)
chanEvents, err := chanNotifier.SubscribeChans(ctx, knownChans)
if err != nil {
return nil, err
}
Expand Down
20 changes: 12 additions & 8 deletions chanbackup/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chanbackup

import (
"context"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -62,8 +63,8 @@ func newMockChannelNotifier() *mockChannelNotifier {
}
}

func (m *mockChannelNotifier) SubscribeChans(chans map[wire.OutPoint]struct{}) (
*ChannelSubscription, error) {
func (m *mockChannelNotifier) SubscribeChans(_ context.Context,
chans map[wire.OutPoint]struct{}) (*ChannelSubscription, error) {

if m.fail {
return nil, fmt.Errorf("fail")
Expand All @@ -88,10 +89,10 @@ func TestNewSubSwapperSubscribeFail(t *testing.T) {
fail: true,
}

_, err := NewSubSwapper(nil, &chanNotifier, keyRing, &swapper)
if err == nil {
t.Fatalf("expected fail due to lack of subscription")
}
_, err := NewSubSwapper(
context.Background(), nil, &chanNotifier, keyRing, &swapper,
)
require.Errorf(t, err, "expected fail due to lack of subscription")
}

func assertExpectedBackupSwap(t *testing.T, swapper *mockSwapper,
Expand Down Expand Up @@ -158,7 +159,9 @@ func TestSubSwapperIdempotentStartStop(t *testing.T) {
var chanNotifier mockChannelNotifier

swapper := newMockSwapper(keyRing)
subSwapper, err := NewSubSwapper(nil, &chanNotifier, keyRing, swapper)
subSwapper, err := NewSubSwapper(
context.Background(), nil, &chanNotifier, keyRing, swapper,
)
require.NoError(t, err, "unable to init subSwapper")

if err := subSwapper.Start(); err != nil {
Expand Down Expand Up @@ -224,7 +227,8 @@ func TestSubSwapperUpdater(t *testing.T) {
// With our channel set created, we'll make a fresh sub swapper
// instance to begin our test.
subSwapper, err := NewSubSwapper(
initialChanSet, chanNotifier, keyRing, swapper,
context.Background(), initialChanSet, chanNotifier, keyRing,
swapper,
)
require.NoError(t, err, "unable to make swapper")
if err := subSwapper.Start(); err != nil {
Expand Down
16 changes: 9 additions & 7 deletions chanbackup/recover.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chanbackup

import (
"context"
"net"

"github.com/btcsuite/btcd/btcec/v2"
Expand Down Expand Up @@ -29,7 +30,8 @@ type PeerConnector interface {
// available addresses. Once this method returns with a non-nil error,
// the connector should attempt to persistently connect to the target
// peer in the background as a persistent attempt.
ConnectPeer(node *btcec.PublicKey, addrs []net.Addr) error
ConnectPeer(ctx context.Context, node *btcec.PublicKey,
addrs []net.Addr) error
}

// Recover attempts to recover the static channel state from a set of static
Expand All @@ -41,7 +43,7 @@ type PeerConnector interface {
// well, in order to expose the addressing information required to locate to
// and connect to each peer in order to initiate the recovery protocol.
// The number of channels that were successfully restored is returned.
func Recover(backups []Single, restorer ChannelRestorer,
func Recover(ctx context.Context, backups []Single, restorer ChannelRestorer,
peerConnector PeerConnector) (int, error) {

var numRestored int
Expand Down Expand Up @@ -70,7 +72,7 @@ func Recover(backups []Single, restorer ChannelRestorer,
backup.FundingOutpoint)

err = peerConnector.ConnectPeer(
backup.RemoteNodePub, backup.Addresses,
ctx, backup.RemoteNodePub, backup.Addresses,
)
if err != nil {
return numRestored, err
Expand All @@ -95,7 +97,7 @@ func Recover(backups []Single, restorer ChannelRestorer,
// established, then the PeerConnector will continue to attempt to re-establish
// a persistent connection in the background. The number of channels that were
// successfully restored is returned.
func UnpackAndRecoverSingles(singles PackedSingles,
func UnpackAndRecoverSingles(ctx context.Context, singles PackedSingles,
keyChain keychain.KeyRing, restorer ChannelRestorer,
peerConnector PeerConnector) (int, error) {

Expand All @@ -104,7 +106,7 @@ func UnpackAndRecoverSingles(singles PackedSingles,
return 0, err
}

return Recover(chanBackups, restorer, peerConnector)
return Recover(ctx, chanBackups, restorer, peerConnector)
}

// UnpackAndRecoverMulti is a one-shot method, that given a set of packed
Expand All @@ -114,7 +116,7 @@ func UnpackAndRecoverSingles(singles PackedSingles,
// established, then the PeerConnector will continue to attempt to re-establish
// a persistent connection in the background. The number of channels that were
// successfully restored is returned.
func UnpackAndRecoverMulti(packedMulti PackedMulti,
func UnpackAndRecoverMulti(ctx context.Context, packedMulti PackedMulti,
keyChain keychain.KeyRing, restorer ChannelRestorer,
peerConnector PeerConnector) (int, error) {

Expand All @@ -123,5 +125,5 @@ func UnpackAndRecoverMulti(packedMulti PackedMulti,
return 0, err
}

return Recover(chanBackups.StaticBackups, restorer, peerConnector)
return Recover(ctx, chanBackups.StaticBackups, restorer, peerConnector)
}
6 changes: 4 additions & 2 deletions channel_notifier.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lnd

import (
"context"
"fmt"

"github.com/btcsuite/btcd/wire"
Expand Down Expand Up @@ -31,7 +32,8 @@ type channelNotifier struct {
// the channel subscription.
//
// NOTE: This is part of the chanbackup.ChannelNotifier interface.
func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{}) (
func (c *channelNotifier) SubscribeChans(ctx context.Context,
startingChans map[wire.OutPoint]struct{}) (
*chanbackup.ChannelSubscription, error) {

ltndLog.Infof("Channel backup proxy channel notifier starting")
Expand All @@ -46,7 +48,7 @@ func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{
// confirmed channels.
sendChanOpenUpdate := func(newOrPendingChan *channeldb.OpenChannel) {
_, nodeAddrs, err := c.addrs.AddrsForNode(
newOrPendingChan.IdentityPub,
ctx, newOrPendingChan.IdentityPub,
)
if err != nil {
pub := newOrPendingChan.IdentityPub
Expand Down
10 changes: 6 additions & 4 deletions channeldb/addr_source.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package channeldb

import (
"context"
"errors"
"net"

Expand All @@ -13,7 +14,8 @@ type AddrSource interface {
// AddrsForNode returns all known addresses for the target node public
// key. The returned boolean must indicate if the given node is unknown
// to the backing source.
AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr, error)
AddrsForNode(ctx context.Context, nodePub *btcec.PublicKey) (bool,
[]net.Addr, error)
}

// multiAddrSource is an implementation of AddrSource which gathers all the
Expand All @@ -38,8 +40,8 @@ func NewMultiAddrSource(sources ...AddrSource) AddrSource {
// node.
//
// NOTE: this implements the AddrSource interface.
func (c *multiAddrSource) AddrsForNode(nodePub *btcec.PublicKey) (bool,
[]net.Addr, error) {
func (c *multiAddrSource) AddrsForNode(ctx context.Context,
nodePub *btcec.PublicKey) (bool, []net.Addr, error) {

if len(c.sources) == 0 {
return false, nil, errors.New("no address sources")
Expand All @@ -55,7 +57,7 @@ func (c *multiAddrSource) AddrsForNode(nodePub *btcec.PublicKey) (bool,
// Iterate over all the address sources and query each one for the
// addresses it has for the node in question.
for _, src := range c.sources {
isKnown, addrs, err := src.AddrsForNode(nodePub)
isKnown, addrs, err := src.AddrsForNode(ctx, nodePub)
if err != nil {
return false, nil, err
}
Expand Down
Loading
Loading