feat: add active peer probing and a cached addr book #90

Merged
merged 80 commits into from
Dec 18, 2024
Commits
3896470
feat: add cached peer book with higher ttls
2color Nov 27, 2024
7dd33ca
feat: initial implementation of active peer probing
2color Nov 27, 2024
0e86ea4
feat: use the cached router
2color Nov 27, 2024
ec2a67a
chore: go mod tidy
2color Nov 27, 2024
fe68140
feat: log probe duration
2color Nov 27, 2024
06c2d0c
chore: log in probe loop
2color Nov 27, 2024
fc76783
fix: update peer state if doesn't exist
2color Nov 27, 2024
e904c3e
fix: add addresses to cached address book
2color Nov 27, 2024
814ae58
fix: wrap with cached router only if available
2color Nov 27, 2024
a4d6456
feat: make everything a little bit better
2color Nov 27, 2024
81feca7
chore: small refinements
2color Nov 28, 2024
e75992f
test: add test for cached addr book
2color Nov 28, 2024
a20a4c3
chore: rename files
2color Nov 28, 2024
c5f1d62
feat: add options to cached addr book
2color Nov 28, 2024
e678be8
feat: add instrumentation
2color Nov 28, 2024
a0965bc
fix: thread safety
2color Nov 28, 2024
d82ad0f
docs: update changelog
2color Nov 28, 2024
a84d5f6
fix: small fixes
2color Nov 28, 2024
9ab02e1
fix: simplify cached router
2color Nov 28, 2024
9658af8
feat(metric): cached_router_peer_addr_lookups
lidel Nov 28, 2024
7cdb5be
Apply suggestions from code review
2color Nov 29, 2024
762136e
Update CHANGELOG.md
2color Nov 29, 2024
2cf46d4
chore: use service name for namespace
2color Nov 29, 2024
a0d5c62
fix: type errors and missing imports
2color Nov 29, 2024
75f1bf2
feat: add queue probe
2color Nov 29, 2024
4cbaa91
Revert "feat: add queue probe"
2color Nov 29, 2024
d038301
chore: simplify composite literal
2color Dec 2, 2024
796e94f
fix: implement custom cache fallback iterator
2color Dec 3, 2024
2e4d12c
fix: add cancel and simplify
2color Dec 3, 2024
811dce8
fix: move select to Val function
2color Dec 3, 2024
b4da9cd
fix: concurrency bug from the ongoingLookups
2color Dec 4, 2024
d00fcb4
chore: clean up comments
2color Dec 4, 2024
6219804
fix: add lint ignores
2color Dec 4, 2024
662f0d4
docs: update changelog
2color Dec 4, 2024
c812cf4
fix: increase bucket sizes for probe duration
2color Dec 4, 2024
8646f38
chore: remove unused peer state fields
2color Dec 4, 2024
46a74a3
feat: enable caching for FindPeer in cached router
2color Dec 4, 2024
d9601e4
fix: handle peer not found case
2color Dec 4, 2024
986b010
Apply suggestions from code review
2color Dec 5, 2024
ecd0757
fix: wait longer during cleanup function
2color Dec 5, 2024
a0443d0
test: remove bitswap record test
2color Dec 5, 2024
22aacd7
refactor: extract connectedness checks to a func
2color Dec 5, 2024
fe372ac
fix: set ttl for both signed and unsigned addrs
2color Dec 5, 2024
03a4078
fix: prevent race condition
2color Dec 5, 2024
84393fd
feat: use 2q-lru cache for peer state
2color Dec 5, 2024
d466dc7
chore: remove return count
2color Dec 5, 2024
8078cb5
test: improve reliability of tests
2color Dec 5, 2024
7decf6c
fix: record failed connections
2color Dec 5, 2024
b536e82
feat: add exponential backoff for probes/peer lookups
2color Dec 5, 2024
7182699
fix: return peers with no addrs that wont probe
2color Dec 5, 2024
b0b24e0
fix: brittle test
2color Dec 6, 2024
697457d
feat: add probed peers counter
2color Dec 6, 2024
7fcf45f
fix: adjust probe duration metric buckets
2color Dec 6, 2024
1718215
fix: prevent race conditions
2color Dec 6, 2024
dc57e9f
feat: increase cache size and add max backoff
2color Dec 6, 2024
c5abeec
fix: omit providers whose peer cannot be found
2color Dec 9, 2024
0cc76f9
chore: remove unused function
2color Dec 10, 2024
f0e0bd4
deps: upgrade go-libp2p
2color Dec 10, 2024
2211aae
fix: avoid using the cache in FindPeers
2color Dec 10, 2024
be5958a
fix: do not return cached results for FindPeers
2color Dec 11, 2024
af7c3a8
refactor: small optimisation
2color Dec 11, 2024
62c0d9f
chore: re-add comment
2color Dec 11, 2024
8b36b0c
Apply suggestions from code review
2color Dec 16, 2024
b58b50d
Apply suggestions from code review
2color Dec 16, 2024
41922af
fix: use separate context for dispatched jobs
2color Dec 16, 2024
06cef21
fix: ensure proper cleanup of cache fallback iter
2color Dec 16, 2024
7a2160a
Update main.go
2color Dec 16, 2024
84bc4f7
fix: formatting
2color Dec 16, 2024
0c28c6b
fix: let consumer handle cleanup
2color Dec 16, 2024
e0a601f
fix: remove from address book when removed from peer state
2color Dec 17, 2024
7f0ec50
fix: use normal lru cache instead of 2Q
2color Dec 17, 2024
2e025eb
fix: update the metric when removing from the peer cache
2color Dec 17, 2024
6b4b40d
fix: increase max backoff to 48 hours
2color Dec 17, 2024
fe7ad54
feat: add env var for recently connected ttl
2color Dec 17, 2024
49efe9b
feat: add env var to control active probing
2color Dec 17, 2024
8ca4d19
fix: bug from closing the iterator twice
2color Dec 17, 2024
317ccb7
docs: update comment
2color Dec 17, 2024
327f9cb
docs: improve changelog
2color Dec 17, 2024
48e1943
test: fix background test
2color Dec 17, 2024
c1ac41b
feat(metrics): track online vs offline probe ratio
lidel Dec 18, 2024
83 changes: 67 additions & 16 deletions server_cached_router.go
@@ -1,4 +1,4 @@
package main

