Skip to content

Commit

Permalink
lnd: add GraphSource interface and GraphProvider to ImplementationConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
ellemouton committed Nov 3, 2024
1 parent 55213cb commit 6f4f4e2
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 26 deletions.
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1777,6 +1777,7 @@ func (c *Config) ImplementationConfig(
),
WalletConfigBuilder: rpcImpl,
ChainControlBuilder: rpcImpl,
GraphProvider: rpcImpl,
}
}

Expand All @@ -1788,6 +1789,7 @@ func (c *Config) ImplementationConfig(
DatabaseBuilder: NewDefaultDatabaseBuilder(c, ltndLog),
WalletConfigBuilder: defaultImpl,
ChainControlBuilder: defaultImpl,
GraphProvider: defaultImpl,
}
}

Expand Down
23 changes: 23 additions & 0 deletions config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ type ChainControlBuilder interface {
*btcwallet.Config) (*chainreg.ChainControl, func(), error)
}

// GraphProvider is an interface that must be satisfied by any external system
// that wants to provide LND with graph information.
type GraphProvider interface {
// Graph returns the GraphSource that LND will use for read-only graph
// related queries.
Graph(context.Context, *DatabaseInstances) (GraphSource, error)
}

// ImplementationCfg is a struct that holds all configuration items for
// components that can be implemented outside lnd itself.
type ImplementationCfg struct {
Expand Down Expand Up @@ -155,6 +163,10 @@ type ImplementationCfg struct {
// AuxComponents is a set of auxiliary components that can be used by
// lnd for certain custom channel types.
AuxComponents

// GraphProvider is a type that can provide a custom GraphSource for LND
// to use for read-only graph calls.
GraphProvider
}

// AuxComponents is a set of auxiliary components that can be used by lnd for
Expand Down Expand Up @@ -249,6 +261,17 @@ func (d *DefaultWalletImpl) RegisterGrpcSubserver(s *grpc.Server) error {
return nil
}

// Graph returns the GraphSource that LND will use for read-only graph related
// queries. By default, the GraphSource implementation is this LND node's
// backing graphdb.ChannelGraph.
//
// note: this is part of the GraphProvider interface.
func (d *DefaultWalletImpl) Graph(_ context.Context,
dbs *DatabaseInstances) (GraphSource, error) {

return dbs.GraphDB, nil
}

// ValidateMacaroon extracts the macaroon from the context's gRPC metadata,
// checks its signature, makes sure all specified permissions for the called
// method are contained within and finally ensures all caveat conditions are
Expand Down
47 changes: 47 additions & 0 deletions interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package lnd

import (
"time"

"github.com/btcsuite/btcd/btcec/v2"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/graph/graphsession"
"github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
"github.com/lightningnetwork/lnd/netann"
)

// GraphSource defines the read-only graph interface required by LND for graph
// related tasks.
type GraphSource interface {
graphsession.ReadOnlyGraph
autopilot.GraphSource
invoicesrpc.GraphSource
netann.ChannelGraph
channeldb.AddrSource

// ForEachChannel iterates through all the channel edges stored within
// the graph and invokes the passed callback for each edge. If the
// callback returns an error, then the transaction is aborted and the
// iteration stops early. An edge's policy structs may be nil if the
// ChannelUpdate in question has not yet been received for the channel.
ForEachChannel(cb func(*models.ChannelEdgeInfo,
*models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error) error

// HasLightningNode determines if the graph has a vertex identified by
// the target node identity public key. If the node exists in the
// database, a timestamp of when the data for the node was lasted
// updated is returned along with a true boolean. Otherwise, an empty
// time.Time is returned with a false boolean.
HasLightningNode(nodePub [33]byte) (time.Time, bool, error)

// NumZombies returns the current number of zombie channels in the
// graph.
NumZombies() (uint64, error)

// LookupAlias attempts to return the alias as advertised by the target
// node.
LookupAlias(pub *btcec.PublicKey) (string, error)
}
7 changes: 6 additions & 1 deletion lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
return mkErr("error deriving node key: %v", err)
}

graphSource, err := implCfg.Graph(ctx, dbs)
if err != nil {
return mkErr("error obtaining graph source: %v", err)
}

if cfg.Tor.StreamIsolation && cfg.Tor.SkipProxyForClearNetTargets {
return errStreamIsolationWithProxySkip
}
Expand Down Expand Up @@ -601,7 +606,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
cfg, cfg.Listeners, dbs, activeChainControl, &idKeyDesc,
activeChainControl.Cfg.WalletUnlockParams.ChansToRestore,
multiAcceptor, torController, tlsManager, leaderElector,
implCfg,
graphSource, implCfg,
)
if err != nil {
return mkErr("unable to create server: %v", err)
Expand Down
4 changes: 3 additions & 1 deletion pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot,
cfg.MinConfs, lnwallet.DefaultAccountName,
)
},
Graph: autopilot.ChannelGraphFromGraphSource(svr.graphDB),
Graph: autopilot.ChannelGraphFromGraphSource(
svr.graphSource,
),
Constraints: atplConstraints,
ConnectToPeer: func(target *btcec.PublicKey, addrs []net.Addr) (bool, error) {
// First, we'll check if we're already connected to the
Expand Down
32 changes: 16 additions & 16 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service,
if err != nil {
return err
}
graph := s.graphDB
graph := s.graphSource

routerBackend := &routerrpc.RouterBackend{
SelfNode: selfNode.PubKeyBytes,
Expand Down Expand Up @@ -784,12 +784,12 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service,
err = subServerCgs.PopulateDependencies(
r.cfg, s.cc, r.cfg.networkDir, macService, atpl, invoiceRegistry,
s.htlcSwitch, r.cfg.ActiveNetParams.Params, s.chanRouter,
routerBackend, s.nodeSigner, s.graphDB, s.chanStateDB,
s.sweeper, tower, s.towerClientMgr, r.cfg.net.ResolveTCPAddr,
genInvoiceFeatures, genAmpInvoiceFeatures,
s.getNodeAnnouncement, s.updateAndBrodcastSelfNode, parseAddr,
rpcsLog, s.aliasMgr, r.implCfg.AuxDataParser,
invoiceHtlcModifier,
routerBackend, s.nodeSigner, s.graphDB, s.graphSource,
s.chanStateDB, s.sweeper, tower, s.towerClientMgr,
r.cfg.net.ResolveTCPAddr, genInvoiceFeatures,
genAmpInvoiceFeatures, s.getNodeAnnouncement,
s.updateAndBrodcastSelfNode, parseAddr, rpcsLog, s.aliasMgr,
r.implCfg.AuxDataParser, invoiceHtlcModifier,
)
if err != nil {
return err
Expand Down Expand Up @@ -1745,7 +1745,7 @@ func (r *rpcServer) VerifyMessage(ctx context.Context,
// channels signed the message.
//
// TODO(phlip9): Require valid nodes to have capital in active channels.
graph := r.server.graphDB
graph := r.server.graphSource
_, active, err := graph.HasLightningNode(pub)
if err != nil {
return nil, fmt.Errorf("failed to query graph: %w", err)
Expand Down Expand Up @@ -4732,7 +4732,7 @@ func createRPCOpenChannel(r *rpcServer, dbChannel *channeldb.OpenChannel,

// Look up our channel peer's node alias if the caller requests it.
if peerAliasLookup {
peerAlias, err := r.server.graphDB.LookupAlias(nodePub)
peerAlias, err := r.server.graphSource.LookupAlias(nodePub)
if err != nil {
peerAlias = fmt.Sprintf("unable to lookup "+
"peer alias: %v", err)
Expand Down Expand Up @@ -6071,7 +6071,7 @@ func (r *rpcServer) AddInvoice(ctx context.Context,
NodeSigner: r.server.nodeSigner,
DefaultCLTVExpiry: defaultDelta,
ChanDB: r.server.chanStateDB,
Graph: r.server.graphDB,
Graph: r.server.graphSource,
GenInvoiceFeatures: func() *lnwire.FeatureVector {
v := r.server.featureMgr.Get(feature.SetInvoice)

Expand Down Expand Up @@ -6502,7 +6502,7 @@ func (r *rpcServer) DescribeGraph(ctx context.Context,
// Obtain the pointer to the global singleton channel graph, this will
// provide a consistent view of the graph due to bolt db's
// transactional model.
graph := r.server.graphDB
graph := r.server.graphSource

// First iterate through all the known nodes (connected or unconnected
// within the graph), collating their current state into the RPC
Expand Down Expand Up @@ -6687,7 +6687,7 @@ func (r *rpcServer) GetNodeMetrics(ctx context.Context,
// Obtain the pointer to the global singleton channel graph, this will
// provide a consistent view of the graph due to bolt db's
// transactional model.
graph := r.server.graphDB
graph := r.server.graphSource

// Calculate betweenness centrality if requested. Note that depending on the
// graph size, this may take up to a few minutes.
Expand Down Expand Up @@ -6727,7 +6727,7 @@ func (r *rpcServer) GetNodeMetrics(ctx context.Context,
func (r *rpcServer) GetChanInfo(_ context.Context,
in *lnrpc.ChanInfoRequest) (*lnrpc.ChannelEdge, error) {

graph := r.server.graphDB
graph := r.server.graphSource

var (
edgeInfo *models.ChannelEdgeInfo
Expand Down Expand Up @@ -6771,7 +6771,7 @@ func (r *rpcServer) GetChanInfo(_ context.Context,
func (r *rpcServer) GetNodeInfo(ctx context.Context,
in *lnrpc.NodeInfoRequest) (*lnrpc.NodeInfo, error) {

graph := r.server.graphDB
graph := r.server.graphSource

// First, parse the hex-encoded public key into a full in-memory public
// key object we can work with for querying.
Expand Down Expand Up @@ -6882,7 +6882,7 @@ func (r *rpcServer) QueryRoutes(ctx context.Context,
func (r *rpcServer) GetNetworkInfo(ctx context.Context,
_ *lnrpc.NetworkInfoRequest) (*lnrpc.NetworkInfo, error) {

graph := r.server.graphDB
graph := r.server.graphSource

var (
numNodes uint32
Expand Down Expand Up @@ -7852,7 +7852,7 @@ func (r *rpcServer) ForwardingHistory(ctx context.Context,
return "", err
}

peer, err := r.server.graphDB.FetchLightningNode(nil, vertex)
peer, err := r.server.graphSource.FetchLightningNode(nil, vertex)
if err != nil {
return "", err
}
Expand Down
21 changes: 14 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,14 @@ type server struct {

fundingMgr *funding.Manager

// graphDB is the pointer to this node's local graph DB.
graphDB *graphdb.ChannelGraph

// graphSource can be used for any read only graph queries. This may be
// backed by the graphDB pointer above or replace a different graph
// source.
graphSource GraphSource

chanStateDB *channeldb.ChannelStateDB

addrSource channeldb.AddrSource
Expand Down Expand Up @@ -492,7 +498,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
chansToRestore walletunlocker.ChannelsToRecover,
chanPredicate chanacceptor.ChannelAcceptor,
torController *tor.Controller, tlsManager *TLSManager,
leaderElector cluster.LeaderElector,
leaderElector cluster.LeaderElector, graphSource GraphSource,
implCfg *ImplementationCfg) (*server, error) {

var (
Expand Down Expand Up @@ -590,12 +596,13 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
HtlcInterceptor: invoiceHtlcModifier,
}

addrSource := channeldb.NewMultiAddrSource(dbs.ChanStateDB, dbs.GraphDB)
addrSource := channeldb.NewMultiAddrSource(dbs.ChanStateDB, graphSource)

s := &server{
cfg: cfg,
implCfg: implCfg,
graphDB: dbs.GraphDB,
graphSource: graphSource,
chanStateDB: dbs.ChanStateDB.ChannelStateDB(),
addrSource: addrSource,
miscDB: dbs.ChanStateDB,
Expand Down Expand Up @@ -749,7 +756,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
IsChannelActive: s.htlcSwitch.HasActiveLink,
ApplyChannelUpdate: s.applyChannelUpdate,
DB: s.chanStateDB,
Graph: dbs.GraphDB,
Graph: graphSource,
}

chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
Expand Down Expand Up @@ -1003,7 +1010,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
}
paymentSessionSource := &routing.SessionSource{
GraphSessionFactory: graphsession.NewGraphSessionFactory(
dbs.GraphDB,
graphSource,
),
SourceNode: sourceNode,
MissionControl: s.defaultMC,
Expand Down Expand Up @@ -1037,7 +1044,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,

s.chanRouter, err = routing.New(routing.Config{
SelfNode: selfNode.PubKeyBytes,
RoutingGraph: graphsession.NewRoutingGraph(dbs.GraphDB),
RoutingGraph: graphsession.NewRoutingGraph(graphSource),
Chain: cc.ChainIO,
Payer: s.htlcSwitch,
Control: s.controlTower,
Expand Down Expand Up @@ -2779,7 +2786,7 @@ func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, e
// First, we'll create an instance of the ChannelGraphBootstrapper as
// this can be used by default if we've already partially seeded the
// network.
chanGraph := autopilot.ChannelGraphFromGraphSource(s.graphDB)
chanGraph := autopilot.ChannelGraphFromGraphSource(s.graphSource)
graphBootstrapper, err := discovery.NewGraphBootstrapper(chanGraph)
if err != nil {
return nil, err
Expand Down Expand Up @@ -4903,7 +4910,7 @@ func (s *server) fetchNodeAdvertisedAddrs(pub *btcec.PublicKey) ([]net.Addr, err
return nil, err
}

node, err := s.graphDB.FetchLightningNode(nil, vertex)
node, err := s.graphSource.FetchLightningNode(nil, vertex)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion subrpcserver_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config,
routerBackend *routerrpc.RouterBackend,
nodeSigner *netann.NodeSigner,
graphDB *graphdb.ChannelGraph,
graphSource GraphSource,
chanStateDB *channeldb.ChannelStateDB,
sweeper *sweep.UtxoSweeper,
tower *watchtower.Standalone,
Expand Down Expand Up @@ -263,7 +264,7 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config,
reflect.ValueOf(defaultDelta),
)
subCfgValue.FieldByName("GraphDB").Set(
reflect.ValueOf(graphDB),
reflect.ValueOf(graphSource),
)
subCfgValue.FieldByName("ChanStateDB").Set(
reflect.ValueOf(chanStateDB),
Expand Down

0 comments on commit 6f4f4e2

Please sign in to comment.