From 883d6da006954ba375fa2446c7f48a8ff464973c Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 19 Jul 2024 11:19:35 +0200 Subject: [PATCH 01/11] lnwire21: add CodeInvalidBlinding To prevent the need to copy the entire onion_error.go file for a new Mission Control migration, this commit just updates the existing lnwire21/onion_error.go file with the new CodeInvalidBlinding code. The lnwire21 should not really ever be updated but adding a new code should be fine as it does not affect old migrations since this is a new code. --- channeldb/migration/lnwire21/onion_error.go | 49 +++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/channeldb/migration/lnwire21/onion_error.go b/channeldb/migration/lnwire21/onion_error.go index a2923b076f..51e962b35b 100644 --- a/channeldb/migration/lnwire21/onion_error.go +++ b/channeldb/migration/lnwire21/onion_error.go @@ -80,6 +80,7 @@ const ( CodeExpiryTooFar FailCode = 21 CodeInvalidOnionPayload = FlagPerm | 22 CodeMPPTimeout FailCode = 23 + CodeInvalidBlinding = FlagBadOnion | FlagPerm | 24 ) // String returns the string representation of the failure code. @@ -157,6 +158,9 @@ func (c FailCode) String() string { case CodeMPPTimeout: return "MPPTimeout" + case CodeInvalidBlinding: + return "InvalidBlinding" + default: return "" } @@ -571,6 +575,51 @@ func (f *FailInvalidOnionKey) Error() string { return fmt.Sprintf("InvalidOnionKey(onion_sha=%x)", f.OnionSHA256[:]) } +// FailInvalidBlinding is returned if there has been a route blinding related +// error. +type FailInvalidBlinding struct { + OnionSHA256 [sha256.Size]byte +} + +// Code returns the failure unique code. +// +// NOTE: Part of the FailureMessage interface. +func (f *FailInvalidBlinding) Code() FailCode { + return CodeInvalidBlinding +} + +// Returns a human readable string describing the target FailureMessage. +// +// NOTE: Implements the error interface. +func (f *FailInvalidBlinding) Error() string { + return f.Code().String() +} + +// Decode decodes the failure from bytes stream. +// +// NOTE: Part of the Serializable interface. +func (f *FailInvalidBlinding) Decode(r io.Reader, _ uint32) error { + return ReadElement(r, f.OnionSHA256[:]) +} + +// Encode writes the failure in bytes stream. +// +// NOTE: Part of the Serializable interface. +func (f *FailInvalidBlinding) Encode(w *bytes.Buffer, _ uint32) error { + return WriteElement(w, f.OnionSHA256[:]) +} + +// NewInvalidBlinding creates new instance of FailInvalidBlinding. +func NewInvalidBlinding(onion []byte) *FailInvalidBlinding { + // The spec allows empty onion hashes for invalid blinding, so we only + // include our onion hash if it's provided. + if onion == nil { + return &FailInvalidBlinding{} + } + + return &FailInvalidBlinding{OnionSHA256: sha256.Sum256(onion)} +} + // parseChannelUpdateCompatabilityMode will attempt to parse a channel updated // encoded into an onion error payload in two ways. First, we'll try the // compatibility oriented version wherein we'll _skip_ the length prefixing on From 78b0f70b420fca622bacee8b01a5e9e70d76ea34 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 6 Aug 2024 14:22:03 +0200 Subject: [PATCH 02/11] channeldb: boiler plate code for migration32 In preparation for the commit which will add the main logic for migration 32 (which will migrate the MC store to use a more minimal encoding), this commit just adds some of the code that the migration will need to the package. --- channeldb/log.go | 2 + channeldb/migration32/codec.go | 146 +++++++++ channeldb/migration32/hop.go | 99 ++++++ channeldb/migration32/log.go | 14 + channeldb/migration32/route.go | 580 +++++++++++++++++++++++++++++++++ 5 files changed, 841 insertions(+) create mode 100644 channeldb/migration32/codec.go create mode 100644 channeldb/migration32/hop.go create mode 100644 channeldb/migration32/log.go create mode 100644 channeldb/migration32/route.go diff --git a/channeldb/log.go b/channeldb/log.go index a53d662cdc..e50e5054ef 100644 --- a/channeldb/log.go +++ b/channeldb/log.go @@ -10,6 +10,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/migration24" "github.com/lightningnetwork/lnd/channeldb/migration30" "github.com/lightningnetwork/lnd/channeldb/migration31" + "github.com/lightningnetwork/lnd/channeldb/migration32" "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" "github.com/lightningnetwork/lnd/kvdb" ) @@ -42,5 +43,6 @@ func UseLogger(logger btclog.Logger) { migration24.UseLogger(logger) migration30.UseLogger(logger) migration31.UseLogger(logger) + migration32.UseLogger(logger) kvdb.UseLogger(logger) } diff --git a/channeldb/migration32/codec.go b/channeldb/migration32/codec.go new file mode 100644 index 0000000000..9f626c5748 --- /dev/null +++ b/channeldb/migration32/codec.go @@ -0,0 +1,146 @@ +package migration32 + +import ( + "encoding/binary" + "fmt" + "io" + + "github.com/btcsuite/btcd/wire" + lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21" +) + +var ( + // Big endian is the preferred byte order, due to cursor scans over + // integer keys iterating in order. + byteOrder = binary.BigEndian +) + +// ReadElement is a one-stop utility function to deserialize any datastructure +// encoded using the serialization format of the database. +func ReadElement(r io.Reader, element interface{}) error { + switch e := element.(type) { + case *uint32: + if err := binary.Read(r, byteOrder, e); err != nil { + return err + } + + case *lnwire.MilliSatoshi: + var a uint64 + if err := binary.Read(r, byteOrder, &a); err != nil { + return err + } + + *e = lnwire.MilliSatoshi(a) + + case *[]byte: + bytes, err := wire.ReadVarBytes(r, 0, 66000, "[]byte") + if err != nil { + return err + } + + *e = bytes + + case *int64, *uint64: + if err := binary.Read(r, byteOrder, e); err != nil { + return err + } + + case *bool: + if err := binary.Read(r, byteOrder, e); err != nil { + return err + } + + case *int32: + if err := binary.Read(r, byteOrder, e); err != nil { + return err + } + + default: + return UnknownElementType{"ReadElement", e} + } + + return nil +} + +// ReadElements deserializes a variable number of elements into the passed +// io.Reader, with each element being deserialized according to the ReadElement +// function. +func ReadElements(r io.Reader, elements ...interface{}) error { + for _, element := range elements { + err := ReadElement(r, element) + if err != nil { + return err + } + } + + return nil +} + +// UnknownElementType is an error returned when the codec is unable to encode or +// decode a particular type. +type UnknownElementType struct { + method string + element interface{} +} + +// Error returns the name of the method that encountered the error, as well as +// the type that was unsupported. +func (e UnknownElementType) Error() string { + return fmt.Sprintf("Unknown type in %s: %T", e.method, e.element) +} + +// WriteElement is a one-stop shop to write the big endian representation of +// any element which is to be serialized for storage on disk. The passed +// io.Writer should be backed by an appropriately sized byte slice, or be able +// to dynamically expand to accommodate additional data. +func WriteElement(w io.Writer, element interface{}) error { + switch e := element.(type) { + case int64, uint64: + if err := binary.Write(w, byteOrder, e); err != nil { + return err + } + + case uint32: + if err := binary.Write(w, byteOrder, e); err != nil { + return err + } + + case int32: + if err := binary.Write(w, byteOrder, e); err != nil { + return err + } + + case bool: + if err := binary.Write(w, byteOrder, e); err != nil { + return err + } + + case lnwire.MilliSatoshi: + if err := binary.Write(w, byteOrder, uint64(e)); err != nil { + return err + } + + case []byte: + if err := wire.WriteVarBytes(w, 0, e); err != nil { + return err + } + + default: + return UnknownElementType{"WriteElement", e} + } + + return nil +} + +// WriteElements is writes each element in the elements slice to the passed +// io.Writer using WriteElement. +func WriteElements(w io.Writer, elements ...interface{}) error { + for _, element := range elements { + err := WriteElement(w, element) + if err != nil { + return err + } + } + + return nil +} diff --git a/channeldb/migration32/hop.go b/channeldb/migration32/hop.go new file mode 100644 index 0000000000..40c68eacf3 --- /dev/null +++ b/channeldb/migration32/hop.go @@ -0,0 +1,99 @@ +package migration32 + +import ( + "github.com/btcsuite/btcd/btcec/v2" + "github.com/lightningnetwork/lnd/tlv" +) + +const ( + // AmtOnionType is the type used in the onion to reference the amount to + // send to the next hop. + AmtOnionType tlv.Type = 2 + + // LockTimeOnionType is the type used in the onion to reference the CLTV + // value that should be used for the next hop's HTLC. + LockTimeOnionType tlv.Type = 4 + + // NextHopOnionType is the type used in the onion to reference the ID + // of the next hop. + NextHopOnionType tlv.Type = 6 + + // EncryptedDataOnionType is the type used to include encrypted data + // provided by the receiver in the onion for use in blinded paths. + EncryptedDataOnionType tlv.Type = 10 + + // BlindingPointOnionType is the type used to include receiver provided + // ephemeral keys in the onion that are used in blinded paths. + BlindingPointOnionType tlv.Type = 12 + + // MetadataOnionType is the type used in the onion for the payment + // metadata. + MetadataOnionType tlv.Type = 16 + + // TotalAmtMsatBlindedType is the type used in the onion for the total + // amount field that is included in the final hop for blinded payments. + TotalAmtMsatBlindedType tlv.Type = 18 +) + +// NewAmtToFwdRecord creates a tlv.Record that encodes the amount_to_forward +// (type 2) for an onion payload. +func NewAmtToFwdRecord(amt *uint64) tlv.Record { + return tlv.MakeDynamicRecord( + AmtOnionType, amt, func() uint64 { + return tlv.SizeTUint64(*amt) + }, + tlv.ETUint64, tlv.DTUint64, + ) +} + +// NewLockTimeRecord creates a tlv.Record that encodes the outgoing_cltv_value +// (type 4) for an onion payload. +func NewLockTimeRecord(lockTime *uint32) tlv.Record { + return tlv.MakeDynamicRecord( + LockTimeOnionType, lockTime, func() uint64 { + return tlv.SizeTUint32(*lockTime) + }, + tlv.ETUint32, tlv.DTUint32, + ) +} + +// NewNextHopIDRecord creates a tlv.Record that encodes the short_channel_id +// (type 6) for an onion payload. +func NewNextHopIDRecord(cid *uint64) tlv.Record { + return tlv.MakePrimitiveRecord(NextHopOnionType, cid) +} + +// NewEncryptedDataRecord creates a tlv.Record that encodes the encrypted_data +// (type 10) record for an onion payload. +func NewEncryptedDataRecord(data *[]byte) tlv.Record { + return tlv.MakePrimitiveRecord(EncryptedDataOnionType, data) +} + +// NewBlindingPointRecord creates a tlv.Record that encodes the blinding_point +// (type 12) record for an onion payload. +func NewBlindingPointRecord(point **btcec.PublicKey) tlv.Record { + return tlv.MakePrimitiveRecord(BlindingPointOnionType, point) +} + +// NewMetadataRecord creates a tlv.Record that encodes the metadata (type 10) +// for an onion payload. +func NewMetadataRecord(metadata *[]byte) tlv.Record { + return tlv.MakeDynamicRecord( + MetadataOnionType, metadata, + func() uint64 { + return uint64(len(*metadata)) + }, + tlv.EVarBytes, tlv.DVarBytes, + ) +} + +// NewTotalAmtMsatBlinded creates a tlv.Record that encodes the +// total_amount_msat for the final an onion payload within a blinded route. +func NewTotalAmtMsatBlinded(amt *uint64) tlv.Record { + return tlv.MakeDynamicRecord( + TotalAmtMsatBlindedType, amt, func() uint64 { + return tlv.SizeTUint64(*amt) + }, + tlv.ETUint64, tlv.DTUint64, + ) +} diff --git a/channeldb/migration32/log.go b/channeldb/migration32/log.go new file mode 100644 index 0000000000..98709c28ea --- /dev/null +++ b/channeldb/migration32/log.go @@ -0,0 +1,14 @@ +package migration32 + +import ( + "github.com/btcsuite/btclog" +) + +// log is a logger that is initialized as disabled. This means the package will +// not perform any logging by default until a logger is set. +var log = btclog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/channeldb/migration32/route.go b/channeldb/migration32/route.go new file mode 100644 index 0000000000..9b96ae814c --- /dev/null +++ b/channeldb/migration32/route.go @@ -0,0 +1,580 @@ +package migration32 + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/wire" + lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21" + "github.com/lightningnetwork/lnd/tlv" +) + +const ( + // MPPOnionType is the type used in the onion to reference the MPP + // fields: total_amt and payment_addr. + MPPOnionType tlv.Type = 8 + + // AMPOnionType is the type used in the onion to reference the AMP + // fields: root_share, set_id, and child_index. + AMPOnionType tlv.Type = 14 +) + +// VertexSize is the size of the array to store a vertex. +const VertexSize = 33 + +// Vertex is a simple alias for the serialization of a compressed Bitcoin +// public key. +type Vertex [VertexSize]byte + +// Route represents a path through the channel graph which runs over one or +// more channels in succession. This struct carries all the information +// required to craft the Sphinx onion packet, and send the payment along the +// first hop in the path. A route is only selected as valid if all the channels +// have sufficient capacity to carry the initial payment amount after fees are +// accounted for. +type Route struct { + // TotalTimeLock is the cumulative (final) time lock across the entire + // route. This is the CLTV value that should be extended to the first + // hop in the route. All other hops will decrement the time-lock as + // advertised, leaving enough time for all hops to wait for or present + // the payment preimage to complete the payment. + TotalTimeLock uint32 + + // TotalAmount is the total amount of funds required to complete a + // payment over this route. This value includes the cumulative fees at + // each hop. As a result, the HTLC extended to the first-hop in the + // route will need to have at least this many satoshis, otherwise the + // route will fail at an intermediate node due to an insufficient + // amount of fees. + TotalAmount lnwire.MilliSatoshi + + // SourcePubKey is the pubkey of the node where this route originates + // from. + SourcePubKey Vertex + + // Hops contains details concerning the specific forwarding details at + // each hop. + Hops []*Hop +} + +// Hop represents an intermediate or final node of the route. This naming +// is in line with the definition given in BOLT #4: Onion Routing Protocol. +// The struct houses the channel along which this hop can be reached and +// the values necessary to create the HTLC that needs to be sent to the +// next hop. It is also used to encode the per-hop payload included within +// the Sphinx packet. +type Hop struct { + // PubKeyBytes is the raw bytes of the public key of the target node. + PubKeyBytes Vertex + + // ChannelID is the unique channel ID for the channel. The first 3 + // bytes are the block height, the next 3 the index within the block, + // and the last 2 bytes are the output index for the channel. + ChannelID uint64 + + // OutgoingTimeLock is the timelock value that should be used when + // crafting the _outgoing_ HTLC from this hop. + OutgoingTimeLock uint32 + + // AmtToForward is the amount that this hop will forward to the next + // hop. This value is less than the value that the incoming HTLC + // carries as a fee will be subtracted by the hop. + AmtToForward lnwire.MilliSatoshi + + // MPP encapsulates the data required for option_mpp. This field should + // only be set for the final hop. + MPP *MPP + + // AMP encapsulates the data required for option_amp. This field should + // only be set for the final hop. + AMP *AMP + + // CustomRecords if non-nil are a set of additional TLV records that + // should be included in the forwarding instructions for this node. + CustomRecords CustomSet + + // LegacyPayload if true, then this signals that this node doesn't + // understand the new TLV payload, so we must instead use the legacy + // payload. + // + // NOTE: we should no longer ever create a Hop with Legacy set to true. + // The only reason we are keeping this member is that it could be the + // case that we have serialised hops persisted to disk where + // LegacyPayload is true. + LegacyPayload bool + + // Metadata is additional data that is sent along with the payment to + // the payee. + Metadata []byte + + // EncryptedData is an encrypted data blob includes for hops that are + // part of a blinded route. + EncryptedData []byte + + // BlindingPoint is an ephemeral public key used by introduction nodes + // in blinded routes to unblind their portion of the route and pass on + // the next ephemeral key to the next blinded node to do the same. + BlindingPoint *btcec.PublicKey + + // TotalAmtMsat is the total amount for a blinded payment, potentially + // spread over more than one HTLC. This field should only be set for + // the final hop in a blinded path. + TotalAmtMsat lnwire.MilliSatoshi +} + +// MPP is a record that encodes the fields necessary for multi-path payments. +type MPP struct { + // paymentAddr is a random, receiver-generated value used to avoid + // collisions with concurrent payers. + paymentAddr [32]byte + + // totalMsat is the total value of the payment, potentially spread + // across more than one HTLC. + totalMsat lnwire.MilliSatoshi +} + +// Record returns a tlv.Record that can be used to encode or decode this record. +func (r *MPP) Record() tlv.Record { + // Fixed-size, 32 byte payment address followed by truncated 64-bit + // total msat. + size := func() uint64 { + return 32 + tlv.SizeTUint64(uint64(r.totalMsat)) + } + + return tlv.MakeDynamicRecord( + MPPOnionType, r, size, MPPEncoder, MPPDecoder, + ) +} + +const ( + // minMPPLength is the minimum length of a serialized MPP TLV record, + // which occurs when the truncated encoding of total_amt_msat takes 0 + // bytes, leaving only the payment_addr. + minMPPLength = 32 + + // maxMPPLength is the maximum length of a serialized MPP TLV record, + // which occurs when the truncated encoding of total_amt_msat takes 8 + // bytes. + maxMPPLength = 40 +) + +// MPPEncoder writes the MPP record to the provided io.Writer. +func MPPEncoder(w io.Writer, val interface{}, buf *[8]byte) error { + if v, ok := val.(*MPP); ok { + err := tlv.EBytes32(w, &v.paymentAddr, buf) + if err != nil { + return err + } + + return tlv.ETUint64T(w, uint64(v.totalMsat), buf) + } + + return tlv.NewTypeForEncodingErr(val, "MPP") +} + +// MPPDecoder reads the MPP record to the provided io.Reader. +func MPPDecoder(r io.Reader, val interface{}, buf *[8]byte, l uint64) error { + if v, ok := val.(*MPP); ok && minMPPLength <= l && l <= maxMPPLength { + if err := tlv.DBytes32(r, &v.paymentAddr, buf, 32); err != nil { + return err + } + + var total uint64 + if err := tlv.DTUint64(r, &total, buf, l-32); err != nil { + return err + } + v.totalMsat = lnwire.MilliSatoshi(total) + + return nil + } + + return tlv.NewTypeForDecodingErr(val, "MPP", l, maxMPPLength) +} + +// AMP is a record that encodes the fields necessary for atomic multi-path +// payments. +type AMP struct { + rootShare [32]byte + setID [32]byte + childIndex uint32 +} + +// AMPEncoder writes the AMP record to the provided io.Writer. +func AMPEncoder(w io.Writer, val interface{}, buf *[8]byte) error { + if v, ok := val.(*AMP); ok { + if err := tlv.EBytes32(w, &v.rootShare, buf); err != nil { + return err + } + + if err := tlv.EBytes32(w, &v.setID, buf); err != nil { + return err + } + + return tlv.ETUint32T(w, v.childIndex, buf) + } + + return tlv.NewTypeForEncodingErr(val, "AMP") +} + +const ( + // minAMPLength is the minimum length of a serialized AMP TLV record, + // which occurs when the truncated encoding of child_index takes 0 + // bytes, leaving only the root_share and set_id. + minAMPLength = 64 + + // maxAMPLength is the maximum length of a serialized AMP TLV record, + // which occurs when the truncated encoding of a child_index takes 2 + // bytes. + maxAMPLength = 68 +) + +// AMPDecoder reads the AMP record from the provided io.Reader. +func AMPDecoder(r io.Reader, val interface{}, buf *[8]byte, l uint64) error { + if v, ok := val.(*AMP); ok && minAMPLength <= l && l <= maxAMPLength { + if err := tlv.DBytes32(r, &v.rootShare, buf, 32); err != nil { + return err + } + + if err := tlv.DBytes32(r, &v.setID, buf, 32); err != nil { + return err + } + + return tlv.DTUint32(r, &v.childIndex, buf, l-minAMPLength) + } + + return tlv.NewTypeForDecodingErr(val, "AMP", l, maxAMPLength) +} + +// Record returns a tlv.Record that can be used to encode or decode this record. +func (a *AMP) Record() tlv.Record { + return tlv.MakeDynamicRecord( + AMPOnionType, a, a.PayloadSize, AMPEncoder, AMPDecoder, + ) +} + +// PayloadSize returns the size this record takes up in encoded form. +func (a *AMP) PayloadSize() uint64 { + return 32 + 32 + tlv.SizeTUint32(a.childIndex) +} + +const ( + // CustomTypeStart is the start of the custom tlv type range as defined + // in BOLT 01. + CustomTypeStart = 65536 +) + +// CustomSet stores a set of custom key/value pairs. +type CustomSet map[uint64][]byte + +// Validate checks that all custom records are in the custom type range. +func (c CustomSet) Validate() error { + for key := range c { + if key < CustomTypeStart { + return fmt.Errorf("no custom records with types "+ + "below %v allowed", CustomTypeStart) + } + } + + return nil +} + +// SerializeRoute serializes a route. +func SerializeRoute(w io.Writer, r Route) error { + if err := WriteElements(w, + r.TotalTimeLock, r.TotalAmount, r.SourcePubKey[:], + ); err != nil { + return err + } + + if err := WriteElements(w, uint32(len(r.Hops))); err != nil { + return err + } + + for _, h := range r.Hops { + if err := serializeHop(w, h); err != nil { + return err + } + } + + return nil +} + +func serializeHop(w io.Writer, h *Hop) error { + if err := WriteElements(w, + h.PubKeyBytes[:], + h.ChannelID, + h.OutgoingTimeLock, + h.AmtToForward, + ); err != nil { + return err + } + + if err := binary.Write(w, byteOrder, h.LegacyPayload); err != nil { + return err + } + + // For legacy payloads, we don't need to write any TLV records, so + // we'll write a zero indicating the our serialized TLV map has no + // records. + if h.LegacyPayload { + return WriteElements(w, uint32(0)) + } + + // Gather all non-primitive TLV records so that they can be serialized + // as a single blob. + // + // TODO(conner): add migration to unify all fields in a single TLV + // blobs. The split approach will cause headaches down the road as more + // fields are added, which we can avoid by having a single TLV stream + // for all payload fields. + var records []tlv.Record + if h.MPP != nil { + records = append(records, h.MPP.Record()) + } + + // Add blinding point and encrypted data if present. + if h.EncryptedData != nil { + records = append(records, NewEncryptedDataRecord( + &h.EncryptedData, + )) + } + + if h.BlindingPoint != nil { + records = append(records, NewBlindingPointRecord( + &h.BlindingPoint, + )) + } + + if h.AMP != nil { + records = append(records, h.AMP.Record()) + } + + if h.Metadata != nil { + records = append(records, NewMetadataRecord(&h.Metadata)) + } + + if h.TotalAmtMsat != 0 { + totalMsatInt := uint64(h.TotalAmtMsat) + records = append( + records, NewTotalAmtMsatBlinded(&totalMsatInt), + ) + } + + // Final sanity check to absolutely rule out custom records that are not + // custom and write into the standard range. + if err := h.CustomRecords.Validate(); err != nil { + return err + } + + // Convert custom records to tlv and add to the record list. + // MapToRecords sorts the list, so adding it here will keep the list + // canonical. + tlvRecords := tlv.MapToRecords(h.CustomRecords) + records = append(records, tlvRecords...) + + // Otherwise, we'll transform our slice of records into a map of the + // raw bytes, then serialize them in-line with a length (number of + // elements) prefix. + mapRecords, err := tlv.RecordsToMap(records) + if err != nil { + return err + } + + numRecords := uint32(len(mapRecords)) + if err := WriteElements(w, numRecords); err != nil { + return err + } + + for recordType, rawBytes := range mapRecords { + if err := WriteElements(w, recordType); err != nil { + return err + } + + if err := wire.WriteVarBytes(w, 0, rawBytes); err != nil { + return err + } + } + + return nil +} + +// DeserializeRoute deserializes a route. +func DeserializeRoute(r io.Reader) (Route, error) { + rt := Route{} + if err := ReadElements(r, + &rt.TotalTimeLock, &rt.TotalAmount, + ); err != nil { + return rt, err + } + + var pub []byte + if err := ReadElements(r, &pub); err != nil { + return rt, err + } + copy(rt.SourcePubKey[:], pub) + + var numHops uint32 + if err := ReadElements(r, &numHops); err != nil { + return rt, err + } + + var hops []*Hop + for i := uint32(0); i < numHops; i++ { + hop, err := deserializeHop(r) + if err != nil { + return rt, err + } + hops = append(hops, hop) + } + rt.Hops = hops + + return rt, nil +} + +// maxOnionPayloadSize is the largest Sphinx payload possible, so we don't need +// to read/write a TLV stream larger than this. +const maxOnionPayloadSize = 1300 + +func deserializeHop(r io.Reader) (*Hop, error) { + h := &Hop{} + + var pub []byte + if err := ReadElements(r, &pub); err != nil { + return nil, err + } + copy(h.PubKeyBytes[:], pub) + + if err := ReadElements(r, + &h.ChannelID, &h.OutgoingTimeLock, &h.AmtToForward, + ); err != nil { + return nil, err + } + + // TODO(roasbeef): change field to allow LegacyPayload false to be the + // legacy default? + err := binary.Read(r, byteOrder, &h.LegacyPayload) + if err != nil { + return nil, err + } + + var numElements uint32 + if err := ReadElements(r, &numElements); err != nil { + return nil, err + } + + // If there're no elements, then we can return early. + if numElements == 0 { + return h, nil + } + + tlvMap := make(map[uint64][]byte) + for i := uint32(0); i < numElements; i++ { + var tlvType uint64 + if err := ReadElements(r, &tlvType); err != nil { + return nil, err + } + + rawRecordBytes, err := wire.ReadVarBytes( + r, 0, maxOnionPayloadSize, "tlv", + ) + if err != nil { + return nil, err + } + + tlvMap[tlvType] = rawRecordBytes + } + + // If the MPP type is present, remove it from the generic TLV map and + // parse it back into a proper MPP struct. + // + // TODO(conner): add migration to unify all fields in a single TLV + // blobs. The split approach will cause headaches down the road as more + // fields are added, which we can avoid by having a single TLV stream + // for all payload fields. + mppType := uint64(MPPOnionType) + if mppBytes, ok := tlvMap[mppType]; ok { + delete(tlvMap, mppType) + + var ( + mpp = &MPP{} + mppRec = mpp.Record() + r = bytes.NewReader(mppBytes) + ) + err := mppRec.Decode(r, uint64(len(mppBytes))) + if err != nil { + return nil, err + } + h.MPP = mpp + } + + // If encrypted data or blinding key are present, remove them from + // the TLV map and parse into proper types. + encryptedDataType := uint64(EncryptedDataOnionType) + if data, ok := tlvMap[encryptedDataType]; ok { + delete(tlvMap, encryptedDataType) + h.EncryptedData = data + } + + blindingType := uint64(BlindingPointOnionType) + if blindingPoint, ok := tlvMap[blindingType]; ok { + delete(tlvMap, blindingType) + + h.BlindingPoint, err = btcec.ParsePubKey(blindingPoint) + if err != nil { + return nil, fmt.Errorf("invalid blinding point: %w", + err) + } + } + + ampType := uint64(AMPOnionType) + if ampBytes, ok := tlvMap[ampType]; ok { + delete(tlvMap, ampType) + + var ( + amp = &{} + ampRec = amp.Record() + r = bytes.NewReader(ampBytes) + ) + err := ampRec.Decode(r, uint64(len(ampBytes))) + if err != nil { + return nil, err + } + h.AMP = amp + } + + // If the metadata type is present, remove it from the tlv map and + // populate directly on the hop. + metadataType := uint64(MetadataOnionType) + if metadata, ok := tlvMap[metadataType]; ok { + delete(tlvMap, metadataType) + + h.Metadata = metadata + } + + totalAmtMsatType := uint64(TotalAmtMsatBlindedType) + if totalAmtMsat, ok := tlvMap[totalAmtMsatType]; ok { + delete(tlvMap, totalAmtMsatType) + + var ( + totalAmtMsatInt uint64 + buf [8]byte + ) + if err := tlv.DTUint64( + bytes.NewReader(totalAmtMsat), + &totalAmtMsatInt, + &buf, + uint64(len(totalAmtMsat)), + ); err != nil { + return nil, err + } + + h.TotalAmtMsat = lnwire.MilliSatoshi(totalAmtMsatInt) + } + + h.CustomRecords = tlvMap + + return h, nil +} From 13fed35578c3cfd3151c40131359d8ebe2548a24 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 6 Aug 2024 14:33:07 +0200 Subject: [PATCH 03/11] routing+channeldb: migrate MC store to use minimal Route encoding Add a new mcRoute type that houses the data about a route that MC actually uses. Then add a migration (channeldb/migration32) that migrates the existing store from its current serialisation to the new, more minimal serialisation. --- channeldb/db.go | 5 + channeldb/migration32/migration.go | 53 +++ channeldb/migration32/migration_test.go | 214 ++++++++++++ .../migration32/mission_control_store.go | 317 ++++++++++++++++++ routing/missioncontrol.go | 6 +- routing/missioncontrol_store.go | 91 ++++- routing/missioncontrol_store_test.go | 9 +- routing/result_interpretation.go | 146 +++++--- routing/result_interpretation_test.go | 133 ++++---- 9 files changed, 838 insertions(+), 136 deletions(-) create mode 100644 channeldb/migration32/migration.go create mode 100644 channeldb/migration32/migration_test.go create mode 100644 channeldb/migration32/mission_control_store.go diff --git a/channeldb/db.go b/channeldb/db.go index 1e210d032a..d8ba05f3d2 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -26,6 +26,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/migration29" "github.com/lightningnetwork/lnd/channeldb/migration30" "github.com/lightningnetwork/lnd/channeldb/migration31" + "github.com/lightningnetwork/lnd/channeldb/migration32" "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/invoices" @@ -286,6 +287,10 @@ var ( number: 31, migration: migration31.DeleteLastPublishedTxTLB, }, + { + number: 32, + migration: migration32.MigrateMCRouteSerialisation, + }, } // optionalVersions stores all optional migrations that are applied diff --git a/channeldb/migration32/migration.go b/channeldb/migration32/migration.go new file mode 100644 index 0000000000..14a1c11f5c --- /dev/null +++ b/channeldb/migration32/migration.go @@ -0,0 +1,53 @@ +package migration32 + +import ( + "bytes" + "fmt" + + "github.com/lightningnetwork/lnd/kvdb" +) + +// MigrateMCRouteSerialisation reads all the mission control store entries and +// re-serializes them using a minimal route serialisation so that only the parts +// of the route that are actually required for mission control are persisted +func MigrateMCRouteSerialisation(tx kvdb.RwTx) error { + log.Infof("Migrating Mission Control store to use a more minimal " + + "encoding for routes") + + resultBucket := tx.ReadWriteBucket(resultsKey) + + // If the results bucket does not exist then there are no entries in + // the mission control store yet and so there is nothing to migrate. + if resultBucket == nil { + return nil + } + + // For each entry, read it into memory using the old encoding. Then, + // extract the more minimal route, re-encode and persist the entry. + return resultBucket.ForEach(func(k, v []byte) error { + // Read the entry using the old encoding. + resultOld, err := deserializeOldResult(k, v) + if err != nil { + return err + } + + // Convert to the new payment result format with the minimal + // route. + resultNew := convertPaymentResult(resultOld) + + // Serialise the new payment result using the new encoding. + key, resultNewBytes, err := serializeNewResult(resultNew) + if err != nil { + return err + } + + // Make sure that the derived key is the same. + if !bytes.Equal(key, k) { + return fmt.Errorf("new payment result key (%v) is "+ + "not the same as the old key (%v)", key, k) + } + + // Finally, overwrite the previous value with the new encoding. + return resultBucket.Put(k, resultNewBytes) + }) +} diff --git a/channeldb/migration32/migration_test.go b/channeldb/migration32/migration_test.go new file mode 100644 index 0000000000..ae33a5802c --- /dev/null +++ b/channeldb/migration32/migration_test.go @@ -0,0 +1,214 @@ +package migration32 + +import ( + "encoding/hex" + "testing" + "time" + + "github.com/btcsuite/btcd/btcec/v2" + lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21" + "github.com/lightningnetwork/lnd/channeldb/migtest" + "github.com/lightningnetwork/lnd/kvdb" +) + +var ( + failureIndex = 8 + testPub = Vertex{2, 202, 4} + testPub2 = Vertex{22, 202, 4} + + pubkeyBytes, _ = hex.DecodeString( + "598ec453728e0ffe0ae2f5e174243cf58f2" + + "a3f2c83d2457b43036db568b11093", + ) + pubKeyY = new(btcec.FieldVal) + _ = pubKeyY.SetByteSlice(pubkeyBytes) + pubkey = btcec.NewPublicKey(new(btcec.FieldVal).SetInt(4), pubKeyY) + + paymentResultCommon1 = paymentResultCommon{ + id: 0, + timeFwd: time.Unix(0, 1), + timeReply: time.Unix(0, 2), + success: false, + failureSourceIdx: &failureIndex, + failure: &lnwire.FailFeeInsufficient{}, + } + + paymentResultCommon2 = paymentResultCommon{ + id: 2, + timeFwd: time.Unix(0, 4), + timeReply: time.Unix(0, 7), + success: true, + } +) + +// TestMigrateMCRouteSerialisation tests that the MigrateMCRouteSerialisation +// migration function correctly migrates the MC store from using the old route +// encoding to using the newer, more minimal route encoding. +func TestMigrateMCRouteSerialisation(t *testing.T) { + customRecord := map[uint64][]byte{ + 65536: {4, 2, 2}, + } + + resultsOld := []*paymentResultOld{ + { + paymentResultCommon: paymentResultCommon1, + route: &Route{ + TotalTimeLock: 100, + TotalAmount: 400, + SourcePubKey: testPub, + Hops: []*Hop{ + // A hop with MPP, AMP + { + PubKeyBytes: testPub, + ChannelID: 100, + OutgoingTimeLock: 300, + AmtToForward: 500, + MPP: &MPP{ + paymentAddr: [32]byte{ + 4, 5, + }, + totalMsat: 900, + }, + AMP: &{ + rootShare: [32]byte{ + 0, 0, + }, + setID: [32]byte{ + 5, 5, 5, + }, + childIndex: 90, + }, + CustomRecords: customRecord, + Metadata: []byte{6, 7, 7}, + }, + // A legacy hop. + { + PubKeyBytes: testPub, + ChannelID: 800, + OutgoingTimeLock: 4, + AmtToForward: 4, + LegacyPayload: true, + }, + // A hop with a blinding key. + { + PubKeyBytes: testPub, + ChannelID: 800, + OutgoingTimeLock: 4, + AmtToForward: 4, + BlindingPoint: pubkey, + EncryptedData: []byte{ + 1, 2, 3, + }, + TotalAmtMsat: 600, + }, + }, + }, + }, + { + paymentResultCommon: paymentResultCommon2, + route: &Route{ + TotalTimeLock: 101, + TotalAmount: 401, + SourcePubKey: testPub2, + Hops: []*Hop{ + { + PubKeyBytes: testPub, + ChannelID: 800, + OutgoingTimeLock: 4, + AmtToForward: 4, + BlindingPoint: pubkey, + EncryptedData: []byte{ + 1, 2, 3, + }, + TotalAmtMsat: 600, + }, + }, + }, + }, + } + + expectedResultsNew := []*paymentResultNew{ + { + paymentResultCommon: paymentResultCommon1, + route: &mcRoute{ + sourcePubKey: testPub, + totalAmount: 400, + hops: []*mcHop{ + { + channelID: 100, + pubKeyBytes: testPub, + amtToFwd: 500, + }, + { + channelID: 800, + pubKeyBytes: testPub, + amtToFwd: 4, + }, + { + channelID: 800, + pubKeyBytes: testPub, + amtToFwd: 4, + hasBlindingPoint: true, + }, + }, + }, + }, + { + paymentResultCommon: paymentResultCommon2, + route: &mcRoute{ + sourcePubKey: testPub2, + totalAmount: 401, + hops: []*mcHop{ + { + channelID: 800, + pubKeyBytes: testPub, + amtToFwd: 4, + hasBlindingPoint: true, + }, + }, + }, + }, + } + + // Prime the database with some mission control data that uses the + // old route encoding. + before := func(tx kvdb.RwTx) error { + resultBucket, err := tx.CreateTopLevelBucket(resultsKey) + if err != nil { + return err + } + + for _, result := range resultsOld { + k, v, err := serializeOldResult(result) + if err != nil { + return err + } + + if err := resultBucket.Put(k, v); err != nil { + return err + } + } + + return nil + } + + // After the migration, ensure that all the relevant info was + // maintained. + after := func(tx kvdb.RwTx) error { + m := make(map[string]interface{}) + for _, result := range expectedResultsNew { + k, v, err := serializeNewResult(result) + if err != nil { + return err + } + + m[string(k)] = string(v) + } + + return migtest.VerifyDB(tx, resultsKey, m) + } + + migtest.ApplyMigration( + t, before, after, MigrateMCRouteSerialisation, false, + ) +} diff --git a/channeldb/migration32/mission_control_store.go b/channeldb/migration32/mission_control_store.go new file mode 100644 index 0000000000..d50f525357 --- /dev/null +++ b/channeldb/migration32/mission_control_store.go @@ -0,0 +1,317 @@ +package migration32 + +import ( + "bytes" + "io" + "math" + "time" + + "github.com/btcsuite/btcd/wire" + lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21" +) + +const ( + // unknownFailureSourceIdx is the database encoding of an unknown error + // source. + unknownFailureSourceIdx = -1 +) + +var ( + // resultsKey is the fixed key under which the attempt results are + // stored. + resultsKey = []byte("missioncontrol-results") +) + +// paymentResultCommon holds the fields that are shared by the old and new +// payment result encoding. +type paymentResultCommon struct { + id uint64 + timeFwd, timeReply time.Time + success bool + failureSourceIdx *int + failure lnwire.FailureMessage +} + +// paymentResultOld is the information that becomes available when a payment +// attempt completes. +type paymentResultOld struct { + paymentResultCommon + route *Route +} + +// deserializeOldResult deserializes a payment result using the old encoding. +func deserializeOldResult(k, v []byte) (*paymentResultOld, error) { + // Parse payment id. + result := paymentResultOld{ + paymentResultCommon: paymentResultCommon{ + id: byteOrder.Uint64(k[8:]), + }, + } + + r := bytes.NewReader(v) + + // Read timestamps, success status and failure source index. + var ( + timeFwd, timeReply uint64 + dbFailureSourceIdx int32 + ) + + err := ReadElements( + r, &timeFwd, &timeReply, &result.success, &dbFailureSourceIdx, + ) + if err != nil { + return nil, err + } + + // Convert time stamps to local time zone for consistent logging. + result.timeFwd = time.Unix(0, int64(timeFwd)).Local() + result.timeReply = time.Unix(0, int64(timeReply)).Local() + + // Convert from unknown index magic number to nil value. + if dbFailureSourceIdx != unknownFailureSourceIdx { + failureSourceIdx := int(dbFailureSourceIdx) + result.failureSourceIdx = &failureSourceIdx + } + + // Read route. + route, err := DeserializeRoute(r) + if err != nil { + return nil, err + } + result.route = &route + + // Read failure. + failureBytes, err := wire.ReadVarBytes(r, 0, math.MaxUint16, "failure") + if err != nil { + return nil, err + } + if len(failureBytes) > 0 { + result.failure, err = lnwire.DecodeFailureMessage( + bytes.NewReader(failureBytes), 0, + ) + if err != nil { + return nil, err + } + } + + return &result, nil +} + +// convertPaymentResult converts a paymentResultOld to a paymentResultNew. +func convertPaymentResult(old *paymentResultOld) *paymentResultNew { + return &paymentResultNew{ + paymentResultCommon: old.paymentResultCommon, + route: extractMCRoute(old.route), + } +} + +// paymentResultNew is the information that becomes available when a payment +// attempt completes. +type paymentResultNew struct { + paymentResultCommon + route *mcRoute +} + +// extractMCRoute extracts the fields required by MC from the Route struct to +// create the more minima mcRoute struct. +func extractMCRoute(route *Route) *mcRoute { + return &mcRoute{ + sourcePubKey: route.SourcePubKey, + totalAmount: route.TotalAmount, + hops: extractMCHops(route.Hops), + } +} + +// extractMCHops extracts the Hop fields that MC actually uses from a slice of +// Hops. +func extractMCHops(hops []*Hop) []*mcHop { + mcHops := make([]*mcHop, len(hops)) + for i, hop := range hops { + mcHops[i] = extractMCHop(hop) + } + + return mcHops +} + +// extractMCHop extracts the Hop fields that MC actually uses from a Hop. +func extractMCHop(hop *Hop) *mcHop { + return &mcHop{ + channelID: hop.ChannelID, + pubKeyBytes: hop.PubKeyBytes, + amtToFwd: hop.AmtToForward, + hasBlindingPoint: hop.BlindingPoint != nil, + } +} + +// mcRoute holds the bare minimum info about a payment attempt route that MC +// requires. +type mcRoute struct { + sourcePubKey Vertex + totalAmount lnwire.MilliSatoshi + hops []*mcHop +} + +// mcHop holds the bare minimum info about a payment attempt route hop that MC +// requires. +type mcHop struct { + channelID uint64 + pubKeyBytes Vertex + amtToFwd lnwire.MilliSatoshi + hasBlindingPoint bool +} + +// serializeOldResult serializes a payment result and returns a key and value +// byte slice to insert into the bucket. +func serializeOldResult(rp *paymentResultOld) ([]byte, []byte, error) { + // Write timestamps, success status, failure source index and route. + var b bytes.Buffer + var dbFailureSourceIdx int32 + if rp.failureSourceIdx == nil { + dbFailureSourceIdx = unknownFailureSourceIdx + } else { + dbFailureSourceIdx = int32(*rp.failureSourceIdx) + } + err := WriteElements( + &b, + uint64(rp.timeFwd.UnixNano()), + uint64(rp.timeReply.UnixNano()), + rp.success, dbFailureSourceIdx, + ) + if err != nil { + return nil, nil, err + } + + if err := SerializeRoute(&b, *rp.route); err != nil { + return nil, nil, err + } + + // Write failure. If there is no failure message, write an empty + // byte slice. + var failureBytes bytes.Buffer + if rp.failure != nil { + err := lnwire.EncodeFailureMessage(&failureBytes, rp.failure, 0) + if err != nil { + return nil, nil, err + } + } + err = wire.WriteVarBytes(&b, 0, failureBytes.Bytes()) + if err != nil { + return nil, nil, err + } + // Compose key that identifies this result. + key := getResultKeyOld(rp) + + return key, b.Bytes(), nil +} + +// getResultKeyOld returns a byte slice representing a unique key for this +// payment result. +func getResultKeyOld(rp *paymentResultOld) []byte { + var keyBytes [8 + 8 + 33]byte + + // Identify records by a combination of time, payment id and sender pub + // key. This allows importing mission control data from an external + // source without key collisions and keeps the records sorted + // chronologically. + byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano())) + byteOrder.PutUint64(keyBytes[8:], rp.id) + copy(keyBytes[16:], rp.route.SourcePubKey[:]) + + return keyBytes[:] +} + +// serializeNewResult serializes a payment result and returns a key and value +// byte slice to insert into the bucket. +func serializeNewResult(rp *paymentResultNew) ([]byte, []byte, error) { + // Write timestamps, success status, failure source index and route. + var b bytes.Buffer + + var dbFailureSourceIdx int32 + if rp.failureSourceIdx == nil { + dbFailureSourceIdx = unknownFailureSourceIdx + } else { + dbFailureSourceIdx = int32(*rp.failureSourceIdx) + } + + err := WriteElements( + &b, + uint64(rp.timeFwd.UnixNano()), + uint64(rp.timeReply.UnixNano()), + rp.success, dbFailureSourceIdx, + ) + if err != nil { + return nil, nil, err + } + + if err := serializeMCRoute(&b, rp.route); err != nil { + return nil, nil, err + } + + // Write failure. If there is no failure message, write an empty + // byte slice. + var failureBytes bytes.Buffer + if rp.failure != nil { + err := lnwire.EncodeFailureMessage(&failureBytes, rp.failure, 0) + if err != nil { + return nil, nil, err + } + } + err = wire.WriteVarBytes(&b, 0, failureBytes.Bytes()) + if err != nil { + return nil, nil, err + } + + // Compose key that identifies this result. + key := getResultKeyNew(rp) + + return key, b.Bytes(), nil +} + +// getResultKeyNew returns a byte slice representing a unique key for this +// payment result. +func getResultKeyNew(rp *paymentResultNew) []byte { + var keyBytes [8 + 8 + 33]byte + + // Identify records by a combination of time, payment id and sender pub + // key. This allows importing mission control data from an external + // source without key collisions and keeps the records sorted + // chronologically. + byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano())) + byteOrder.PutUint64(keyBytes[8:], rp.id) + copy(keyBytes[16:], rp.route.sourcePubKey[:]) + + return keyBytes[:] +} + +// serializeMCRoute serializes an mcRoute and writes the bytes to the given +// io.Writer. +func serializeMCRoute(w io.Writer, r *mcRoute) error { + if err := WriteElements( + w, r.totalAmount, r.sourcePubKey[:], + ); err != nil { + return err + } + + if err := WriteElements(w, uint32(len(r.hops))); err != nil { + return err + } + + for _, h := range r.hops { + if err := serializeNewHop(w, h); err != nil { + return err + } + } + + return nil +} + +// serializeMCRoute serializes an mcHop and writes the bytes to the given +// io.Writer. +func serializeNewHop(w io.Writer, h *mcHop) error { + return WriteElements(w, + h.pubKeyBytes[:], + h.channelID, + h.amtToFwd, + h.hasBlindingPoint, + ) +} diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 1dceace7c0..cf358a2533 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -199,7 +199,7 @@ type MissionControlPairSnapshot struct { type paymentResult struct { id uint64 timeFwd, timeReply time.Time - route *route.Route + route *mcRoute success bool failureSourceIdx *int failure lnwire.FailureMessage @@ -421,7 +421,7 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, id: paymentID, failureSourceIdx: failureSourceIdx, failure: failure, - route: rt, + route: extractMCRoute(rt), } return m.processPaymentResult(result) @@ -439,7 +439,7 @@ func (m *MissionControl) ReportPaymentSuccess(paymentID uint64, timeReply: timestamp, id: paymentID, success: true, - route: rt, + route: extractMCRoute(rt), } _, err := m.processPaymentResult(result) diff --git a/routing/missioncontrol_store.go b/routing/missioncontrol_store.go index e07a46136e..c38a252de1 100644 --- a/routing/missioncontrol_store.go +++ b/routing/missioncontrol_store.go @@ -5,6 +5,7 @@ import ( "container/list" "encoding/binary" "fmt" + "io" "math" "sync" "time" @@ -187,7 +188,7 @@ func serializeResult(rp *paymentResult) ([]byte, []byte, error) { return nil, nil, err } - if err := channeldb.SerializeRoute(&b, *rp.route); err != nil { + if err := serializeRoute(&b, rp.route); err != nil { return nil, nil, err } @@ -211,6 +212,88 @@ func serializeResult(rp *paymentResult) ([]byte, []byte, error) { return key, b.Bytes(), nil } +// deserializeRoute deserializes the mcRoute from the given io.Reader. +func deserializeRoute(r io.Reader) (*mcRoute, error) { + var rt mcRoute + if err := channeldb.ReadElements(r, &rt.totalAmount); err != nil { + return nil, err + } + + var pub []byte + if err := channeldb.ReadElements(r, &pub); err != nil { + return nil, err + } + copy(rt.sourcePubKey[:], pub) + + var numHops uint32 + if err := channeldb.ReadElements(r, &numHops); err != nil { + return nil, err + } + + var hops []*mcHop + for i := uint32(0); i < numHops; i++ { + hop, err := deserializeHop(r) + if err != nil { + return nil, err + } + hops = append(hops, hop) + } + rt.hops = hops + + return &rt, nil +} + +// deserializeHop deserializes the mcHop from the given io.Reader. +func deserializeHop(r io.Reader) (*mcHop, error) { + var h mcHop + + var pub []byte + if err := channeldb.ReadElements(r, &pub); err != nil { + return nil, err + } + copy(h.pubKeyBytes[:], pub) + + if err := channeldb.ReadElements(r, + &h.channelID, &h.amtToFwd, &h.hasBlindingPoint, + ); err != nil { + return nil, err + } + + return &h, nil +} + +// serializeRoute serializes a mcRouter and writes the resulting bytes to the +// given io.Writer. +func serializeRoute(w io.Writer, r *mcRoute) error { + err := channeldb.WriteElements(w, r.totalAmount, r.sourcePubKey[:]) + if err != nil { + return err + } + + if err := channeldb.WriteElements(w, uint32(len(r.hops))); err != nil { + return err + } + + for _, h := range r.hops { + if err := serializeHop(w, h); err != nil { + return err + } + } + + return nil +} + +// serializeHop serializes a mcHop and writes the resulting bytes to the given +// io.Writer. +func serializeHop(w io.Writer, h *mcHop) error { + return channeldb.WriteElements(w, + h.pubKeyBytes[:], + h.channelID, + h.amtToFwd, + h.hasBlindingPoint, + ) +} + // deserializeResult deserializes a payment result. func deserializeResult(k, v []byte) (*paymentResult, error) { // Parse payment id. @@ -244,11 +327,11 @@ func deserializeResult(k, v []byte) (*paymentResult, error) { } // Read route. - route, err := channeldb.DeserializeRoute(r) + route, err := deserializeRoute(r) if err != nil { return nil, err } - result.route = &route + result.route = route // Read failure. failureBytes, err := wire.ReadVarBytes( @@ -499,7 +582,7 @@ func getResultKey(rp *paymentResult) []byte { // chronologically. byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano())) byteOrder.PutUint64(keyBytes[8:], rp.id) - copy(keyBytes[16:], rp.route.SourcePubKey[:]) + copy(keyBytes[16:], rp.route.sourcePubKey[:]) return keyBytes[:] } diff --git a/routing/missioncontrol_store_test.go b/routing/missioncontrol_store_test.go index 34b925a3e1..ff9b270260 100644 --- a/routing/missioncontrol_store_test.go +++ b/routing/missioncontrol_store_test.go @@ -18,12 +18,11 @@ const testMaxRecords = 2 var ( // mcStoreTestRoute is a test route for the mission control store tests. - mcStoreTestRoute = route.Route{ - SourcePubKey: route.Vertex{1}, - Hops: []*route.Hop{ + mcStoreTestRoute = mcRoute{ + sourcePubKey: route.Vertex{1}, + hops: []*mcHop{ { - PubKeyBytes: route.Vertex{2}, - LegacyPayload: true, + pubKeyBytes: route.Vertex{2}, }, }, } diff --git a/routing/result_interpretation.go b/routing/result_interpretation.go index 4118286e64..825e7ebbd8 100644 --- a/routing/result_interpretation.go +++ b/routing/result_interpretation.go @@ -76,7 +76,7 @@ type interpretedResult struct { // interpretResult interprets a payment outcome and returns an object that // contains information required to update mission control. -func interpretResult(rt *route.Route, success bool, failureSrcIdx *int, +func interpretResult(rt *mcRoute, success bool, failureSrcIdx *int, failure lnwire.FailureMessage) *interpretedResult { i := &interpretedResult{ @@ -92,15 +92,14 @@ func interpretResult(rt *route.Route, success bool, failureSrcIdx *int, } // processSuccess processes a successful payment attempt. -func (i *interpretedResult) processSuccess(route *route.Route) { +func (i *interpretedResult) processSuccess(route *mcRoute) { // For successes, all nodes must have acted in the right way. Therefore // we mark all of them with a success result. - i.successPairRange(route, 0, len(route.Hops)-1) + i.successPairRange(route, 0, len(route.hops)-1) } // processFail processes a failed payment attempt. -func (i *interpretedResult) processFail( - rt *route.Route, errSourceIdx *int, +func (i *interpretedResult) processFail(rt *mcRoute, errSourceIdx *int, failure lnwire.FailureMessage) { if errSourceIdx == nil { @@ -125,10 +124,8 @@ func (i *interpretedResult) processFail( i.processPaymentOutcomeSelf(rt, failure) // A failure from the final hop was received. - case len(rt.Hops): - i.processPaymentOutcomeFinal( - rt, failure, - ) + case len(rt.hops): + i.processPaymentOutcomeFinal(rt, failure) // An intermediate hop failed. Interpret the outcome, update reputation // and try again. @@ -144,7 +141,7 @@ func (i *interpretedResult) processFail( // node. This indicates that the introduction node is not obeying the route // blinding specification, as we expect all errors from the introduction node // to be source from it. -func (i *interpretedResult) processPaymentOutcomeBadIntro(route *route.Route, +func (i *interpretedResult) processPaymentOutcomeBadIntro(route *mcRoute, introIdx, errSourceIdx int) { // We fail the introduction node for not obeying the specification. @@ -161,14 +158,14 @@ func (i *interpretedResult) processPaymentOutcomeBadIntro(route *route.Route, // a final failure reason because the recipient can't process the // payment (independent of the introduction failing to convert the // error, we can't complete the payment if the last hop fails). - if errSourceIdx == len(route.Hops) { + if errSourceIdx == len(route.hops) { i.finalFailureReason = &reasonError } } // processPaymentOutcomeSelf handles failures sent by ourselves. -func (i *interpretedResult) processPaymentOutcomeSelf( - rt *route.Route, failure lnwire.FailureMessage) { +func (i *interpretedResult) processPaymentOutcomeSelf(rt *mcRoute, + failure lnwire.FailureMessage) { switch failure.(type) { @@ -181,7 +178,7 @@ func (i *interpretedResult) processPaymentOutcomeSelf( i.failNode(rt, 1) // If this was a payment to a direct peer, we can stop trying. - if len(rt.Hops) == 1 { + if len(rt.hops) == 1 { i.finalFailureReason = &reasonError } @@ -191,15 +188,15 @@ func (i *interpretedResult) processPaymentOutcomeSelf( // available in the link has been updated. default: log.Warnf("Routing failure for local channel %v occurred", - rt.Hops[0].ChannelID) + rt.hops[0].channelID) } } // processPaymentOutcomeFinal handles failures sent by the final hop. -func (i *interpretedResult) processPaymentOutcomeFinal( - route *route.Route, failure lnwire.FailureMessage) { +func (i *interpretedResult) processPaymentOutcomeFinal(route *mcRoute, + failure lnwire.FailureMessage) { - n := len(route.Hops) + n := len(route.hops) failNode := func() { i.failNode(route, n) @@ -292,9 +289,10 @@ func (i *interpretedResult) processPaymentOutcomeFinal( // processPaymentOutcomeIntermediate handles failures sent by an intermediate // hop. -func (i *interpretedResult) processPaymentOutcomeIntermediate( - route *route.Route, errorSourceIdx int, - failure lnwire.FailureMessage) { +// +//nolint:funlen +func (i *interpretedResult) processPaymentOutcomeIntermediate(route *mcRoute, + errorSourceIdx int, failure lnwire.FailureMessage) { reportOutgoing := func() { i.failPair( @@ -398,8 +396,8 @@ func (i *interpretedResult) processPaymentOutcomeIntermediate( // Set the node pair for which a channel update may be out of // date. The second chance logic uses the policyFailure field. i.policyFailure = &DirectedNodePair{ - From: route.Hops[errorSourceIdx-1].PubKeyBytes, - To: route.Hops[errorSourceIdx].PubKeyBytes, + From: route.hops[errorSourceIdx-1].pubKeyBytes, + To: route.hops[errorSourceIdx].pubKeyBytes, } reportOutgoing() @@ -427,8 +425,8 @@ func (i *interpretedResult) processPaymentOutcomeIntermediate( // Set the node pair for which a channel update may be out of // date. The second chance logic uses the policyFailure field. i.policyFailure = &DirectedNodePair{ - From: route.Hops[errorSourceIdx-1].PubKeyBytes, - To: route.Hops[errorSourceIdx].PubKeyBytes, + From: route.hops[errorSourceIdx-1].pubKeyBytes, + To: route.hops[errorSourceIdx].pubKeyBytes, } // We report incoming channel. If a second pair is granted in @@ -502,16 +500,14 @@ func (i *interpretedResult) processPaymentOutcomeIntermediate( // Note that if LND is extended to support multiple blinded // routes, this will terminate the payment without re-trying // the other routes. - if introIdx == len(route.Hops)-1 { + if introIdx == len(route.hops)-1 { i.finalFailureReason = &reasonError } else { // If there are other hops between the recipient and // introduction node, then we just penalize the last // hop in the blinded route to minimize the storage of // results for ephemeral keys. - i.failPairBalance( - route, len(route.Hops)-1, - ) + i.failPairBalance(route, len(route.hops)-1) } // In all other cases, we penalize the reporting node. These are all @@ -525,9 +521,9 @@ func (i *interpretedResult) processPaymentOutcomeIntermediate( // route, using the same indexing in the route that we use for errorSourceIdx // (i.e., that we consider our own node to be at index zero). A boolean is // returned to indicate whether the route contains a blinded portion at all. -func introductionPointIndex(route *route.Route) (int, bool) { - for i, hop := range route.Hops { - if hop.BlindingPoint != nil { +func introductionPointIndex(route *mcRoute) (int, bool) { + for i, hop := range route.hops { + if hop.hasBlindingPoint { return i + 1, true } } @@ -537,8 +533,8 @@ func introductionPointIndex(route *route.Route) (int, bool) { // processPaymentOutcomeUnknown processes a payment outcome for which no failure // message or source is available. -func (i *interpretedResult) processPaymentOutcomeUnknown(route *route.Route) { - n := len(route.Hops) +func (i *interpretedResult) processPaymentOutcomeUnknown(route *mcRoute) { + n := len(route.hops) // If this is a direct payment, the destination must be at fault. if n == 1 { @@ -553,12 +549,60 @@ func (i *interpretedResult) processPaymentOutcomeUnknown(route *route.Route) { i.failPairRange(route, 0, n-1) } +// extractMCRoute extracts the fields required by MC from the Route struct to +// create the more minima mcRoute struct. +func extractMCRoute(route *route.Route) *mcRoute { + return &mcRoute{ + sourcePubKey: route.SourcePubKey, + totalAmount: route.TotalAmount, + hops: extractMCHops(route.Hops), + } +} + +// extractMCHops extracts the Hop fields that MC actually uses from a slice of +// Hops. +func extractMCHops(hops []*route.Hop) []*mcHop { + mcHops := make([]*mcHop, len(hops)) + for i, hop := range hops { + mcHops[i] = extractMCHop(hop) + } + + return mcHops +} + +// extractMCHop extracts the Hop fields that MC actually uses from a Hop. +func extractMCHop(hop *route.Hop) *mcHop { + return &mcHop{ + channelID: hop.ChannelID, + pubKeyBytes: hop.PubKeyBytes, + amtToFwd: hop.AmtToForward, + hasBlindingPoint: hop.BlindingPoint != nil, + } +} + +// mcRoute holds the bare minimum info about a payment attempt route that MC +// requires. +type mcRoute struct { + sourcePubKey route.Vertex + totalAmount lnwire.MilliSatoshi + hops []*mcHop +} + +// mcHop holds the bare minimum info about a payment attempt route hop that MC +// requires. +type mcHop struct { + channelID uint64 + pubKeyBytes route.Vertex + amtToFwd lnwire.MilliSatoshi + hasBlindingPoint bool +} + // failNode marks the node indicated by idx in the route as failed. It also // marks the incoming and outgoing channels of the node as failed. This function // intentionally panics when the self node is failed. -func (i *interpretedResult) failNode(rt *route.Route, idx int) { +func (i *interpretedResult) failNode(rt *mcRoute, idx int) { // Mark the node as failing. - i.nodeFailure = &rt.Hops[idx-1].PubKeyBytes + i.nodeFailure = &rt.hops[idx-1].pubKeyBytes // Mark the incoming connection as failed for the node. We intent to // penalize as much as we can for a node level failure, including future @@ -574,7 +618,7 @@ func (i *interpretedResult) failNode(rt *route.Route, idx int) { // If not the ultimate node, mark the outgoing connection as failed for // the node. - if idx < len(rt.Hops) { + if idx < len(rt.hops) { outgoingChannelIdx := idx outPair, _ := getPair(rt, outgoingChannelIdx) i.pairResults[outPair] = failPairResult(0) @@ -584,18 +628,14 @@ func (i *interpretedResult) failNode(rt *route.Route, idx int) { // failPairRange marks the node pairs from node fromIdx to node toIdx as failed // in both direction. -func (i *interpretedResult) failPairRange( - rt *route.Route, fromIdx, toIdx int) { - +func (i *interpretedResult) failPairRange(rt *mcRoute, fromIdx, toIdx int) { for idx := fromIdx; idx <= toIdx; idx++ { i.failPair(rt, idx) } } // failPair marks a pair as failed in both directions. -func (i *interpretedResult) failPair( - rt *route.Route, idx int) { - +func (i *interpretedResult) failPair(rt *mcRoute, idx int) { pair, _ := getPair(rt, idx) // Report pair in both directions without a minimum penalization amount. @@ -604,9 +644,7 @@ func (i *interpretedResult) failPair( } // failPairBalance marks a pair as failed with a minimum penalization amount. -func (i *interpretedResult) failPairBalance( - rt *route.Route, channelIdx int) { - +func (i *interpretedResult) failPairBalance(rt *mcRoute, channelIdx int) { pair, amt := getPair(rt, channelIdx) i.pairResults[pair] = failPairResult(amt) @@ -614,9 +652,7 @@ func (i *interpretedResult) failPairBalance( // successPairRange marks the node pairs from node fromIdx to node toIdx as // succeeded. -func (i *interpretedResult) successPairRange( - rt *route.Route, fromIdx, toIdx int) { - +func (i *interpretedResult) successPairRange(rt *mcRoute, fromIdx, toIdx int) { for idx := fromIdx; idx <= toIdx; idx++ { pair, amt := getPair(rt, idx) @@ -626,21 +662,21 @@ func (i *interpretedResult) successPairRange( // getPair returns a node pair from the route and the amount passed between that // pair. -func getPair(rt *route.Route, channelIdx int) (DirectedNodePair, +func getPair(rt *mcRoute, channelIdx int) (DirectedNodePair, lnwire.MilliSatoshi) { - nodeTo := rt.Hops[channelIdx].PubKeyBytes + nodeTo := rt.hops[channelIdx].pubKeyBytes var ( nodeFrom route.Vertex amt lnwire.MilliSatoshi ) if channelIdx == 0 { - nodeFrom = rt.SourcePubKey - amt = rt.TotalAmount + nodeFrom = rt.sourcePubKey + amt = rt.totalAmount } else { - nodeFrom = rt.Hops[channelIdx-1].PubKeyBytes - amt = rt.Hops[channelIdx-1].AmtToForward + nodeFrom = rt.hops[channelIdx-1].pubKeyBytes + amt = rt.hops[channelIdx-1].amtToFwd } pair := NewDirectedNodePair(nodeFrom, nodeTo) diff --git a/routing/result_interpretation_test.go b/routing/result_interpretation_test.go index bf7d6d3edd..8d452a6e8c 100644 --- a/routing/result_interpretation_test.go +++ b/routing/result_interpretation_test.go @@ -4,7 +4,6 @@ import ( "reflect" "testing" - "github.com/btcsuite/btcd/btcec/v2" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" @@ -15,109 +14,105 @@ var ( {1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, } - // blindingPoint provides a non-nil blinding point (value is never - // used). - blindingPoint = &btcec.PublicKey{} - - routeOneHop = route.Route{ - SourcePubKey: hops[0], - TotalAmount: 100, - Hops: []*route.Hop{ - {PubKeyBytes: hops[1], AmtToForward: 99}, + routeOneHop = mcRoute{ + sourcePubKey: hops[0], + totalAmount: 100, + hops: []*mcHop{ + {pubKeyBytes: hops[1], amtToFwd: 99}, }, } - routeTwoHop = route.Route{ - SourcePubKey: hops[0], - TotalAmount: 100, - Hops: []*route.Hop{ - {PubKeyBytes: hops[1], AmtToForward: 99}, - {PubKeyBytes: hops[2], AmtToForward: 97}, + routeTwoHop = mcRoute{ + sourcePubKey: hops[0], + totalAmount: 100, + hops: []*mcHop{ + {pubKeyBytes: hops[1], amtToFwd: 99}, + {pubKeyBytes: hops[2], amtToFwd: 97}, }, } - routeThreeHop = route.Route{ - SourcePubKey: hops[0], - TotalAmount: 100, - Hops: []*route.Hop{ - {PubKeyBytes: hops[1], AmtToForward: 99}, - {PubKeyBytes: hops[2], AmtToForward: 97}, - {PubKeyBytes: hops[3], AmtToForward: 94}, + routeThreeHop = mcRoute{ + sourcePubKey: hops[0], + totalAmount: 100, + hops: []*mcHop{ + {pubKeyBytes: hops[1], amtToFwd: 99}, + {pubKeyBytes: hops[2], amtToFwd: 97}, + {pubKeyBytes: hops[3], amtToFwd: 94}, }, } - routeFourHop = route.Route{ - SourcePubKey: hops[0], - TotalAmount: 100, - Hops: []*route.Hop{ - {PubKeyBytes: hops[1], AmtToForward: 99}, - {PubKeyBytes: hops[2], AmtToForward: 97}, - {PubKeyBytes: hops[3], AmtToForward: 94}, - {PubKeyBytes: hops[4], AmtToForward: 90}, + routeFourHop = mcRoute{ + sourcePubKey: hops[0], + totalAmount: 100, + hops: []*mcHop{ + {pubKeyBytes: hops[1], amtToFwd: 99}, + {pubKeyBytes: hops[2], amtToFwd: 97}, + {pubKeyBytes: hops[3], amtToFwd: 94}, + {pubKeyBytes: hops[4], amtToFwd: 90}, }, } // blindedMultiHop is a blinded path where there are cleartext hops // before the introduction node, and an intermediate blinded hop before // the recipient after it. - blindedMultiHop = route.Route{ - SourcePubKey: hops[0], - TotalAmount: 100, - Hops: []*route.Hop{ - {PubKeyBytes: hops[1], AmtToForward: 99}, + blindedMultiHop = mcRoute{ + sourcePubKey: hops[0], + totalAmount: 100, + hops: []*mcHop{ + {pubKeyBytes: hops[1], amtToFwd: 99}, { - PubKeyBytes: hops[2], - AmtToForward: 95, - BlindingPoint: blindingPoint, + pubKeyBytes: hops[2], + amtToFwd: 95, + hasBlindingPoint: true, }, - {PubKeyBytes: hops[3], AmtToForward: 88}, - {PubKeyBytes: hops[4], AmtToForward: 77}, + {pubKeyBytes: hops[3], amtToFwd: 88}, + {pubKeyBytes: hops[4], amtToFwd: 77}, }, } // blindedSingleHop is a blinded path with a single blinded hop after // the introduction node. - blindedSingleHop = route.Route{ - SourcePubKey: hops[0], - TotalAmount: 100, - Hops: []*route.Hop{ - {PubKeyBytes: hops[1], AmtToForward: 99}, + blindedSingleHop = mcRoute{ + sourcePubKey: hops[0], + totalAmount: 100, + hops: []*mcHop{ + {pubKeyBytes: hops[1], amtToFwd: 99}, { - PubKeyBytes: hops[2], - AmtToForward: 95, - BlindingPoint: blindingPoint, + pubKeyBytes: hops[2], + amtToFwd: 95, + hasBlindingPoint: true, }, - {PubKeyBytes: hops[3], AmtToForward: 88}, + {pubKeyBytes: hops[3], amtToFwd: 88}, }, } // blindedMultiToIntroduction is a blinded path which goes directly // to the introduction node, with multiple blinded hops after it. - blindedMultiToIntroduction = route.Route{ - SourcePubKey: hops[0], - TotalAmount: 100, - Hops: []*route.Hop{ + blindedMultiToIntroduction = mcRoute{ + sourcePubKey: hops[0], + totalAmount: 100, + hops: []*mcHop{ { - PubKeyBytes: hops[1], - AmtToForward: 90, - BlindingPoint: blindingPoint, + pubKeyBytes: hops[1], + amtToFwd: 90, + hasBlindingPoint: true, }, - {PubKeyBytes: hops[2], AmtToForward: 75}, - {PubKeyBytes: hops[3], AmtToForward: 58}, + {pubKeyBytes: hops[2], amtToFwd: 75}, + {pubKeyBytes: hops[3], amtToFwd: 58}, }, } // blindedIntroReceiver is a blinded path where the introduction node // is the recipient. - blindedIntroReceiver = route.Route{ - SourcePubKey: hops[0], - TotalAmount: 100, - Hops: []*route.Hop{ - {PubKeyBytes: hops[1], AmtToForward: 95}, + blindedIntroReceiver = mcRoute{ + sourcePubKey: hops[0], + totalAmount: 100, + hops: []*mcHop{ + {pubKeyBytes: hops[1], amtToFwd: 95}, { - PubKeyBytes: hops[2], - AmtToForward: 90, - BlindingPoint: blindingPoint, + pubKeyBytes: hops[2], + amtToFwd: 90, + hasBlindingPoint: true, }, }, } @@ -134,7 +129,7 @@ func getPolicyFailure(from, to int) *DirectedNodePair { type resultTestCase struct { name string - route *route.Route + route *mcRoute success bool failureSrcIdx int failure lnwire.FailureMessage @@ -159,7 +154,7 @@ var resultTestCases = []resultTestCase{ }, }, - // Tests that a expiry too soon failure result is properly interpreted. + // Tests that an expiry too soon failure result is properly interpreted. { name: "fail expiry too soon", route: &routeFourHop, From 1c0bba25d8502fcc006f6aabe47992367462d877 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 6 Aug 2024 14:36:14 +0200 Subject: [PATCH 04/11] docs: add release notes entry --- docs/release-notes/release-notes-0.19.0.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index 35d93ed107..a806f2dba6 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -47,10 +47,15 @@ ## Testing ## Database +* [Migrate the mission control + store](https://github.com/lightningnetwork/lnd/pull/8911) to use a more + minimal encoding for payment attempt routes. + ## Code Health ## Tooling and Documentation # Contributors (Alphabetical Order) +* Elle Mouton * Ziggie From 8fa3683954d19b223af1d6f0ddbeab002f90fd45 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 13 Aug 2024 18:00:55 +0200 Subject: [PATCH 05/11] routing: introduce `missionControlDB` abstraction So that `missionControlStore` can be unaware of the backing DB structure it is writing to. In an upcoming commit when we change mission control to write to namespaced buckets instead, we then only need to update the `namespacedDB` implementation. --- routing/missioncontrol.go | 75 +++++++++++++++++++++++++++- routing/missioncontrol_store.go | 46 ++++++++--------- routing/missioncontrol_store_test.go | 12 +++-- 3 files changed, 105 insertions(+), 28 deletions(-) diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index cf358a2533..919e1e231c 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -7,6 +7,7 @@ import ( "time" "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcwallet/walletdb" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" @@ -217,7 +218,8 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex, } store, err := newMissionControlStore( - db, cfg.MaxMcHistory, cfg.McFlushInterval, + newNamespacedDB(db), cfg.MaxMcHistory, + cfg.McFlushInterval, ) if err != nil { return nil, err @@ -528,3 +530,74 @@ func (m *MissionControl) applyPaymentResult( return i.finalFailureReason } + +// namespacedDB is an implementation of the missionControlDB that gives a user +// of the interface access to the top level mission control bucket. In a +// follow-up commit (accompanied by a migration), this will change to giving +// the user of the interface access to a namespaced sub-bucket instead. +type namespacedDB struct { + topLevelBucketKey []byte + db kvdb.Backend +} + +// A compile-time check to ensure that namespacedDB implements missionControlDB. +var _ missionControlDB = (*namespacedDB)(nil) + +// newNamespacedDB creates a new instance of missionControlDB where the DB will +// have access to the top level bucket. +func newNamespacedDB(db kvdb.Backend) missionControlDB { + return &namespacedDB{ + db: db, + topLevelBucketKey: resultsKey, + } +} + +// update can be used to perform reads and writes on the given bucket. +// +// NOTE: this is part of the missionControlDB interface. +func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error, + reset func()) error { + + return n.db.Update(func(tx kvdb.RwTx) error { + mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey) + if err != nil { + return fmt.Errorf("cannot create top level mission "+ + "control bucket: %w", err) + } + + return f(mcStoreBkt) + }, reset) +} + +// view can be used to perform reads on the given bucket. +// +// NOTE: this is part of the missionControlDB interface. +func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error, + reset func()) error { + + return n.db.View(func(tx kvdb.RTx) error { + mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey) + if mcStoreBkt == nil { + return fmt.Errorf("top level mission control bucket " + + "not found") + } + + return f(mcStoreBkt) + }, reset) +} + +// clear can be used to reset the contents of the store's bucket. +// +// NOTE: this is part of the missionControlDB interface. +func (n *namespacedDB) clear() error { + return n.db.Update(func(tx kvdb.RwTx) error { + err := tx.DeleteTopLevelBucket(n.topLevelBucketKey) + if err != nil { + return err + } + + _, err = tx.CreateTopLevelBucket(n.topLevelBucketKey) + + return err + }, func() {}) +} diff --git a/routing/missioncontrol_store.go b/routing/missioncontrol_store.go index c38a252de1..00948f058e 100644 --- a/routing/missioncontrol_store.go +++ b/routing/missioncontrol_store.go @@ -32,6 +32,21 @@ const ( unknownFailureSourceIdx = -1 ) +// missionControlDB is an interface that defines the database methods that a +// single missionControlStore has access to. It allows the missionControlStore +// to be unaware of the overall DB structure and restricts its access to the DB +// by only providing it the bucket that it needs to care about. +type missionControlDB interface { + // update can be used to perform reads and writes on the given bucket. + update(f func(bkt kvdb.RwBucket) error, reset func()) error + + // view can be used to perform reads on the given bucket. + view(f func(bkt kvdb.RBucket) error, reset func()) error + + // clear can be used to reset the contents of the store's bucket. + clear() error +} + // missionControlStore is a bolt db based implementation of a mission control // store. It stores the raw payment attempt data from which the internal mission // controls state can be rederived on startup. This allows the mission control @@ -41,7 +56,7 @@ const ( type missionControlStore struct { done chan struct{} wg sync.WaitGroup - db kvdb.Backend + db missionControlDB // queueCond is signalled when items are put into the queue. queueCond *sync.Cond @@ -67,7 +82,7 @@ type missionControlStore struct { flushInterval time.Duration } -func newMissionControlStore(db kvdb.Backend, maxRecords int, +func newMissionControlStore(db missionControlDB, maxRecords int, flushInterval time.Duration) (*missionControlStore, error) { var ( @@ -76,13 +91,7 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int, ) // Create buckets if not yet existing. - err := kvdb.Update(db, func(tx kvdb.RwTx) error { - resultsBucket, err := tx.CreateTopLevelBucket(resultsKey) - if err != nil { - return fmt.Errorf("cannot create results bucket: %w", - err) - } - + err := db.update(func(resultsBucket kvdb.RwBucket) error { // Collect all keys to be able to quickly calculate the // difference when updating the DB state. c := resultsBucket.ReadCursor() @@ -119,20 +128,12 @@ func (b *missionControlStore) clear() error { b.queueCond.L.Lock() defer b.queueCond.L.Unlock() - err := kvdb.Update(b.db, func(tx kvdb.RwTx) error { - if err := tx.DeleteTopLevelBucket(resultsKey); err != nil { - return err - } - - _, err := tx.CreateTopLevelBucket(resultsKey) - return err - }, func() {}) - - if err != nil { + if err := b.db.clear(); err != nil { return err } b.queue = list.New() + return nil } @@ -140,8 +141,7 @@ func (b *missionControlStore) clear() error { func (b *missionControlStore) fetchAll() ([]*paymentResult, error) { var results []*paymentResult - err := kvdb.View(b.db, func(tx kvdb.RTx) error { - resultBucket := tx.ReadBucket(resultsKey) + err := b.db.view(func(resultBucket kvdb.RBucket) error { results = make([]*paymentResult, 0) return resultBucket.ForEach(func(k, v []byte) error { @@ -509,9 +509,7 @@ func (b *missionControlStore) storeResults() error { } } - err := kvdb.Update(b.db, func(tx kvdb.RwTx) error { - bucket := tx.ReadWriteBucket(resultsKey) - + err := b.db.update(func(bucket kvdb.RwBucket) error { for e := l.Front(); e != nil; e = e.Next() { pr, ok := e.Value.(*paymentResult) if !ok { diff --git a/routing/missioncontrol_store_test.go b/routing/missioncontrol_store_test.go index ff9b270260..28196d50e1 100644 --- a/routing/missioncontrol_store_test.go +++ b/routing/missioncontrol_store_test.go @@ -57,7 +57,9 @@ func newMCStoreTestHarness(t testing.TB, maxRecords int, require.NoError(t, db.Close()) }) - store, err := newMissionControlStore(db, maxRecords, flushInterval) + store, err := newMissionControlStore( + newNamespacedDB(db), maxRecords, flushInterval, + ) require.NoError(t, err) return mcStoreTestHarness{db: db, store: store} @@ -110,7 +112,9 @@ func TestMissionControlStore(t *testing.T) { require.Equal(t, &result2, results[1]) // Recreate store to test pruning. - store, err = newMissionControlStore(db, testMaxRecords, time.Second) + store, err = newMissionControlStore( + newNamespacedDB(db), testMaxRecords, time.Second, + ) require.NoError(t, err) // Add a newer result which failed due to mpp timeout. @@ -208,7 +212,9 @@ func TestMissionControlStoreFlushing(t *testing.T) { store.stop() // Recreate store. - store, err := newMissionControlStore(db, testMaxRecords, flushInterval) + store, err := newMissionControlStore( + newNamespacedDB(db), testMaxRecords, flushInterval, + ) require.NoError(t, err) store.run() defer store.stop() From 9d7514fdcd05fb352aa243b7356b6f5120ec7557 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 13 Aug 2024 12:57:46 +0200 Subject: [PATCH 06/11] channeldb/migration33: migrate MC store pairs to default namespace In this commit, the mission control store is migrated such that all existing pairs which are currently stored directly in the top level results bucket are now instead moved to a "default" namespace bucket. Note that this migration is not yet invoked in this commit. The migration will be invoked in the same commit that starts writing and reading the new format. --- channeldb/migration33/log.go | 14 +++++ channeldb/migration33/migration.go | 69 +++++++++++++++++++++++++ channeldb/migration33/migration_test.go | 41 +++++++++++++++ 3 files changed, 124 insertions(+) create mode 100644 channeldb/migration33/log.go create mode 100644 channeldb/migration33/migration.go create mode 100644 channeldb/migration33/migration_test.go diff --git a/channeldb/migration33/log.go b/channeldb/migration33/log.go new file mode 100644 index 0000000000..e9b271f5df --- /dev/null +++ b/channeldb/migration33/log.go @@ -0,0 +1,14 @@ +package migration33 + +import ( + "github.com/btcsuite/btclog" +) + +// log is a logger that is initialized as disabled. This means the package will +// not perform any logging by default until a logger is set. +var log = btclog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/channeldb/migration33/migration.go b/channeldb/migration33/migration.go new file mode 100644 index 0000000000..6e2069a373 --- /dev/null +++ b/channeldb/migration33/migration.go @@ -0,0 +1,69 @@ +package migration33 + +import ( + "bytes" + + "github.com/lightningnetwork/lnd/kvdb" +) + +var ( + // resultsKey is the fixed key under which the attempt results are + // stored. + resultsKey = []byte("missioncontrol-results") + + // defaultMCNamespaceKey is the key of the default mission control store + // namespace. + defaultMCNamespaceKey = []byte("default") +) + +// MigrateMCStoreNameSpacedResults reads in all the current mission control +// entries and re-writes them under a new default namespace. +func MigrateMCStoreNameSpacedResults(tx kvdb.RwTx) error { + log.Infof("Migrating Mission Control store to use namespaced results") + + // Get the top level bucket. All the MC results are currently stored + // as KV pairs in this bucket + topLevelBucket := tx.ReadWriteBucket(resultsKey) + + // If the results bucket does not exist then there are no entries in + // the mission control store yet and so there is nothing to migrate. + if topLevelBucket == nil { + return nil + } + + // Create a new default namespace bucket under the top-level bucket. + defaultNSBkt, err := topLevelBucket.CreateBucket(defaultMCNamespaceKey) + if err != nil { + return err + } + + // Iterate through each of the existing result pairs, write them to the + // new namespaced bucket. Also collect the set of keys so that we can + // later delete them from the top level bucket. + var keys [][]byte + err = topLevelBucket.ForEach(func(k, v []byte) error { + // Skip the new default namespace key. + if bytes.Equal(k, defaultMCNamespaceKey) { + return nil + } + + // Collect the key. + keys = append(keys, k) + + // Write the pair under the default namespace. + return defaultNSBkt.Put(k, v) + }) + if err != nil { + return err + } + + // Finally, iterate through the set of keys and delete them from the + // top level bucket. + for _, k := range keys { + if err := topLevelBucket.Delete(k); err != nil { + return err + } + } + + return err +} diff --git a/channeldb/migration33/migration_test.go b/channeldb/migration33/migration_test.go new file mode 100644 index 0000000000..851e2467e3 --- /dev/null +++ b/channeldb/migration33/migration_test.go @@ -0,0 +1,41 @@ +package migration33 + +import ( + "testing" + + "github.com/lightningnetwork/lnd/channeldb/migtest" + "github.com/lightningnetwork/lnd/kvdb" +) + +var ( + // before represents the structure of the MC store before the migration. + before = map[string]interface{}{ + "key1": "result1", + "key2": "result2", + "key3": "result3", + "key4": "result4", + } + + // after represents the expected structure of the store after the + // migration. It should be identical to before except all the kv pairs + // are now under a new default namespace key. + after = map[string]interface{}{ + string(defaultMCNamespaceKey): before, + } +) + +// TestMigrateMCStoreNameSpacedResults tests that the MC store results are +// correctly moved to be under a new default namespace bucket. +func TestMigrateMCStoreNameSpacedResults(t *testing.T) { + before := func(tx kvdb.RwTx) error { + return migtest.RestoreDB(tx, resultsKey, before) + } + + after := func(tx kvdb.RwTx) error { + return migtest.VerifyDB(tx, resultsKey, after) + } + + migtest.ApplyMigration( + t, before, after, MigrateMCStoreNameSpacedResults, false, + ) +} From 169464656d266ae0ca1d71086a12f5bf9421d69f Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 13 Aug 2024 18:24:46 +0200 Subject: [PATCH 07/11] routing: start writing and reading from namespaced MC and invoke the associated mission control migration. --- channeldb/db.go | 5 +++ channeldb/log.go | 2 ++ routing/missioncontrol.go | 53 ++++++++++++++++++++++------ routing/missioncontrol_store_test.go | 6 ++-- 4 files changed, 53 insertions(+), 13 deletions(-) diff --git a/channeldb/db.go b/channeldb/db.go index d8ba05f3d2..92e0498ece 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -27,6 +27,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/migration30" "github.com/lightningnetwork/lnd/channeldb/migration31" "github.com/lightningnetwork/lnd/channeldb/migration32" + "github.com/lightningnetwork/lnd/channeldb/migration33" "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/invoices" @@ -291,6 +292,10 @@ var ( number: 32, migration: migration32.MigrateMCRouteSerialisation, }, + { + number: 33, + migration: migration33.MigrateMCStoreNameSpacedResults, + }, } // optionalVersions stores all optional migrations that are applied diff --git a/channeldb/log.go b/channeldb/log.go index e50e5054ef..10b1b54d3c 100644 --- a/channeldb/log.go +++ b/channeldb/log.go @@ -11,6 +11,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/migration30" "github.com/lightningnetwork/lnd/channeldb/migration31" "github.com/lightningnetwork/lnd/channeldb/migration32" + "github.com/lightningnetwork/lnd/channeldb/migration33" "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" "github.com/lightningnetwork/lnd/kvdb" ) @@ -44,5 +45,6 @@ func UseLogger(logger btclog.Logger) { migration30.UseLogger(logger) migration31.UseLogger(logger) migration32.UseLogger(logger) + migration33.UseLogger(logger) kvdb.UseLogger(logger) } diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 919e1e231c..63caaf2009 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -68,6 +68,11 @@ const ( // FeeEstimationTimeout. It defines the maximum duration that the // probing fee estimation is allowed to take. DefaultFeeEstimationTimeout = time.Minute + + // DefaultMissionControlNamespace is the name of the default mission + // control name space. This is used as the sub-bucket key within the + // top level DB bucket to store mission control results. + DefaultMissionControlNamespace = "default" ) var ( @@ -218,7 +223,7 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex, } store, err := newMissionControlStore( - newNamespacedDB(db), cfg.MaxMcHistory, + newDefaultNamespacedStore(db), cfg.MaxMcHistory, cfg.McFlushInterval, ) if err != nil { @@ -532,22 +537,30 @@ func (m *MissionControl) applyPaymentResult( } // namespacedDB is an implementation of the missionControlDB that gives a user -// of the interface access to the top level mission control bucket. In a -// follow-up commit (accompanied by a migration), this will change to giving -// the user of the interface access to a namespaced sub-bucket instead. +// of the interface access to a namespaced bucket within the top level mission +// control bucket. type namespacedDB struct { topLevelBucketKey []byte + namespace []byte db kvdb.Backend } // A compile-time check to ensure that namespacedDB implements missionControlDB. var _ missionControlDB = (*namespacedDB)(nil) +// newDefaultNamespacedStore creates an instance of namespaceDB that uses the +// default namespace. +func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB { + return newNamespacedDB(db, DefaultMissionControlNamespace) +} + // newNamespacedDB creates a new instance of missionControlDB where the DB will -// have access to the top level bucket. -func newNamespacedDB(db kvdb.Backend) missionControlDB { +// have access to a namespaced bucket within the top level mission control +// bucket. +func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB { return &namespacedDB{ db: db, + namespace: []byte(namespace), topLevelBucketKey: resultsKey, } } @@ -565,7 +578,16 @@ func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error, "control bucket: %w", err) } - return f(mcStoreBkt) + namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists( + n.namespace, + ) + if err != nil { + return fmt.Errorf("cannot create namespaced bucket "+ + "(%s) in mission control store: %w", + n.namespace, err) + } + + return f(namespacedBkt) }, reset) } @@ -582,7 +604,13 @@ func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error, "not found") } - return f(mcStoreBkt) + namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace) + if namespacedBkt == nil { + return fmt.Errorf("namespaced bucket (%s) not found "+ + "in mission control store", n.namespace) + } + + return f(namespacedBkt) }, reset) } @@ -591,12 +619,17 @@ func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error, // NOTE: this is part of the missionControlDB interface. func (n *namespacedDB) clear() error { return n.db.Update(func(tx kvdb.RwTx) error { - err := tx.DeleteTopLevelBucket(n.topLevelBucketKey) + mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey) + if mcStoreBkt == nil { + return nil + } + + err := mcStoreBkt.DeleteNestedBucket(n.namespace) if err != nil { return err } - _, err = tx.CreateTopLevelBucket(n.topLevelBucketKey) + _, err = mcStoreBkt.CreateBucket(n.namespace) return err }, func() {}) diff --git a/routing/missioncontrol_store_test.go b/routing/missioncontrol_store_test.go index 28196d50e1..bb69db92f5 100644 --- a/routing/missioncontrol_store_test.go +++ b/routing/missioncontrol_store_test.go @@ -58,7 +58,7 @@ func newMCStoreTestHarness(t testing.TB, maxRecords int, }) store, err := newMissionControlStore( - newNamespacedDB(db), maxRecords, flushInterval, + newDefaultNamespacedStore(db), maxRecords, flushInterval, ) require.NoError(t, err) @@ -113,7 +113,7 @@ func TestMissionControlStore(t *testing.T) { // Recreate store to test pruning. store, err = newMissionControlStore( - newNamespacedDB(db), testMaxRecords, time.Second, + newDefaultNamespacedStore(db), testMaxRecords, time.Second, ) require.NoError(t, err) @@ -213,7 +213,7 @@ func TestMissionControlStoreFlushing(t *testing.T) { // Recreate store. store, err := newMissionControlStore( - newNamespacedDB(db), testMaxRecords, flushInterval, + newDefaultNamespacedStore(db), testMaxRecords, flushInterval, ) require.NoError(t, err) store.run() From 748014c93004df9460a0042345f77cb24a8324c5 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 13 Aug 2024 18:26:54 +0200 Subject: [PATCH 08/11] routing: unexport MissionControl mutex Only the MissionControl instance should use this variable and it should not be accessible to users of MissionControl. --- routing/missioncontrol.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 63caaf2009..fcfe565ad5 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -116,7 +116,7 @@ type MissionControl struct { // results that mission control collects. estimator Estimator - sync.Mutex + mu sync.Mutex // TODO(roasbeef): further counters, if vertex continually unavailable, // add to another generation @@ -283,8 +283,8 @@ func (m *MissionControl) init() error { // with. All fields are copied by value, so we do not need to worry about // mutation. func (m *MissionControl) GetConfig() *MissionControlConfig { - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() return &MissionControlConfig{ Estimator: m.estimator, @@ -305,8 +305,8 @@ func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error { return err } - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() log.Infof("Active mission control cfg: %v, estimator: %v", cfg, cfg.Estimator) @@ -321,8 +321,8 @@ func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error { // ResetHistory resets the history of MissionControl returning it to a state as // if no payment attempts have been made. func (m *MissionControl) ResetHistory() error { - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() if err := m.store.clear(); err != nil { return err @@ -340,8 +340,8 @@ func (m *MissionControl) ResetHistory() error { func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex, amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 { - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() now := m.now() results, _ := m.state.getLastPairResult(fromNode) @@ -359,8 +359,8 @@ func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex, // GetHistorySnapshot takes a snapshot from the current mission control state // and actual probability estimates. func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot { - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() log.Debugf("Requesting history snapshot from mission control") @@ -377,8 +377,8 @@ func (m *MissionControl) ImportHistory(history *MissionControlSnapshot, return errors.New("cannot import nil history") } - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() log.Infof("Importing history snapshot with %v pairs to mission control", len(history.Pairs)) @@ -394,8 +394,8 @@ func (m *MissionControl) ImportHistory(history *MissionControlSnapshot, func (m *MissionControl) GetPairHistorySnapshot( fromNode, toNode route.Vertex) TimedPairResult { - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() results, ok := m.state.getLastPairResult(fromNode) if !ok { @@ -461,8 +461,8 @@ func (m *MissionControl) processPaymentResult(result *paymentResult) ( // Store complete result in database. m.store.AddResult(result) - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() // Apply result to update mission control state. reason := m.applyPaymentResult(result) From 517fb4d85960f47b984e62fcc1a3c92ab46e422a Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 13 Aug 2024 18:52:16 +0200 Subject: [PATCH 09/11] routing: separate MissionControl from MissionControlManager This commit renames the previous MissionControl to MissionControlManager since it will soon back multiple namespaced MissionControl instances. For now, it just houses a single MissionControl in the default namespace. --- routing/integrated_routing_context_test.go | 8 +-- routing/missioncontrol.go | 83 ++++++++++++++++------ routing/missioncontrol_state.go | 2 +- routing/missioncontrol_test.go | 6 +- routing/mock_test.go | 2 +- routing/payment_session.go | 4 +- routing/payment_session_source.go | 2 +- routing/router_test.go | 11 +-- rpcserver.go | 5 +- server.go | 8 +-- 10 files changed, 89 insertions(+), 42 deletions(-) diff --git a/routing/integrated_routing_context_test.go b/routing/integrated_routing_context_test.go index ee6fed295a..cbf548111b 100644 --- a/routing/integrated_routing_context_test.go +++ b/routing/integrated_routing_context_test.go @@ -159,10 +159,10 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32, // Instantiate a new mission control with the current configuration // values. - mc, err := NewMissionControl(db, c.source.pubkey, &c.mcCfg) - if err != nil { - c.t.Fatal(err) - } + mcMgr, err := NewMissionControlMgr(db, c.source.pubkey, &c.mcCfg) + require.NoError(c.t, err) + + mc := mcMgr.GetDefaultStore() getBandwidthHints := func(_ Graph) (bandwidthHints, error) { // Create bandwidth hints based on local channel balances. diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index fcfe565ad5..bbcf6402ce 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -89,6 +89,17 @@ var ( // NodeResults contains previous results from a node to its peers. type NodeResults map[route.Vertex]TimedPairResult +// mcConfig holds various config members that will be required by all +// MissionControl instances and will be the same regardless of namespace. +type mcConfig struct { + // now is expected to return the current time. It is supplied as an + // external function to enable deterministic unit tests. + now func() time.Time + + // selfNode is our pubkey. + selfNode route.Vertex +} + // MissionControl contains state which summarizes the past attempts of HTLC // routing by external callers when sending payments throughout the network. It // acts as a shared memory during routing attempts with the goal to optimize the @@ -99,17 +110,12 @@ type NodeResults map[route.Vertex]TimedPairResult // since the last failure is used to estimate a success probability that is fed // into the path finding process for subsequent payment attempts. type MissionControl struct { + *mcConfig + // state is the internal mission control state that is input for // probability estimation. state *missionControlState - // now is expected to return the current time. It is supplied as an - // external function to enable deterministic unit tests. - now func() time.Time - - // selfNode is our pubkey. - selfNode route.Vertex - store *missionControlStore // estimator is the probability estimator that is used with the payment @@ -117,6 +123,16 @@ type MissionControl struct { estimator Estimator mu sync.Mutex +} + +// MissionControlManager manages MissionControl instances in various namespaces. +// +// NOTE: currently it only has a MissionControl in the default namespace. +type MissionControlManager struct { + *mcConfig + + mc *MissionControl + mu sync.Mutex // TODO(roasbeef): further counters, if vertex continually unavailable, // add to another generation @@ -124,6 +140,14 @@ type MissionControl struct { // TODO(roasbeef): also add favorable metrics for nodes } +// GetDefaultStore returns the MissionControl in the default namespace. +func (m *MissionControlManager) GetDefaultStore() *MissionControl { + m.mu.Lock() + defer m.mu.Unlock() + + return m.mc +} + // MissionControlConfig defines parameters that control mission control // behaviour. type MissionControlConfig struct { @@ -211,9 +235,9 @@ type paymentResult struct { failure lnwire.FailureMessage } -// NewMissionControl returns a new instance of missionControl. -func NewMissionControl(db kvdb.Backend, self route.Vertex, - cfg *MissionControlConfig) (*MissionControl, error) { +// NewMissionControlMgr returns a new instance of MissionControlManager. +func NewMissionControlMgr(db kvdb.Backend, self route.Vertex, + cfg *MissionControlConfig) (*MissionControlManager, error) { log.Debugf("Instantiating mission control with config: %v, %v", cfg, cfg.Estimator) @@ -230,14 +254,24 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex, return nil, err } - mc := &MissionControl{ + mcCfg := &mcConfig{ + now: time.Now, + selfNode: self, + } + + // Create a mission control in the default namespace. + defaultMC := &MissionControl{ + mcConfig: mcCfg, state: newMissionControlState(cfg.MinFailureRelaxInterval), - now: time.Now, - selfNode: self, store: store, estimator: cfg.Estimator, } + mc := &MissionControlManager{ + mcConfig: mcCfg, + mc: defaultMC, + } + if err := mc.init(); err != nil { return nil, err } @@ -246,31 +280,40 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex, } // RunStoreTicker runs the mission control store's ticker. -func (m *MissionControl) RunStoreTicker() { - m.store.run() +func (m *MissionControlManager) RunStoreTicker() { + m.mu.Lock() + defer m.mu.Unlock() + + m.mc.store.run() } // StopStoreTicker stops the mission control store's ticker. -func (m *MissionControl) StopStoreTicker() { +func (m *MissionControlManager) StopStoreTicker() { log.Debug("Stopping mission control store ticker") defer log.Debug("Mission control store ticker stopped") - m.store.stop() + m.mu.Lock() + defer m.mu.Unlock() + + m.mc.store.stop() } // init initializes mission control with historical data. -func (m *MissionControl) init() error { +func (m *MissionControlManager) init() error { log.Debugf("Mission control state reconstruction started") + m.mu.Lock() + defer m.mu.Unlock() + start := time.Now() - results, err := m.store.fetchAll() + results, err := m.mc.store.fetchAll() if err != nil { return err } for _, result := range results { - m.applyPaymentResult(result) + _ = m.mc.applyPaymentResult(result) } log.Debugf("Mission control state reconstruction finished: "+ diff --git a/routing/missioncontrol_state.go b/routing/missioncontrol_state.go index 7d6633f2bd..416f6c464b 100644 --- a/routing/missioncontrol_state.go +++ b/routing/missioncontrol_state.go @@ -45,7 +45,7 @@ func (m *missionControlState) getLastPairResult(node route.Vertex) (NodeResults, return result, ok } -// ResetHistory resets the history of MissionControl returning it to a state as +// ResetHistory resets the history of MissionControlManager returning it to a state as // if no payment attempts have been made. func (m *missionControlState) resetHistory() { m.lastPairResult = make(map[route.Vertex]NodeResults) diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index 27391d53e2..2286702b7c 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -85,7 +85,7 @@ func createMcTestContext(t *testing.T) *mcTestContext { // restartMc creates a new instances of mission control on the same database. func (ctx *mcTestContext) restartMc() { // Since we don't run a timer to store results in unit tests, we store - // them here before fetching back everything in NewMissionControl. + // them here before fetching back everything in NewMissionControlMgr. if ctx.mc != nil { require.NoError(ctx.t, ctx.mc.store.storeResults()) } @@ -99,7 +99,7 @@ func (ctx *mcTestContext) restartMc() { estimator, err := NewAprioriEstimator(aCfg) require.NoError(ctx.t, err) - mc, err := NewMissionControl( + mc, err := NewMissionControlMgr( ctx.db, mcTestSelf, &MissionControlConfig{Estimator: estimator}, ) @@ -108,7 +108,7 @@ func (ctx *mcTestContext) restartMc() { } mc.now = func() time.Time { return ctx.now } - ctx.mc = mc + ctx.mc = mc.GetDefaultStore() } // Assert that mission control returns a probability for an edge. diff --git a/routing/mock_test.go b/routing/mock_test.go index 306c182107..4f7379a60c 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -122,7 +122,7 @@ func (m *mockPaymentSessionSourceOld) NewPaymentSessionEmpty() PaymentSession { } type mockMissionControlOld struct { - MissionControl + MissionControlManager } var _ MissionController = (*mockMissionControlOld)(nil) diff --git a/routing/payment_session.go b/routing/payment_session.go index 00b4ab70ed..99f60a7f06 100644 --- a/routing/payment_session.go +++ b/routing/payment_session.go @@ -157,7 +157,7 @@ type PaymentSession interface { // paymentSession is used during an HTLC routings session to prune the local // chain view in response to failures, and also report those failures back to -// MissionControl. The snapshot copied for this session will only ever grow, +// MissionControlManager. The snapshot copied for this session will only ever grow, // and will now be pruned after a decay like the main view within mission // control. We do this as we want to avoid the case where we continually try a // bad edge or route multiple times in a session. This can lead to an infinite @@ -263,7 +263,7 @@ func (p *paymentSession) RequestRoute(maxAmt, feeLimit lnwire.MilliSatoshi, // Taking into account this prune view, we'll attempt to locate a path // to our destination, respecting the recommendations from - // MissionControl. + // MissionControlManager. restrictions := &RestrictParams{ ProbabilitySource: p.missionControl.GetProbability, FeeLimit: feeLimit, diff --git a/routing/payment_session_source.go b/routing/payment_session_source.go index 46e7a42aa1..16db2a02cd 100644 --- a/routing/payment_session_source.go +++ b/routing/payment_session_source.go @@ -9,7 +9,7 @@ import ( "github.com/lightningnetwork/lnd/zpay32" ) -// A compile time assertion to ensure MissionControl meets the +// A compile time assertion to ensure MissionControlManager meets the // PaymentSessionSource interface. var _ PaymentSessionSource = (*SessionSource)(nil) diff --git a/routing/router_test.go b/routing/router_test.go index 411a6c81cd..88004451f3 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -128,10 +128,11 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, mcConfig := &MissionControlConfig{Estimator: estimator} - mc, err := NewMissionControl( + mcMgr, err := NewMissionControlMgr( graphInstance.graphBackend, route.Vertex{}, mcConfig, ) require.NoError(t, err, "failed to create missioncontrol") + mc := mcMgr.GetDefaultStore() sourceNode, err := graphInstance.graph.SourceNode() require.NoError(t, err) @@ -1078,11 +1079,12 @@ func TestSendPaymentErrorPathPruning(t *testing.T) { return preImage, nil }) - ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory() + err := ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory() + require.NoError(t, err) // When we try to dispatch that payment, we should receive an error as // both attempts should fail and cause both routes to be pruned. - _, _, err := ctx.router.SendPayment(payment) + _, _, err = ctx.router.SendPayment(payment) require.Error(t, err, "payment didn't return error") // The final error returned should also indicate that the peer wasn't @@ -1141,7 +1143,8 @@ func TestSendPaymentErrorPathPruning(t *testing.T) { getAliasFromPubKey(rt.Hops[0].PubKeyBytes, ctx.aliases), ) - ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory() + err = ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory() + require.NoError(t, err) // Finally, we'll modify the SendToSwitch function to indicate that the // roasbeef -> luoji channel has insufficient capacity. This should diff --git a/rpcserver.go b/rpcserver.go index 7fef873266..9fba1fb60a 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -716,7 +716,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service, return info.NodeKey1Bytes, info.NodeKey2Bytes, nil }, FindRoute: s.chanRouter.FindRoute, - MissionControl: s.missionControl, + MissionControl: s.missionControl.GetDefaultStore(), ActiveNetParams: r.cfg.ActiveNetParams.Params, Tower: s.controlTower, MaxTotalTimelock: r.cfg.MaxOutgoingCltvExpiry, @@ -5868,7 +5868,8 @@ func (r *rpcServer) AddInvoice(ctx context.Context, return r.server.chanRouter.FindBlindedPaths( r.selfNode, amt, - r.server.missionControl.GetProbability, + r.server.missionControl.GetDefaultStore(). + GetProbability, blindingRestrictions, ) }, diff --git a/server.go b/server.go index 555ee26274..1a623c0f6b 100644 --- a/server.go +++ b/server.go @@ -271,7 +271,7 @@ type server struct { breachArbitrator *contractcourt.BreachArbitrator - missionControl *routing.MissionControl + missionControl *routing.MissionControlManager graphBuilder *graph.Builder @@ -933,7 +933,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, McFlushInterval: routingConfig.McFlushInterval, MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval, } - s.missionControl, err = routing.NewMissionControl( + s.missionControl, err = routing.NewMissionControlMgr( dbs.ChanStateDB, selfNode.PubKeyBytes, mcCfg, ) if err != nil { @@ -963,7 +963,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanGraph, ), SourceNode: sourceNode, - MissionControl: s.missionControl, + MissionControl: s.missionControl.GetDefaultStore(), GetLink: s.htlcSwitch.GetLinkByShortID, PathFindingConfig: pathFindingConfig, } @@ -998,7 +998,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, Chain: cc.ChainIO, Payer: s.htlcSwitch, Control: s.controlTower, - MissionControl: s.missionControl, + MissionControl: s.missionControl.GetDefaultStore(), SessionSource: paymentSessionSource, GetLink: s.htlcSwitch.GetLinkByShortID, NextPaymentID: sequencer.NextID, From 25503253919b36008224a6d3f87205feb0c32f7c Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 13 Aug 2024 19:01:42 +0200 Subject: [PATCH 10/11] routing: prefix MC logs with namespace --- routing/missioncontrol.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index bbcf6402ce..c528783d71 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -7,7 +7,9 @@ import ( "time" "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btclog" "github.com/btcsuite/btcwallet/walletdb" + "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" @@ -122,6 +124,8 @@ type MissionControl struct { // results that mission control collects. estimator Estimator + log btclog.Logger + mu sync.Mutex } @@ -265,6 +269,10 @@ func NewMissionControlMgr(db kvdb.Backend, self route.Vertex, state: newMissionControlState(cfg.MinFailureRelaxInterval), store: store, estimator: cfg.Estimator, + log: build.NewPrefixLog( + fmt.Sprintf("[%s]:", DefaultMissionControlNamespace), + log, + ), } mc := &MissionControlManager{ @@ -351,7 +359,7 @@ func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error { m.mu.Lock() defer m.mu.Unlock() - log.Infof("Active mission control cfg: %v, estimator: %v", cfg, + m.log.Infof("Active mission control cfg: %v, estimator: %v", cfg, cfg.Estimator) m.store.maxRecords = cfg.MaxMcHistory @@ -373,7 +381,7 @@ func (m *MissionControl) ResetHistory() error { m.state.resetHistory() - log.Debugf("Mission control history cleared") + m.log.Debugf("Mission control history cleared") return nil } @@ -405,7 +413,7 @@ func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot { m.mu.Lock() defer m.mu.Unlock() - log.Debugf("Requesting history snapshot from mission control") + m.log.Debugf("Requesting history snapshot from mission control") return m.state.getSnapshot() } @@ -423,12 +431,12 @@ func (m *MissionControl) ImportHistory(history *MissionControlSnapshot, m.mu.Lock() defer m.mu.Unlock() - log.Infof("Importing history snapshot with %v pairs to mission control", + m.log.Infof("Importing history snapshot with %v pairs to mission control", len(history.Pairs)) imported := m.state.importSnapshot(history, force) - log.Infof("Imported %v results to mission control", imported) + m.log.Infof("Imported %v results to mission control", imported) return nil } @@ -552,7 +560,7 @@ func (m *MissionControl) applyPaymentResult( // that case, a node-level failure would not be applied to untried // channels. if i.nodeFailure != nil { - log.Debugf("Reporting node failure to Mission Control: "+ + m.log.Debugf("Reporting node failure to Mission Control: "+ "node=%v", *i.nodeFailure) m.state.setAllFail(*i.nodeFailure, result.timeReply) @@ -562,11 +570,11 @@ func (m *MissionControl) applyPaymentResult( pairResult := pairResult if pairResult.success { - log.Debugf("Reporting pair success to Mission "+ + m.log.Debugf("Reporting pair success to Mission "+ "Control: pair=%v, amt=%v", pair, pairResult.amt) } else { - log.Debugf("Reporting pair failure to Mission "+ + m.log.Debugf("Reporting pair failure to Mission "+ "Control: pair=%v, amt=%v", pair, pairResult.amt) } From 66ee6a341d1522344478afd7db32f8c6dc717dfb Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 13 Aug 2024 19:19:06 +0200 Subject: [PATCH 11/11] routing: support multiple namespaced MissionControls --- routing/integrated_routing_context_test.go | 3 +- routing/missioncontrol.go | 133 ++++++++++++++++----- routing/missioncontrol_test.go | 3 +- routing/router_test.go | 4 +- rpcserver.go | 5 +- server.go | 25 ++-- 6 files changed, 127 insertions(+), 46 deletions(-) diff --git a/routing/integrated_routing_context_test.go b/routing/integrated_routing_context_test.go index cbf548111b..46b4971a5f 100644 --- a/routing/integrated_routing_context_test.go +++ b/routing/integrated_routing_context_test.go @@ -162,7 +162,8 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32, mcMgr, err := NewMissionControlMgr(db, c.source.pubkey, &c.mcCfg) require.NoError(c.t, err) - mc := mcMgr.GetDefaultStore() + mc, err := mcMgr.GetNamespacedStore(DefaultMissionControlNamespace) + require.NoError(c.t, err) getBandwidthHints := func(_ Graph) (bandwidthHints, error) { // Create bandwidth hints based on local channel balances. diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index c528783d71..02e2cfe480 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -133,9 +133,13 @@ type MissionControl struct { // // NOTE: currently it only has a MissionControl in the default namespace. type MissionControlManager struct { + db kvdb.Backend + + defaultMCCfg *MissionControlConfig + *mcConfig - mc *MissionControl + mc map[string]*MissionControl mu sync.Mutex // TODO(roasbeef): further counters, if vertex continually unavailable, @@ -144,12 +148,19 @@ type MissionControlManager struct { // TODO(roasbeef): also add favorable metrics for nodes } -// GetDefaultStore returns the MissionControl in the default namespace. -func (m *MissionControlManager) GetDefaultStore() *MissionControl { +// GetNamespacedStore returns the MissionControl in the given namespace. If one +// does not yet exist, then it is initialised. +func (m *MissionControlManager) GetNamespacedStore(ns string) (*MissionControl, + error) { + m.mu.Lock() defer m.mu.Unlock() - return m.mc + if mc, ok := m.mc[ns]; ok { + return mc, nil + } + + return m.initMissionControl(ns) } // MissionControlConfig defines parameters that control mission control @@ -250,64 +261,122 @@ func NewMissionControlMgr(db kvdb.Backend, self route.Vertex, return nil, err } + mcCfg := &mcConfig{ + now: time.Now, + selfNode: self, + } + + mgr := &MissionControlManager{ + db: db, + defaultMCCfg: cfg, + mcConfig: mcCfg, + mc: make(map[string]*MissionControl), + } + + if err := mgr.loadMissionControls(); err != nil { + return nil, err + } + + for _, mc := range mgr.mc { + if err := mc.init(); err != nil { + return nil, err + } + } + + return mgr, nil +} + +// loadMissionControls initialises a MissionControl in the default namespace if +// one does not yet exist. It then initialises a MissionControl for all other +// namespaces found in the DB. +func (m *MissionControlManager) loadMissionControls() error { + // Always initialise the default namespace. + _, err := m.initMissionControl(DefaultMissionControlNamespace) + if err != nil { + return err + } + + return m.db.View(func(tx walletdb.ReadTx) error { + mcStoreBkt := tx.ReadBucket(resultsKey) + if mcStoreBkt == nil { + return fmt.Errorf("top level mission control bucket " + + "not found") + } + + // Iterate through all the keys in bucket and initialise an + // in-memory mission control for each namespace. + return mcStoreBkt.ForEach(func(k, _ []byte) error { + _, err := m.initMissionControl(string(k)) + + return err + }) + }, func() {}) +} + +// initMissionControl creates a new MissionControl instance with the given +// namespace if one does not yet exist. +func (m *MissionControlManager) initMissionControl(namespace string) ( + *MissionControl, error) { + + m.mu.Lock() + defer m.mu.Unlock() + + // If a mission control with this namespace has already been initialised + // then there is nothing left to do. + if mc, ok := m.mc[namespace]; ok { + return mc, nil + } + + cfg := m.defaultMCCfg + store, err := newMissionControlStore( - newDefaultNamespacedStore(db), cfg.MaxMcHistory, + newNamespacedDB(m.db, namespace), cfg.MaxMcHistory, cfg.McFlushInterval, ) if err != nil { return nil, err } - mcCfg := &mcConfig{ - now: time.Now, - selfNode: self, - } - - // Create a mission control in the default namespace. - defaultMC := &MissionControl{ - mcConfig: mcCfg, + mc := &MissionControl{ + mcConfig: m.mcConfig, state: newMissionControlState(cfg.MinFailureRelaxInterval), store: store, estimator: cfg.Estimator, log: build.NewPrefixLog( - fmt.Sprintf("[%s]:", DefaultMissionControlNamespace), - log, + fmt.Sprintf("[%s]:", namespace), log, ), } - mc := &MissionControlManager{ - mcConfig: mcCfg, - mc: defaultMC, - } - - if err := mc.init(); err != nil { - return nil, err - } + m.mc[namespace] = mc return mc, nil } -// RunStoreTicker runs the mission control store's ticker. -func (m *MissionControlManager) RunStoreTicker() { +// RunStoreTickers runs the mission control store's tickers. +func (m *MissionControlManager) RunStoreTickers() { m.mu.Lock() defer m.mu.Unlock() - m.mc.store.run() + for _, mc := range m.mc { + mc.store.run() + } } -// StopStoreTicker stops the mission control store's ticker. -func (m *MissionControlManager) StopStoreTicker() { +// StopStoreTickers stops the mission control store's tickers. +func (m *MissionControlManager) StopStoreTickers() { log.Debug("Stopping mission control store ticker") defer log.Debug("Mission control store ticker stopped") m.mu.Lock() defer m.mu.Unlock() - m.mc.store.stop() + for _, mc := range m.mc { + mc.store.stop() + } } // init initializes mission control with historical data. -func (m *MissionControlManager) init() error { +func (m *MissionControl) init() error { log.Debugf("Mission control state reconstruction started") m.mu.Lock() @@ -315,13 +384,13 @@ func (m *MissionControlManager) init() error { start := time.Now() - results, err := m.mc.store.fetchAll() + results, err := m.store.fetchAll() if err != nil { return err } for _, result := range results { - _ = m.mc.applyPaymentResult(result) + _ = m.applyPaymentResult(result) } log.Debugf("Mission control state reconstruction finished: "+ diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index 2286702b7c..30dabb6283 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -108,7 +108,8 @@ func (ctx *mcTestContext) restartMc() { } mc.now = func() time.Time { return ctx.now } - ctx.mc = mc.GetDefaultStore() + ctx.mc, err = mc.GetNamespacedStore(DefaultMissionControlNamespace) + require.NoError(ctx.t, err) } // Assert that mission control returns a probability for an edge. diff --git a/routing/router_test.go b/routing/router_test.go index 88004451f3..251833b520 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -132,7 +132,9 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, graphInstance.graphBackend, route.Vertex{}, mcConfig, ) require.NoError(t, err, "failed to create missioncontrol") - mc := mcMgr.GetDefaultStore() + + mc, err := mcMgr.GetNamespacedStore(DefaultMissionControlNamespace) + require.NoError(t, err) sourceNode, err := graphInstance.graph.SourceNode() require.NoError(t, err) diff --git a/rpcserver.go b/rpcserver.go index 9fba1fb60a..f814472768 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -716,7 +716,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service, return info.NodeKey1Bytes, info.NodeKey2Bytes, nil }, FindRoute: s.chanRouter.FindRoute, - MissionControl: s.missionControl.GetDefaultStore(), + MissionControl: s.defaultMC, ActiveNetParams: r.cfg.ActiveNetParams.Params, Tower: s.controlTower, MaxTotalTimelock: r.cfg.MaxOutgoingCltvExpiry, @@ -5868,8 +5868,7 @@ func (r *rpcServer) AddInvoice(ctx context.Context, return r.server.chanRouter.FindBlindedPaths( r.selfNode, amt, - r.server.missionControl.GetDefaultStore(). - GetProbability, + r.server.defaultMC.GetProbability, blindingRestrictions, ) }, diff --git a/server.go b/server.go index 1a623c0f6b..128faf9446 100644 --- a/server.go +++ b/server.go @@ -271,7 +271,8 @@ type server struct { breachArbitrator *contractcourt.BreachArbitrator - missionControl *routing.MissionControlManager + missionControlMgr *routing.MissionControlManager + defaultMC *routing.MissionControl graphBuilder *graph.Builder @@ -933,11 +934,19 @@ func newServer(cfg *Config, listenAddrs []net.Addr, McFlushInterval: routingConfig.McFlushInterval, MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval, } - s.missionControl, err = routing.NewMissionControlMgr( + s.missionControlMgr, err = routing.NewMissionControlMgr( dbs.ChanStateDB, selfNode.PubKeyBytes, mcCfg, ) if err != nil { - return nil, fmt.Errorf("can't create mission control: %w", err) + return nil, fmt.Errorf("can't create mission control "+ + "manager: %w", err) + } + s.defaultMC, err = s.missionControlMgr.GetNamespacedStore( + routing.DefaultMissionControlNamespace, + ) + if err != nil { + return nil, fmt.Errorf("can't create mission control in the "+ + "default namespace: %w", err) } srvrLog.Debugf("Instantiating payment session source with config: "+ @@ -963,7 +972,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanGraph, ), SourceNode: sourceNode, - MissionControl: s.missionControl.GetDefaultStore(), + MissionControl: s.defaultMC, GetLink: s.htlcSwitch.GetLinkByShortID, PathFindingConfig: pathFindingConfig, } @@ -998,7 +1007,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, Chain: cc.ChainIO, Payer: s.htlcSwitch, Control: s.controlTower, - MissionControl: s.missionControl.GetDefaultStore(), + MissionControl: s.defaultMC, SessionSource: paymentSessionSource, GetLink: s.htlcSwitch.GetLinkByShortID, NextPaymentID: sequencer.NextID, @@ -2115,10 +2124,10 @@ func (s *server) Start() error { } cleanup.add(func() error { - s.missionControl.StopStoreTicker() + s.missionControlMgr.StopStoreTickers() return nil }) - s.missionControl.RunStoreTicker() + s.missionControlMgr.RunStoreTickers() // Before we start the connMgr, we'll check to see if we have // any backups to recover. We do this now as we want to ensure @@ -2392,7 +2401,7 @@ func (s *server) Stop() error { srvrLog.Warnf("Unable to stop ChannelEventStore: %v", err) } - s.missionControl.StopStoreTicker() + s.missionControlMgr.StopStoreTickers() // Disconnect from each active peers to ensure that // peerTerminationWatchers signal completion to each peer.