Check failure on line 1 in server_cached_router.go (GitHub Actions / go-check / All): File is not gofmt-ed.

import (
"context"
@@ -11,9 +11,37 @@
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var _ server.ContentRouter = cachedRouter{}
var (
_ server.ContentRouter = cachedRouter{}

// peerAddrLookups lets us reason about whether, and how effectively, the peer addr cache is used
peerAddrLookups = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "peer_addr_lookups",
Subsystem: "cached_router",
Namespace: "someguy",
Help: "Number of peer addr info lookups per origin and cache state",
},
[]string{addrCacheStateLabel, addrQueryOriginLabel},
)
)

const (
// cache=unused|hit|miss, indicates how effective cache is
addrCacheStateLabel = "cache"
addrCacheStateUnused = "unused"
addrCacheStateHit = "hit"
addrCacheStateMiss = "miss"

// origin=providers|peers|unknown indicates whether the query originated from the providers or peers endpoint
addrQueryOriginLabel = "origin"
addrQueryOriginProviders = "providers"
addrQueryOriginPeers = "peers"
addrQueryOriginUnknown = "unknown"
)

// cachedRouter wraps a router with the cachedAddrBook to retrieve cached addresses for peers without multiaddrs in FindProviders
type cachedRouter struct {
@@ -21,66 +49,89 @@
cachedAddrBook *cachedAddrBook
}

func (r cachedRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) {
it, err := r.router.FindProviders(ctx, key, limit)
if err != nil {
return nil, err
}

return iter.Map(it, func(v iter.Result[types.Record]) iter.Result[types.Record] {
if v.Err != nil || v.Val == nil {
return v
}

switch v.Val.GetSchema() {
case types.SchemaPeer:
result, ok := v.Val.(*types.PeerRecord)
if !ok {
logger.Errorw("problem casting find providers result", "Schema", v.Val.GetSchema(), "Type", reflect.TypeOf(v).String())
return v
}
if len(result.Addrs) == 0 {
result.Addrs = r.getMaddrsFromCache(result.ID)
}

result.Addrs = r.withAddrsFromCache(addrQueryOriginProviders, result.ID, result.Addrs)
v.Val = result

//lint:ignore SA1019 // ignore staticcheck
case types.SchemaBitswap:
//lint:ignore SA1019 // ignore staticcheck
result, ok := v.Val.(*types.BitswapRecord)
if !ok {
logger.Errorw("problem casting find providers result", "Schema", v.Val.GetSchema(), "Type", reflect.TypeOf(v).String())
return v
}

if len(result.Addrs) == 0 {
result.Addrs = r.getMaddrsFromCache(result.ID)
}
result.Addrs = r.withAddrsFromCache(addrQueryOriginProviders, result.ID, result.Addrs)
v.Val = result

}

return v

}), nil
}

func (r cachedRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
// If FindPeers fails, it seems like there's no point returning results from the cache?
return r.router.FindPeers(ctx, pid, limit)
it, err := r.router.FindPeers(ctx, pid, limit)
if err != nil {
// check cache, if peer is unknown, return original error
cachedAddrs := r.withAddrsFromCache(addrQueryOriginPeers, &pid, nil)
if len(cachedAddrs) == 0 {
return nil, err
}

// if found in cache, return synthetic peer result based on cached addrs
var sliceIt iter.Iter[*types.PeerRecord] = iter.FromSlice([]*types.PeerRecord{{
Schema: types.SchemaPeer,
ID: &pid,
Addrs: cachedAddrs,
}})
it = iter.ToResultIter(sliceIt)

}
return iter.Map(it, func(v iter.Result[*types.PeerRecord]) iter.Result[*types.PeerRecord] {
if v.Err != nil || v.Val == nil {
return v
}
switch v.Val.GetSchema() {
case types.SchemaPeer:
v.Val.Addrs = r.withAddrsFromCache(addrQueryOriginPeers, v.Val.ID, v.Val.Addrs)

}
return v

}), nil
}

//lint:ignore SA1019 // ignore staticcheck
func (r cachedRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) {
return 0, routing.ErrNotSupported

}

// GetPeer returns a peer record for a given peer ID, or nil if the peer is not found
func (r cachedRouter) getMaddrsFromCache(pid *peer.ID) []types.Multiaddr {
// withAddrsFromCache returns the best list of addrs for specified [peer.ID].
// It will consult cache only if the addrs slice passed to it is empty.
func (r cachedRouter) withAddrsFromCache(queryOrigin string, pid *peer.ID, addrs []types.Multiaddr) []types.Multiaddr {
// skip cache if we already have addrs
if len(addrs) > 0 {
peerAddrLookups.WithLabelValues(addrCacheStateUnused, queryOrigin).Inc()
return addrs
}

cachedAddrs := r.cachedAddrBook.GetCachedAddrs(pid)
if len(cachedAddrs) > 0 {
logger.Debugw("found cached addresses", "peer", pid, "cachedAddrs", cachedAddrs)
peerAddrLookups.WithLabelValues(addrCacheStateHit, queryOrigin).Inc()
return cachedAddrs
}
peerAddrLookups.WithLabelValues(addrCacheStateMiss, queryOrigin).Inc()
return nil

}