Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tem #96

Closed
wants to merge 18 commits into from
Closed

Tem #96

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions channeldb/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ var (
// but it is marked as a zombie within the zombie index.
ErrZombieEdge = errors.New("edge marked as zombie")

// ErrNotZombieEdge is an error returned when we attempt to find an
// edge in the zombie index which is not there.
ErrNotZombieEdge = errors.New("edge not found in zombie index")

// ErrEdgeAlreadyExist is returned when edge with specific
// channel id can't be added because it already exist.
ErrEdgeAlreadyExist = fmt.Errorf("edge already exist")
Expand Down
177 changes: 154 additions & 23 deletions channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,6 @@ func (c *ChannelGraph) getChannelMap(edges kvdb.RBucket) (
var graphTopLevelBuckets = [][]byte{
nodeBucket,
edgeBucket,
edgeIndexBucket,
graphMetaBucket,
}

Expand Down Expand Up @@ -2087,10 +2086,12 @@ func (c *ChannelGraph) NodeUpdatesInHorizon(startTime,
// words, we perform a set difference of our set of chan ID's and the ones
// passed in. This method can be used by callers to determine the set of
// channels another peer knows of that we don't.
func (c *ChannelGraph) FilterKnownChanIDs(chanIDs []uint64) ([]uint64, error) {
func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
isZombieChan func(time.Time, time.Time) bool) ([]uint64, error) {

var newChanIDs []uint64

err := kvdb.View(c.db, func(tx kvdb.RTx) error {
err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
edges := tx.ReadBucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
Expand All @@ -2108,23 +2109,45 @@ func (c *ChannelGraph) FilterKnownChanIDs(chanIDs []uint64) ([]uint64, error) {
// We'll run through the set of chanIDs and collate only the
// set of channel that are unable to be found within our db.
var cidBytes [8]byte
for _, cid := range chanIDs {
byteOrder.PutUint64(cidBytes[:], cid)
for _, info := range chansInfo {
scid := info.ShortChannelID.ToUint64()
byteOrder.PutUint64(cidBytes[:], scid)

// If the edge is already known, skip it.
if v := edgeIndex.Get(cidBytes[:]); v != nil {
continue
}

// If the edge is a known zombie, skip it.
if zombieIndex != nil {
isZombie, _, _ := isZombieEdge(zombieIndex, cid)
if isZombie {
isZombie, _, _ := isZombieEdge(
zombieIndex, scid,
)

isStillZombie := isZombieChan(
info.Node1UpdateTimestamp,
info.Node2UpdateTimestamp,
)

switch {
// If the edge is a known zombie and if we
// would still consider it a zombie given the
// latest update timestamps, then we skip this
// channel.
case isZombie && isStillZombie:
continue

// Otherwise, if we have marked it as a zombie
// but the latest update timestamps could bring
// it back from the dead, then we mark it alive.
case isZombie && !isStillZombie:
err := c.markEdgeLive(tx, scid)
if err != nil {
return err
}
}
}

newChanIDs = append(newChanIDs, cid)
newChanIDs = append(newChanIDs, scid)
}

return nil
Expand All @@ -2135,7 +2158,12 @@ func (c *ChannelGraph) FilterKnownChanIDs(chanIDs []uint64) ([]uint64, error) {
// If we don't know of any edges yet, then we'll return the entire set
// of chan IDs specified.
case err == ErrGraphNoEdgesFound:
return chanIDs, nil
ogChanIDs := make([]uint64, len(chansInfo))
for i, info := range chansInfo {
ogChanIDs[i] = info.ShortChannelID.ToUint64()
}

return ogChanIDs, nil

case err != nil:
return nil, err
Expand All @@ -2144,6 +2172,23 @@ func (c *ChannelGraph) FilterKnownChanIDs(chanIDs []uint64) ([]uint64, error) {
return newChanIDs, nil
}

// ChannelUpdateInfo couples the SCID of a channel with the timestamps of the
// latest received channel updates for the channel.
type ChannelUpdateInfo struct {
// ShortChannelID is the SCID identifier of the channel.
ShortChannelID lnwire.ShortChannelID

// Node1UpdateTimestamp is the timestamp of the latest received update
// from the node 1 channel peer. This will be set to zero time if no
// update has yet been received from this node.
Node1UpdateTimestamp time.Time

// Node2UpdateTimestamp is the timestamp of the latest received update
// from the node 2 channel peer. This will be set to zero time if no
// update has yet been received from this node.
Node2UpdateTimestamp time.Time
}

// BlockChannelRange represents a range of channels for a given block height.
type BlockChannelRange struct {
// Height is the height of the block all of the channels below were
Expand All @@ -2152,17 +2197,20 @@ type BlockChannelRange struct {

// Channels is the list of channels identified by their short ID
// representation known to us that were included in the block height
// above.
Channels []lnwire.ShortChannelID
// above. The list may include channel update timestamp information if
// requested.
Channels []ChannelUpdateInfo
}

// FilterChannelRange returns the channel ID's of all known channels which were
// mined in a block height within the passed range. The channel IDs are grouped
// by their common block height. This method can be used to quickly share with a
// peer the set of channels we know of within a particular range to catch them
// up after a period of time offline.
// up after a period of time offline. If withTimestamps is true then the
// timestamp info of the latest received channel update messages of the channel
// will be included in the response.
func (c *ChannelGraph) FilterChannelRange(startHeight,
endHeight uint32) ([]BlockChannelRange, error) {
endHeight uint32, withTimestamps bool) ([]BlockChannelRange, error) {

startChanID := &lnwire.ShortChannelID{
BlockHeight: startHeight,
Expand All @@ -2181,7 +2229,7 @@ func (c *ChannelGraph) FilterChannelRange(startHeight,
byteOrder.PutUint64(chanIDStart[:], startChanID.ToUint64())
byteOrder.PutUint64(chanIDEnd[:], endChanID.ToUint64())

var channelsPerBlock map[uint32][]lnwire.ShortChannelID
var channelsPerBlock map[uint32][]ChannelUpdateInfo
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
edges := tx.ReadBucket(edgeBucket)
if edges == nil {
Expand Down Expand Up @@ -2213,14 +2261,60 @@ func (c *ChannelGraph) FilterChannelRange(startHeight,
// we'll add it to our returned set.
rawCid := byteOrder.Uint64(k)
cid := lnwire.NewShortChanIDFromInt(rawCid)

chanInfo := ChannelUpdateInfo{
ShortChannelID: cid,
}

if !withTimestamps {
channelsPerBlock[cid.BlockHeight] = append(
channelsPerBlock[cid.BlockHeight],
chanInfo,
)

continue
}

node1Key, node2Key := computeEdgePolicyKeys(&edgeInfo)

rawPolicy := edges.Get(node1Key)
if len(rawPolicy) != 0 {
r := bytes.NewReader(rawPolicy)

edge, err := deserializeChanEdgePolicyRaw(r)
if err != nil && !errors.Is(
err, ErrEdgePolicyOptionalFieldNotFound,
) {

return err
}

chanInfo.Node1UpdateTimestamp = edge.LastUpdate
}

rawPolicy = edges.Get(node2Key)
if len(rawPolicy) != 0 {
r := bytes.NewReader(rawPolicy)

edge, err := deserializeChanEdgePolicyRaw(r)
if err != nil && !errors.Is(
err, ErrEdgePolicyOptionalFieldNotFound,
) {

return err
}

chanInfo.Node2UpdateTimestamp = edge.LastUpdate
}

channelsPerBlock[cid.BlockHeight] = append(
channelsPerBlock[cid.BlockHeight], cid,
channelsPerBlock[cid.BlockHeight], chanInfo,
)
}

return nil
}, func() {
channelsPerBlock = make(map[uint32][]lnwire.ShortChannelID)
channelsPerBlock = make(map[uint32][]ChannelUpdateInfo)
})

switch {
Expand Down Expand Up @@ -3119,6 +3213,24 @@ func (c *ChannelGraph) FetchOtherNode(tx kvdb.RTx,
return targetNode, err
}

// computeEdgePolicyKeys is a helper function that can be used to compute the
// keys used to index the channel edge policy info for the two nodes of the
// edge. The keys for node 1 and node 2 are returned respectively.
func computeEdgePolicyKeys(info *models.ChannelEdgeInfo) ([]byte, []byte) {
var (
node1Key [33 + 8]byte
node2Key [33 + 8]byte
)

copy(node1Key[:], info.NodeKey1Bytes[:])
copy(node2Key[:], info.NodeKey2Bytes[:])

byteOrder.PutUint64(node1Key[33:], info.ChannelID)
byteOrder.PutUint64(node2Key[33:], info.ChannelID)

return node1Key[:], node2Key[:]
}

// FetchChannelEdgesByOutpoint attempts to lookup the two directed edges for
// the channel identified by the funding outpoint. If the channel can't be
// found, then ErrEdgeNotFound is returned. A struct which houses the general
Expand Down Expand Up @@ -3497,10 +3609,14 @@ func markEdgeZombie(zombieIndex kvdb.RwBucket, chanID uint64, pubKey1,

// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
return c.markEdgeLive(nil, chanID)
}

func (c *ChannelGraph) markEdgeLive(tx kvdb.RwTx, chanID uint64) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
dbFn := func(tx kvdb.RwTx) error {
edges := tx.ReadWriteBucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
Expand All @@ -3512,8 +3628,22 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {

var k [8]byte
byteOrder.PutUint64(k[:], chanID)

if len(zombieIndex.Get(k[:])) == 0 {
return ErrNotZombieEdge
}

return zombieIndex.Delete(k[:])
}, func() {})
}

// If the transaction is nil, we'll create a new one. Otherwise, we use
// the existing transaction
var err error
if tx == nil {
err = kvdb.Update(c.db, dbFn, func() {})
} else {
err = dbFn(tx)
}
if err != nil {
return err
}
Expand All @@ -3523,11 +3653,12 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {

// We need to add the channel back into our graph cache, otherwise we
// won't use it for path finding.
edgeInfos, err := c.FetchChanInfos([]uint64{chanID})
if err != nil {
return err
}
if c.graphCache != nil {
edgeInfos, err := c.FetchChanInfos([]uint64{chanID})
if err != nil {
return err
}

for _, edgeInfo := range edgeInfos {
c.graphCache.AddChannel(
edgeInfo.Info, edgeInfo.Policy1,
Expand Down
Loading
Loading