Skip to content

Commit

Permalink
channeldb: ChannelEdgePolicy2 encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
ellemouton committed Oct 20, 2023
1 parent 7adbe6a commit 2b691cd
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 27 deletions.
236 changes: 211 additions & 25 deletions channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ var (
// maps: updateTime || chanID -> nil
edgeUpdateIndexBucket = []byte("edge-update-index")

// maps: blockHeight || chanID -> nil
edgeUpdateIndex2Bucket = []byte("edge-update-index-2")

// channelPointBucket maps a channel's full outpoint (txid:index) to
// its short 8-byte channel ID. This bucket resides within the
// edgeBucket above, and can be used to quickly remove an edge due to
Expand Down Expand Up @@ -323,11 +326,17 @@ func (c *ChannelGraph) getChannelMap(edges kvdb.RBucket) (
return err
}

e, ok := edge.(*ChannelEdgePolicy1)
if !ok {
return fmt.Errorf("expected *ChannelEdgePolicy1, "+
"got: %T", edge)
}

var pub [33]byte
copy(pub[:], pubKey)

channelMap[key] = &ChannelEdgePolicyWithNode{
ChannelEdgePolicy1: *edge,
ChannelEdgePolicy1: *e,
Node: &LightningNode{
PubKeyBytes: pub,
},
Expand Down Expand Up @@ -3617,6 +3626,13 @@ type ChannelEdgePolicyWithNode struct {
Node *LightningNode
}

type edgePolicyEncodingInfo interface {
updateBucketKey() []byte
updateKey() []byte
serialize(w io.Writer, toNode []byte) error
typeByte() (edgePolicyEncodingType, bool)
}

// ChannelEdgePolicy1 represents a *directed* edge within the channel graph. For
// each channel in the database, there are two distinct edges: one for each
// possible direction of travel along the channel. The edges themselves hold
Expand Down Expand Up @@ -3679,7 +3695,37 @@ type ChannelEdgePolicy1 struct {
ExtraOpaqueData []byte
}

func (c *ChannelEdgePolicy1) typeByte() (edgePolicyEncodingType, bool) {
return 0, false
}

func (c *ChannelEdgePolicy1) IsNode1() bool {
return c.ChannelFlags&lnwire.ChanUpdateDirection == 0
}

func (c *ChannelEdgePolicy1) SCID() lnwire.ShortChannelID {
return lnwire.NewShortChanIDFromInt(c.ChannelID)
}

func (c *ChannelEdgePolicy1) serialize(w io.Writer, toNode []byte) error {
return serializeChanEdgePolicy1(w, c, toNode)
}

func (c *ChannelEdgePolicy1) updateBucketKey() []byte {
return edgeUpdateIndexBucket
}

func (c *ChannelEdgePolicy1) updateKey() []byte {
updateUnix := uint64(c.LastUpdate.Unix())
var indexKey [8 + 8]byte
byteOrder.PutUint64(indexKey[:8], updateUnix)
byteOrder.PutUint64(indexKey[8:], c.ChannelID)

return indexKey[:]
}

var _ models.ChannelEdgePolicy = (*ChannelEdgePolicy1)(nil)
var _ edgePolicyEncodingInfo = (*ChannelEdgePolicy1)(nil)

// Signature is a channel announcement signature, which is needed for proper
// edge policy announcement.
Expand Down Expand Up @@ -3738,6 +3784,105 @@ func (c *ChannelEdgePolicy1) ComputeFeeFromIncoming(
)
}

// edgePolicyEncoding indicates how the bytes for a channel edge policy have
// been serialised.
type edgePolicyEncodingType uint8

const (
// edgePolicy2EncodingType will be used as a prefix for edge policies
// advertised using the ChannelUpdate2 message. The type indicates how
// the bytes following should be deserialized.
edgePolicy2EncodingType edgePolicyEncodingType = 0
)

type ChannelEdgePolicy2 struct {
lnwire.ChannelUpdate2
}

func (c *ChannelEdgePolicy2) typeByte() (edgePolicyEncodingType, bool) {
return edgePolicy2EncodingType, true
}

func (c *ChannelEdgePolicy2) updateBucketKey() []byte {
return edgeUpdateIndex2Bucket
}

func (c *ChannelEdgePolicy2) updateKey() []byte {
indexKey := make([]byte, 4+8)
byteOrder.PutUint32(indexKey[:4], c.BlockHeight)
byteOrder.PutUint64(indexKey[8:], c.ShortChannelID.ToUint64())

return indexKey
}

const (
EdgePolicy2MsgType = tlv.Type(0)
EdgePolicy2ToNode = tlv.Type(1)
)

func (c *ChannelEdgePolicy2) serialize(w io.Writer, toNode []byte) error {
if len(c.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
return ErrTooManyExtraOpaqueBytes(len(c.ExtraOpaqueData))
}

var b bytes.Buffer
if err := c.Encode(&b, 0); err != nil {
return err
}

msg := b.Bytes()

records := []tlv.Record{
tlv.MakePrimitiveRecord(EdgePolicy2MsgType, &msg),
tlv.MakePrimitiveRecord(EdgePolicy2ToNode, &toNode),
}

stream, err := tlv.NewStream(records...)
if err != nil {
return err
}

return stream.Encode(w)
}

func deserializeChanEdgePolicy2Raw(r io.Reader) (*ChannelEdgePolicy2, []byte,
error) {

var (
msgBytes []byte
toNode []byte
)

records := []tlv.Record{
tlv.MakePrimitiveRecord(EdgePolicy2MsgType, &msgBytes),
tlv.MakePrimitiveRecord(EdgePolicy2ToNode, &toNode),
}

stream, err := tlv.NewStream(records...)
if err != nil {
return nil, nil, err
}

err = stream.Decode(r)
if err != nil {
return nil, nil, err
}

var (
policy ChannelEdgePolicy2
reader = bytes.NewReader(msgBytes)
)
err = policy.Decode(reader, 0)
if err != nil {
return nil, nil, err
}

return &policy, toNode, nil
}

var _ models.ChannelEdgePolicy = (*ChannelEdgePolicy2)(nil)
var _ edgePolicyEncodingInfo = (*ChannelEdgePolicy2)(nil)

func EdgePolicyFromUpdate(update lnwire.ChannelUpdate) (
*ChannelEdgePolicyWithNode, error) {

Expand Down Expand Up @@ -4789,32 +4934,46 @@ func deserializeChanEdgeInfo1(r io.Reader) (*ChannelEdgeInfo1, error) {
return &edgeInfo, nil
}

func putChanEdgePolicy(edges kvdb.RwBucket, edgePolicy models.ChannelEdgePolicy,
func putChanEdgePolicy(edges kvdb.RwBucket, edge models.ChannelEdgePolicy,
from, to []byte) error {

edge, ok := edgePolicy.(*ChannelEdgePolicy1)
encodingInfo, ok := edge.(edgePolicyEncodingInfo)
if !ok {
return fmt.Errorf("unhandled implementation of "+
"ChannelEdgePolicy: %T", edgePolicy)
return fmt.Errorf("type %T does not implement "+
"edgePolicyEncodingInfo", edge)
}

chanID := edge.SCID().ToUint64()

var edgeKey [33 + 8]byte
copy(edgeKey[:], from)
byteOrder.PutUint64(edgeKey[33:], edge.ChannelID)
byteOrder.PutUint64(edgeKey[33:], chanID)

var b bytes.Buffer
if err := serializeChanEdgePolicy(&b, edge, to); err != nil {

if typeByte, ok := encodingInfo.typeByte(); ok {
_, err := b.Write([]byte{chanEdgePolicyNewEncodingPrefix})
if err != nil {
return err
}

_, err = b.Write([]byte{byte(typeByte)})
if err != nil {
return err
}
}

if err := encodingInfo.serialize(&b, to); err != nil {
return err
}

// Before we write out the new edge, we'll create a new entry in the
// update index in order to keep it fresh.
updateUnix := uint64(edge.LastUpdate.Unix())
var indexKey [8 + 8]byte
byteOrder.PutUint64(indexKey[:8], updateUnix)
byteOrder.PutUint64(indexKey[8:], edge.ChannelID)
indexKey := encodingInfo.updateKey()

updateIndex, err := edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
updateIndex, err := edges.CreateBucketIfNotExists(
encodingInfo.updateBucketKey(),
)
if err != nil {
return err
}
Expand Down Expand Up @@ -4842,11 +5001,13 @@ func putChanEdgePolicy(edges kvdb.RwBucket, edgePolicy models.ChannelEdgePolicy,
return err
}

oldUpdateTime := uint64(oldEdgePolicy.LastUpdate.Unix())
oldPolicy, ok := oldEdgePolicy.(edgePolicyEncodingInfo)
if !ok {
return fmt.Errorf("type %T does not implement "+
"edgePolicyEncodingInfo", oldEdgePolicy)
}

var oldIndexKey [8 + 8]byte
byteOrder.PutUint64(oldIndexKey[:8], oldUpdateTime)
byteOrder.PutUint64(oldIndexKey[8:], edge.ChannelID)
oldIndexKey := oldPolicy.updateKey()

if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
return err
Expand All @@ -4857,11 +5018,12 @@ func putChanEdgePolicy(edges kvdb.RwBucket, edgePolicy models.ChannelEdgePolicy,
return err
}

updateEdgePolicyDisabledIndex(
edges, edge.ChannelID,
edge.ChannelFlags&lnwire.ChanUpdateDirection > 0,
edge.IsDisabled(),
err = updateEdgePolicyDisabledIndex(
edges, chanID, !edge.IsNode1(), edge.IsDisabled(),
)
if err != nil {
return err
}

return edges.Put(edgeKey[:], b.Bytes()[:])
}
Expand Down Expand Up @@ -4983,7 +5145,7 @@ func fetchChanEdgePolicies(edgeIndex kvdb.RBucket, edges kvdb.RBucket,
return edge1, edge2, nil
}

func serializeChanEdgePolicy(w io.Writer, edge *ChannelEdgePolicy1,
func serializeChanEdgePolicy1(w io.Writer, edge *ChannelEdgePolicy1,
to []byte) error {

err := wire.WriteVarBytes(w, 0, edge.SigBytes)
Expand Down Expand Up @@ -5069,15 +5231,21 @@ func deserializeChanEdgePolicy(r io.Reader,
pubKeyBytes, err)
}

e, ok := edge.(*ChannelEdgePolicy1)
if !ok {
return nil, fmt.Errorf("expected type ChannelEdgePolicy1, "+
"got: %T", edge)
}

policy := ChannelEdgePolicyWithNode{
ChannelEdgePolicy1: *edge,
ChannelEdgePolicy1: *e,
Node: &node,
}

return &policy, deserializeErr
}

func deserializeChanEdgePolicyRaw(reader io.Reader) (*ChannelEdgePolicy1,
func deserializeChanEdgePolicyRaw(reader io.Reader) (models.ChannelEdgePolicy,
[]byte, error) {

// Wrap the io.Reader in a bufio.Reader so that we can peak the first
Expand All @@ -5093,8 +5261,26 @@ func deserializeChanEdgePolicyRaw(reader io.Reader) (*ChannelEdgePolicy1,
return deserializeChanEdgePolicy1Raw(r)
}

return nil, nil, fmt.Errorf("unknown channel edge policy encoding "+
"type: %x", firstByte[0])
// Pop the encoding type byte.
var scratch [1]byte
if _, err = r.Read(scratch[:]); err != nil {
return nil, nil, err
}

// Now, read the encoding type byte.
if _, err = r.Read(scratch[:]); err != nil {
return nil, nil, err
}

encoding := edgePolicyEncodingType(scratch[0])
switch encoding {
case edgePolicy2EncodingType:
return deserializeChanEdgePolicy2Raw(r)

default:
return nil, nil, fmt.Errorf("unknown edge policy encoding "+
"type: %d", encoding)
}
}

func deserializeChanEdgePolicy1Raw(r io.Reader) (*ChannelEdgePolicy1, []byte,
Expand Down
4 changes: 2 additions & 2 deletions channeldb/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3024,7 +3024,7 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) {
edge1.ExtraOpaqueData = nil

var b bytes.Buffer
err = serializeChanEdgePolicy(&b, &edge1.ChannelEdgePolicy1, to)
err = serializeChanEdgePolicy1(&b, &edge1.ChannelEdgePolicy1, to)
if err != nil {
t.Fatalf("unable to serialize policy")
}
Expand All @@ -3034,7 +3034,7 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) {
edge1.MessageFlags = lnwire.ChanUpdateRequiredMaxHtlc
edge1.MaxHTLC = 13928598
var b2 bytes.Buffer
err = serializeChanEdgePolicy(&b2, &edge1.ChannelEdgePolicy1, to)
err = serializeChanEdgePolicy1(&b2, &edge1.ChannelEdgePolicy1, to)
if err != nil {
t.Fatalf("unable to serialize policy")
}
Expand Down
4 changes: 4 additions & 0 deletions channeldb/models/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/lnwire"
)

type ChannelEdgeInfo interface { //nolint:interfacebloat
Expand Down Expand Up @@ -52,4 +53,7 @@ type ChannelAuthProof interface {
}

type ChannelEdgePolicy interface {
SCID() lnwire.ShortChannelID
IsDisabled() bool
IsNode1() bool
}

0 comments on commit 2b691cd

Please sign in to comment.