Skip to content

Commit

Permalink
multi: make reassignment of edge atomic
Browse files Browse the repository at this point in the history
When the option SCID is used we need to make sure we update the
channel data in an atomic way so that we don't miss any updates
by our peer and remove all edge info which still uses the alias
as the channel id (e.g. ChannelUpdate msgs).
Moreover add unit test for the new  ReAddEdge method.
  • Loading branch information
ziggie1984 committed Dec 20, 2024
1 parent 01c09ec commit 0c76e29
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 37 deletions.
119 changes: 94 additions & 25 deletions funding/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,9 +530,12 @@ type Config struct {
// the initiator for channels of the anchor type.
MaxAnchorsCommitFeeRate chainfee.SatPerKWeight

// DeleteAliasEdge allows the Manager to delete an alias channel edge
// from the graph. It also returns our local to-be-deleted policy.
DeleteAliasEdge func(scid lnwire.ShortChannelID) (
// ReAssignSCID allows the Manager to assign a new SCID to an
// option-scid channel being part of the underlying graph. This is
// necessary because option-scid channels change their scid during their
// lifetime (public zeroconf channels for example) so we need to make
// sure to update the underlying graph.
ReAssignSCID func(aliasScID, newScID lnwire.ShortChannelID) (
*models.ChannelEdgePolicy, error)

// AliasManager is an implementation of the aliasHandler interface that
Expand Down Expand Up @@ -3719,19 +3722,29 @@ func (f *Manager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
"maps: %v", err)
}

// We'll delete the edge and add it again via
// addToGraph. This is because the peer may have
// sent us a ChannelUpdate with an alias and we don't
// want to relay this.
ourPolicy, err := f.cfg.DeleteAliasEdge(baseScid)
// We reassign the same scid to the graph db. This will
// trigger a deletion of the current edge data and
// reinsert the channel with the same edge info and
// policy. This is done to guarantee that potential
// ChannelUpdates using the alias as the scid are
// removed and not relayed to the broader network
// because the alias is not a verifiable channel id.
ourPolicy, err := f.cfg.ReAssignSCID(
baseScid, baseScid,
)
if err != nil {
return fmt.Errorf("failed deleting real edge "+
"for alias channel from graph: %v",
err)
return fmt.Errorf("unable to reassign alias "+
"edge in graph: %w", err)
}

err = f.addToGraph(
completeChan, &baseScid, nil, ourPolicy,
log.Infof("Successfully reassigned alias edge in "+
"graph(non-zeroconf): %v(%d) -> %v(%d)",
baseScid, baseScid.ToUint64(),
baseScid, baseScid.ToUint64())

// We send the rassigned ChannelUpdate to the peer.
err = f.sendChanUpdate(
completeChan, &baseScid, ourPolicy,
)
if err != nil {
return fmt.Errorf("failed to re-add to "+
Expand Down Expand Up @@ -3808,23 +3821,33 @@ func (f *Manager) waitForZeroConfChannel(c *channeldb.OpenChannel) error {
"six confirmations: %v", err)
}

// TODO: Make this atomic!
ourPolicy, err := f.cfg.DeleteAliasEdge(c.ShortChanID())
// The underlying graph entry for this channel id needs to be
// reassigned with the new confirmed scid. Moreover channel
// updates with the alias scid are removed so that we do not
// relay them to the broader network.
ourPolicy, err := f.cfg.ReAssignSCID(
c.ShortChanID(), confChan.shortChanID,
)
if err != nil {
return fmt.Errorf("unable to delete alias edge from "+
"graph: %v", err)
return fmt.Errorf("unable to reassign alias edge in "+
"graph: %w", err)
}

// We'll need to update the graph with the new ShortChannelID
// via an addToGraph call. We don't pass in the peer's
// alias since we'll be using the confirmed SCID from now on
// regardless if it's public or not.
err = f.addToGraph(
c, &confChan.shortChanID, nil, ourPolicy,
aliasScid := c.ShortChanID()
confirmedScid := confChan.shortChanID

log.Infof("Successfully reassigned alias edge in "+
"graph(zeroconf): %v(%d) -> %v(%d)",
aliasScid, aliasScid.ToUint64(),
confirmedScid, confirmedScid.ToUint64())

// Send the ChannelUpdate with the confirmed scid to the peer.
err = f.sendChanUpdate(
c, &confChan.shortChanID, ourPolicy,
)
if err != nil {
return fmt.Errorf("failed adding confirmed zero-conf "+
"SCID to graph: %v", err)
return fmt.Errorf("failed to send ChannelUpdate to "+
"gossiper: %v", err)
}
}

Expand Down Expand Up @@ -4590,6 +4613,52 @@ func (f *Manager) announceChannel(localIDKey, remoteIDKey *btcec.PublicKey,
return nil
}

// sendChanUpdate sends a ChannelUpdate to the gossiper which is as a
// consequence sent to the peer.
//
// TODO(ziggie): Refactor the gossip msgs so that not always all msgs have
// to be created but only the ones which are needed.
func (f *Manager) sendChanUpdate(completeChan *channeldb.OpenChannel,
shortChanID *lnwire.ShortChannelID,
ourPolicy *models.ChannelEdgePolicy) error {

chanID := lnwire.NewChanIDFromOutPoint(completeChan.FundingOutpoint)

fwdMinHTLC, fwdMaxHTLC := f.extractAnnounceParams(completeChan)

ann, err := f.newChanAnnouncement(
f.cfg.IDKey, completeChan.IdentityPub,
&completeChan.LocalChanCfg.MultiSigKey,
completeChan.RemoteChanCfg.MultiSigKey.PubKey, *shortChanID,
chanID, fwdMinHTLC, fwdMaxHTLC, ourPolicy,
completeChan.ChanType,
)
if err != nil {
return fmt.Errorf("error generating channel "+
"announcement: %v", err)
}

errChan := f.cfg.SendAnnouncement(ann.chanUpdateAnn)
select {
case err := <-errChan:
if err != nil {
if graph.IsError(err, graph.ErrOutdated,
graph.ErrIgnored) {

log.Debugf("Graph rejected "+
"ChannelUpdate: %v", err)
} else {
return fmt.Errorf("error sending channel "+
"update: %v", err)
}
}
case <-f.quit:
return ErrFundingManagerShuttingDown
}

return nil
}

// InitFundingWorkflow sends a message to the funding manager instructing it
// to initiate a single funder workflow with the source peer.
func (f *Manager) InitFundingWorkflow(msg *InitFundingMsg) {
Expand Down
39 changes: 36 additions & 3 deletions funding/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,8 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
NotifyOpenChannelEvent: evt.NotifyOpenChannelEvent,
OpenChannelPredicate: chainedAcceptor,
NotifyPendingOpenChannelEvent: evt.NotifyPendingOpenChannelEvent,
DeleteAliasEdge: func(scid lnwire.ShortChannelID) (
ReAssignSCID: func(aliasScID,
newScID lnwire.ShortChannelID) (
*models.ChannelEdgePolicy, error) {

return nil, nil
Expand Down Expand Up @@ -674,7 +675,7 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
ZombieSweeperInterval: oldCfg.ZombieSweeperInterval,
ReservationTimeout: oldCfg.ReservationTimeout,
OpenChannelPredicate: chainedAcceptor,
DeleteAliasEdge: oldCfg.DeleteAliasEdge,
ReAssignSCID: oldCfg.ReAssignSCID,
AliasManager: oldCfg.AliasManager,
AuxLeafStore: oldCfg.AuxLeafStore,
AuxSigner: oldCfg.AuxSigner,
Expand Down Expand Up @@ -1197,6 +1198,38 @@ func assertChannelAnnouncements(t *testing.T, alice, bob *testNode,
}
}

// assertChannelUpdate checks that a ChannelUpdate has been sent by the node
// with the expected parameters.
func assertChannelUpdates(t *testing.T, alice, bob *testNode,
capacity btcutil.Amount, customMinHtlc, customMaxHtlc,
baseFees, feeRates []lnwire.MilliSatoshi) {

t.Helper()

// Validate custom parameter arrays have expected length
validateCustomParams(
t, customMinHtlc, customMaxHtlc, baseFees, feeRates,
)

nodes := []*testNode{alice, bob}
for i, node := range nodes {
// Each node should send exactly 2 announcements
// ChannelAnnouncement and ChannelUpdate.
_, updates := collectGossipMsgs(t, node, 1)
verifyNoExtraMsgs(t, node)

require.Len(t, updates, 1, "expected 1 ChannelUpdate from "+
"node %d", i)
for _, update := range updates {
verifyChannelUpdate(
t, update, i, nodes,
node.fundingMgr.cfg, capacity, customMinHtlc,
customMaxHtlc, baseFees, feeRates,
)
}
}
}

// validateCustomParams ensures custom parameter arrays have valid length.
func validateCustomParams(t *testing.T, params ...[]lnwire.MilliSatoshi) {
t.Helper()
Expand Down Expand Up @@ -4620,7 +4653,7 @@ func testZeroConf(t *testing.T, chanType *lnwire.ChannelType) {

// For taproot channels, we don't expect them to be announced atm.
if !isTaprootChanType(chanType) {
assertChannelAnnouncements(
assertChannelUpdates(
t, alice, bob, fundingAmt, nil, nil, nil, nil,
)
}
Expand Down
74 changes: 74 additions & 0 deletions graph/db/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,80 @@ func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx,
return chanIndex.Put(b.Bytes(), chanKey[:])
}

// ReAddChannelEdge removes the edge with the given channel ID from the
// database and adds the new edge to guarantee atomicity.
// This is important for option-scid-alias channels which may change its SCID
// over the course of its lifetime (e.g., public zero-conf channels).
func (c *ChannelGraph) ReAddChannelEdge(
chanID uint64, newEdge *models.ChannelEdgeInfo,
ourPolicy *models.ChannelEdgePolicy) error {

c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
edges := tx.ReadWriteBucket(edgeBucket)
if edges == nil {
return ErrEdgeNotFound
}
edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrEdgeNotFound
}
chanIndex := edges.NestedReadWriteBucket(channelPointBucket)
if chanIndex == nil {
return ErrEdgeNotFound
}
nodes := tx.ReadWriteBucket(nodeBucket)
if nodes == nil {
return ErrGraphNodeNotFound
}

var rawChanID [8]byte
byteOrder.PutUint64(rawChanID[:], chanID)

// We don't mark this channel as zombie, because we are readding
// it immediately after deleting it below. This method also
// deletes the channel from the graph cache, however there is
// no entry in the cache because we only add it if we have
// received the proof(signatures).
err := c.delChannelEdgeUnsafe(
edges, edgeIndex, chanIndex, nil,
rawChanID[:], false, false,
)
if err != nil {
return err
}

// Now we add the channel with the new edge info.
err = c.addChannelEdge(tx, newEdge)
if err != nil {
return err
}

// Also add the new channel update from our side.
_, err = updateEdgePolicy(tx, ourPolicy, c.graphCache)

return err
}, func() {})
if err != nil {
return err
}

// Remove the Cache entries.
c.rejectCache.remove(chanID)
c.chanCache.remove(chanID)

// We also make sure we clear the reject cache because we might have
// received a channel update msg with the new SCID from our peer which
// would have been put in the reject cache because the channel was not
// part of the graph.
c.rejectCache.remove(newEdge.ChannelID)
c.chanCache.remove(newEdge.ChannelID)

return nil
}

// HasChannelEdge returns true if the database knows of a channel edge with the
// passed channel ID, and false otherwise. If an edge with that ID is found
// within the graph, then two time stamps representing the last time the edge
Expand Down
82 changes: 82 additions & 0 deletions graph/db/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4063,3 +4063,85 @@ func TestClosedScid(t *testing.T) {
require.Nil(t, err)
require.True(t, exists)
}

// TestUpdateAliasChannel tests that the underlying graph channel information
// for an alias channel is deleted and readded under a different short channel
// id.
func TestUpdateAliasChannel(t *testing.T) {
t.Parallel()

graph, err := MakeTestGraph(t)
require.NoError(t, err, "unable to make test database")

// Create first node and add it to the graph.
node1, err := createTestVertex(graph.db)
require.NoError(t, err, "unable to create test node")
err = graph.AddLightningNode(node1)
require.NoError(t, err)

// Create second node and add it to the graph.
node2, err := createTestVertex(graph.db)
require.NoError(t, err, "unable to create test node")
err = graph.AddLightningNode(node2)
require.NoError(t, err)

// Adding a new channel edge to the graph.
edgeInfo, edge1, edge2 := createChannelEdge(
graph.db, node1, node2,
)
if err := graph.AddChannelEdge(edgeInfo); err != nil {
t.Fatalf("unable to create channel edge: %v", err)
}

// Add both edge policies.
err = graph.UpdateEdgePolicy(edge1)
require.NoError(t, err)

err = graph.UpdateEdgePolicy(edge2)
require.NoError(t, err)

oldSCID := edgeInfo.ChannelID

// Define the new channel id (under real conditions this is the new
// confirmed channel id. For non-zeroconf alias channels this is the
// same as the old SCID). but we for this test require a different SCID
// and therefore we add 1 to the old SCID.
newChanSCID := oldSCID + 1

// Readd the channel edgeInfo under a different scid.
newEdgeInfo := new(models.ChannelEdgeInfo)
*newEdgeInfo = *edgeInfo
newEdgeInfo.ChannelID = newChanSCID

// Create a new edge policy with the new channel id.
newEdge1 := new(models.ChannelEdgePolicy)
*newEdge1 = *edge1
newEdge1.ChannelID = newChanSCID

// Re-add the channel edge to the graph which deletes the old data and
// inserts the new edge data (edge info and edge policy).
err = graph.ReAddChannelEdge(oldSCID, newEdgeInfo, newEdge1)
require.NoError(t, err)

// The old channel data should not exist anymore.
_, _, _, err = graph.FetchChannelEdgesByID(oldSCID)
require.ErrorIs(t, err, ErrEdgeNotFound, "channel should not exist")

// Fetch the new channel data.
newEdgeInfo, newEdge1, newEdge2, err := graph.FetchChannelEdgesByID(
newChanSCID,
)
require.NoError(t, err, "unable to fetch channel by ID")

// Edge 1 should be different and should have another channel id.
err = compareEdgePolicies(newEdge1, edge1)
require.ErrorContains(t, err, "ChannelID doesn't match")

// Edge 2 should be nil, because we deleted the former peer data.
require.Nil(t, newEdge2)

// Because we did not change the signatures the edge info should be
// the same as soon as we swap in the old channel id.
newEdgeInfo.ChannelID = oldSCID
assertEdgeInfoEqual(t, newEdgeInfo, edgeInfo)
}
Loading

0 comments on commit 0c76e29

Please sign in to comment.