From 0c76e297ed85c318749b954c69a148be816bf6e5 Mon Sep 17 00:00:00 2001 From: ziggie Date: Fri, 24 May 2024 11:57:06 +0100 Subject: [PATCH] multi: make reassignment of edge atomic When the option SCID is used we need to make sure we update the channel data in an atomic way so that we don't miss any updates by our peer and remove all edge info which still uses the alias as the channel id (e.g. ChannelUpdate msgs). Moreover add unit test for the new ReAddEdge method. --- funding/manager.go | 119 +++++++++++++++++++++++++++++++--------- funding/manager_test.go | 39 ++++++++++++- graph/db/graph.go | 74 +++++++++++++++++++++++++ graph/db/graph_test.go | 82 +++++++++++++++++++++++++++ server.go | 54 +++++++++++++++--- 5 files changed, 331 insertions(+), 37 deletions(-) diff --git a/funding/manager.go b/funding/manager.go index 395cccb2a6..9373657c5d 100644 --- a/funding/manager.go +++ b/funding/manager.go @@ -530,9 +530,12 @@ type Config struct { // the initiator for channels of the anchor type. MaxAnchorsCommitFeeRate chainfee.SatPerKWeight - // DeleteAliasEdge allows the Manager to delete an alias channel edge - // from the graph. It also returns our local to-be-deleted policy. - DeleteAliasEdge func(scid lnwire.ShortChannelID) ( + // ReAssignSCID allows the Manager to assign a new SCID to an + // option-scid channel being part of the underlying graph. This is + // necessary because option-scid channels change their scid during their + // lifetime (public zeroconf channels for example) so we need to make + // sure to update the underlying graph. + ReAssignSCID func(aliasScID, newScID lnwire.ShortChannelID) ( *models.ChannelEdgePolicy, error) // AliasManager is an implementation of the aliasHandler interface that @@ -3719,19 +3722,29 @@ func (f *Manager) annAfterSixConfs(completeChan *channeldb.OpenChannel, "maps: %v", err) } - // We'll delete the edge and add it again via - // addToGraph. This is because the peer may have - // sent us a ChannelUpdate with an alias and we don't - // want to relay this. - ourPolicy, err := f.cfg.DeleteAliasEdge(baseScid) + // We reassign the same scid to the graph db. This will + // trigger a deletion of the current edge data and + // reinsert the channel with the same edge info and + // policy. This is done to guarantee that potential + // ChannelUpdates using the alias as the scid are + // removed and not relayed to the broader network + // because the alias is not a verifiable channel id. + ourPolicy, err := f.cfg.ReAssignSCID( + baseScid, baseScid, + ) if err != nil { - return fmt.Errorf("failed deleting real edge "+ - "for alias channel from graph: %v", - err) + return fmt.Errorf("unable to reassign alias "+ + "edge in graph: %w", err) } - err = f.addToGraph( - completeChan, &baseScid, nil, ourPolicy, + log.Infof("Successfully reassigned alias edge in "+ + "graph(non-zeroconf): %v(%d) -> %v(%d)", + baseScid, baseScid.ToUint64(), + baseScid, baseScid.ToUint64()) + + // We send the rassigned ChannelUpdate to the peer. + err = f.sendChanUpdate( + completeChan, &baseScid, ourPolicy, ) if err != nil { return fmt.Errorf("failed to re-add to "+ @@ -3808,23 +3821,33 @@ func (f *Manager) waitForZeroConfChannel(c *channeldb.OpenChannel) error { "six confirmations: %v", err) } - // TODO: Make this atomic! - ourPolicy, err := f.cfg.DeleteAliasEdge(c.ShortChanID()) + // The underlying graph entry for this channel id needs to be + // reassigned with the new confirmed scid. Moreover channel + // updates with the alias scid are removed so that we do not + // relay them to the broader network. + ourPolicy, err := f.cfg.ReAssignSCID( + c.ShortChanID(), confChan.shortChanID, + ) if err != nil { - return fmt.Errorf("unable to delete alias edge from "+ - "graph: %v", err) + return fmt.Errorf("unable to reassign alias edge in "+ + "graph: %w", err) } - // We'll need to update the graph with the new ShortChannelID - // via an addToGraph call. We don't pass in the peer's - // alias since we'll be using the confirmed SCID from now on - // regardless if it's public or not. - err = f.addToGraph( - c, &confChan.shortChanID, nil, ourPolicy, + aliasScid := c.ShortChanID() + confirmedScid := confChan.shortChanID + + log.Infof("Successfully reassigned alias edge in "+ + "graph(zeroconf): %v(%d) -> %v(%d)", + aliasScid, aliasScid.ToUint64(), + confirmedScid, confirmedScid.ToUint64()) + + // Send the ChannelUpdate with the confirmed scid to the peer. + err = f.sendChanUpdate( + c, &confChan.shortChanID, ourPolicy, ) if err != nil { - return fmt.Errorf("failed adding confirmed zero-conf "+ - "SCID to graph: %v", err) + return fmt.Errorf("failed to send ChannelUpdate to "+ + "gossiper: %v", err) } } @@ -4590,6 +4613,52 @@ func (f *Manager) announceChannel(localIDKey, remoteIDKey *btcec.PublicKey, return nil } +// sendChanUpdate sends a ChannelUpdate to the gossiper which is as a +// consequence sent to the peer. +// +// TODO(ziggie): Refactor the gossip msgs so that not always all msgs have +// to be created but only the ones which are needed. +func (f *Manager) sendChanUpdate(completeChan *channeldb.OpenChannel, + shortChanID *lnwire.ShortChannelID, + ourPolicy *models.ChannelEdgePolicy) error { + + chanID := lnwire.NewChanIDFromOutPoint(completeChan.FundingOutpoint) + + fwdMinHTLC, fwdMaxHTLC := f.extractAnnounceParams(completeChan) + + ann, err := f.newChanAnnouncement( + f.cfg.IDKey, completeChan.IdentityPub, + &completeChan.LocalChanCfg.MultiSigKey, + completeChan.RemoteChanCfg.MultiSigKey.PubKey, *shortChanID, + chanID, fwdMinHTLC, fwdMaxHTLC, ourPolicy, + completeChan.ChanType, + ) + if err != nil { + return fmt.Errorf("error generating channel "+ + "announcement: %v", err) + } + + errChan := f.cfg.SendAnnouncement(ann.chanUpdateAnn) + select { + case err := <-errChan: + if err != nil { + if graph.IsError(err, graph.ErrOutdated, + graph.ErrIgnored) { + + log.Debugf("Graph rejected "+ + "ChannelUpdate: %v", err) + } else { + return fmt.Errorf("error sending channel "+ + "update: %v", err) + } + } + case <-f.quit: + return ErrFundingManagerShuttingDown + } + + return nil +} + // InitFundingWorkflow sends a message to the funding manager instructing it // to initiate a single funder workflow with the source peer. func (f *Manager) InitFundingWorkflow(msg *InitFundingMsg) { diff --git a/funding/manager_test.go b/funding/manager_test.go index ab6bd48541..ea6326b4a8 100644 --- a/funding/manager_test.go +++ b/funding/manager_test.go @@ -550,7 +550,8 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, NotifyOpenChannelEvent: evt.NotifyOpenChannelEvent, OpenChannelPredicate: chainedAcceptor, NotifyPendingOpenChannelEvent: evt.NotifyPendingOpenChannelEvent, - DeleteAliasEdge: func(scid lnwire.ShortChannelID) ( + ReAssignSCID: func(aliasScID, + newScID lnwire.ShortChannelID) ( *models.ChannelEdgePolicy, error) { return nil, nil @@ -674,7 +675,7 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { ZombieSweeperInterval: oldCfg.ZombieSweeperInterval, ReservationTimeout: oldCfg.ReservationTimeout, OpenChannelPredicate: chainedAcceptor, - DeleteAliasEdge: oldCfg.DeleteAliasEdge, + ReAssignSCID: oldCfg.ReAssignSCID, AliasManager: oldCfg.AliasManager, AuxLeafStore: oldCfg.AuxLeafStore, AuxSigner: oldCfg.AuxSigner, @@ -1197,6 +1198,38 @@ func assertChannelAnnouncements(t *testing.T, alice, bob *testNode, } } +// assertChannelUpdate checks that a ChannelUpdate has been sent by the node +// with the expected parameters. +func assertChannelUpdates(t *testing.T, alice, bob *testNode, + capacity btcutil.Amount, customMinHtlc, customMaxHtlc, + baseFees, feeRates []lnwire.MilliSatoshi) { + + t.Helper() + + // Validate custom parameter arrays have expected length + validateCustomParams( + t, customMinHtlc, customMaxHtlc, baseFees, feeRates, + ) + + nodes := []*testNode{alice, bob} + for i, node := range nodes { + // Each node should send exactly 2 announcements + // ChannelAnnouncement and ChannelUpdate. + _, updates := collectGossipMsgs(t, node, 1) + verifyNoExtraMsgs(t, node) + + require.Len(t, updates, 1, "expected 1 ChannelUpdate from "+ + "node %d", i) + for _, update := range updates { + verifyChannelUpdate( + t, update, i, nodes, + node.fundingMgr.cfg, capacity, customMinHtlc, + customMaxHtlc, baseFees, feeRates, + ) + } + } +} + // validateCustomParams ensures custom parameter arrays have valid length. func validateCustomParams(t *testing.T, params ...[]lnwire.MilliSatoshi) { t.Helper() @@ -4620,7 +4653,7 @@ func testZeroConf(t *testing.T, chanType *lnwire.ChannelType) { // For taproot channels, we don't expect them to be announced atm. if !isTaprootChanType(chanType) { - assertChannelAnnouncements( + assertChannelUpdates( t, alice, bob, fundingAmt, nil, nil, nil, nil, ) } diff --git a/graph/db/graph.go b/graph/db/graph.go index 2a91398c62..3e94acb4cc 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -1178,6 +1178,80 @@ func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx, return chanIndex.Put(b.Bytes(), chanKey[:]) } +// ReAddChannelEdge removes the edge with the given channel ID from the +// database and adds the new edge to guarantee atomicity. +// This is important for option-scid-alias channels which may change its SCID +// over the course of its lifetime (e.g., public zero-conf channels). +func (c *ChannelGraph) ReAddChannelEdge( + chanID uint64, newEdge *models.ChannelEdgeInfo, + ourPolicy *models.ChannelEdgePolicy) error { + + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + + err := kvdb.Update(c.db, func(tx kvdb.RwTx) error { + edges := tx.ReadWriteBucket(edgeBucket) + if edges == nil { + return ErrEdgeNotFound + } + edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket) + if edgeIndex == nil { + return ErrEdgeNotFound + } + chanIndex := edges.NestedReadWriteBucket(channelPointBucket) + if chanIndex == nil { + return ErrEdgeNotFound + } + nodes := tx.ReadWriteBucket(nodeBucket) + if nodes == nil { + return ErrGraphNodeNotFound + } + + var rawChanID [8]byte + byteOrder.PutUint64(rawChanID[:], chanID) + + // We don't mark this channel as zombie, because we are readding + // it immediately after deleting it below. This method also + // deletes the channel from the graph cache, however there is + // no entry in the cache because we only add it if we have + // received the proof(signatures). + err := c.delChannelEdgeUnsafe( + edges, edgeIndex, chanIndex, nil, + rawChanID[:], false, false, + ) + if err != nil { + return err + } + + // Now we add the channel with the new edge info. + err = c.addChannelEdge(tx, newEdge) + if err != nil { + return err + } + + // Also add the new channel update from our side. + _, err = updateEdgePolicy(tx, ourPolicy, c.graphCache) + + return err + }, func() {}) + if err != nil { + return err + } + + // Remove the Cache entries. + c.rejectCache.remove(chanID) + c.chanCache.remove(chanID) + + // We also make sure we clear the reject cache because we might have + // received a channel update msg with the new SCID from our peer which + // would have been put in the reject cache because the channel was not + // part of the graph. + c.rejectCache.remove(newEdge.ChannelID) + c.chanCache.remove(newEdge.ChannelID) + + return nil +} + // HasChannelEdge returns true if the database knows of a channel edge with the // passed channel ID, and false otherwise. If an edge with that ID is found // within the graph, then two time stamps representing the last time the edge diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 8a02f24ff4..62e45df923 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -4063,3 +4063,85 @@ func TestClosedScid(t *testing.T) { require.Nil(t, err) require.True(t, exists) } + +// TestUpdateAliasChannel tests that the underlying graph channel information +// for an alias channel is deleted and readded under a different short channel +// id. +func TestUpdateAliasChannel(t *testing.T) { + t.Parallel() + + graph, err := MakeTestGraph(t) + require.NoError(t, err, "unable to make test database") + + // Create first node and add it to the graph. + node1, err := createTestVertex(graph.db) + require.NoError(t, err, "unable to create test node") + err = graph.AddLightningNode(node1) + require.NoError(t, err) + + // Create second node and add it to the graph. + node2, err := createTestVertex(graph.db) + require.NoError(t, err, "unable to create test node") + err = graph.AddLightningNode(node2) + require.NoError(t, err) + + // Adding a new channel edge to the graph. + edgeInfo, edge1, edge2 := createChannelEdge( + graph.db, node1, node2, + ) + if err := graph.AddChannelEdge(edgeInfo); err != nil { + t.Fatalf("unable to create channel edge: %v", err) + } + + // Add both edge policies. + err = graph.UpdateEdgePolicy(edge1) + require.NoError(t, err) + + err = graph.UpdateEdgePolicy(edge2) + require.NoError(t, err) + + oldSCID := edgeInfo.ChannelID + + // Define the new channel id (under real conditions this is the new + // confirmed channel id. For non-zeroconf alias channels this is the + // same as the old SCID). but we for this test require a different SCID + // and therefore we add 1 to the old SCID. + newChanSCID := oldSCID + 1 + + // Readd the channel edgeInfo under a different scid. + newEdgeInfo := new(models.ChannelEdgeInfo) + *newEdgeInfo = *edgeInfo + newEdgeInfo.ChannelID = newChanSCID + + // Create a new edge policy with the new channel id. + newEdge1 := new(models.ChannelEdgePolicy) + *newEdge1 = *edge1 + newEdge1.ChannelID = newChanSCID + + // Re-add the channel edge to the graph which deletes the old data and + // inserts the new edge data (edge info and edge policy). + err = graph.ReAddChannelEdge(oldSCID, newEdgeInfo, newEdge1) + require.NoError(t, err) + + // The old channel data should not exist anymore. + _, _, _, err = graph.FetchChannelEdgesByID(oldSCID) + require.ErrorIs(t, err, ErrEdgeNotFound, "channel should not exist") + + // Fetch the new channel data. + newEdgeInfo, newEdge1, newEdge2, err := graph.FetchChannelEdgesByID( + newChanSCID, + ) + require.NoError(t, err, "unable to fetch channel by ID") + + // Edge 1 should be different and should have another channel id. + err = compareEdgePolicies(newEdge1, edge1) + require.ErrorContains(t, err, "ChannelID doesn't match") + + // Edge 2 should be nil, because we deleted the former peer data. + require.Nil(t, newEdge2) + + // Because we did not change the signatures the edge info should be + // the same as soon as we swap in the old channel id. + newEdgeInfo.ChannelID = oldSCID + assertEdgeInfoEqual(t, newEdgeInfo, edgeInfo) +} diff --git a/server.go b/server.go index 799bfde6b8..8961b4bd51 100644 --- a/server.go +++ b/server.go @@ -1380,13 +1380,13 @@ func newServer(cfg *Config, listenAddrs []net.Addr, return nil, err } - // Wrap the DeleteChannelEdges method so that the funding manager can + // Wrap the `ReAddChannelEdge` method so that the funding manager can // use it without depending on several layers of indirection. - deleteAliasEdge := func(scid lnwire.ShortChannelID) ( + reAssignSCID := func(aliasScID, newScID lnwire.ShortChannelID) ( *models.ChannelEdgePolicy, error) { info, e1, e2, err := s.graphDB.FetchChannelEdgesByID( - scid.ToUint64(), + aliasScID.ToUint64(), ) if errors.Is(err, graphdb.ErrEdgeNotFound) { // This is unlikely but there is a slim chance of this @@ -1398,7 +1398,13 @@ func newServer(cfg *Config, listenAddrs []net.Addr, return nil, err } - // Grab our key to find our policy. + // We create a new ChannelEdgeInfo with the new SCID. + newEdgeInfo := new(models.ChannelEdgeInfo) + *newEdgeInfo = *info + newEdgeInfo.ChannelID = newScID.ToUint64() + + // We also readd the channel policy from our side with the new + // short channel id so we grab our key to find our policy. var ourKey [33]byte copy(ourKey[:], nodeKeyDesc.PubKey.SerializeCompressed()) @@ -1410,13 +1416,43 @@ func newServer(cfg *Config, listenAddrs []net.Addr, } if ourPolicy == nil { - // Something is wrong, so return an error. - return nil, fmt.Errorf("we don't have an edge") + // We should always have our policy available. If that + // is not the case there might be an error in the + // ChannelUpdate msg logic so we return early. + return nil, fmt.Errorf("edge policy not found") } - err = s.graphDB.DeleteChannelEdges( - false, false, scid.ToUint64(), + // Update the policy data, this invalidates the signature + // therefore we need to resign the data. + ourPolicy.ChannelID = newEdgeInfo.ChannelID + chanUpdate := netann.UnsignedChannelUpdateFromEdge( + newEdgeInfo, ourPolicy, ) + + data, err := chanUpdate.DataToSign() + if err != nil { + return nil, err + } + + nodeSig, err := cc.MsgSigner.SignMessage( + nodeKeyDesc.KeyLocator, data, true, + ) + if err != nil { + return nil, err + } + + sig, err := lnwire.NewSigFromSignature(nodeSig) + if err != nil { + return nil, err + } + ourPolicy.SetSigBytes(sig.ToSignatureBytes()) + + // Delete the old edge information under the alias SCID and add + // the updated data with the new SCID. + err = s.graphDB.ReAddChannelEdge( + aliasScID.ToUint64(), newEdgeInfo, ourPolicy, + ) + return ourPolicy, err } @@ -1612,7 +1648,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, EnableUpfrontShutdown: cfg.EnableUpfrontShutdown, MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte( s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(), - DeleteAliasEdge: deleteAliasEdge, + ReAssignSCID: reAssignSCID, AliasManager: s.aliasMgr, IsSweeperOutpoint: s.sweeper.IsSweeperOutpoint, AuxFundingController: implCfg.AuxFundingController,