feat: add active peer probing and a cached addr book #90

Merged on Dec 18, 2024 (80 commits)
Changes from 19 commits
3896470
feat: add cached peer book with higher ttls
2color Nov 27, 2024
7dd33ca
feat: initial implementation of active peer probing
2color Nov 27, 2024
0e86ea4
feat: use the cached router
2color Nov 27, 2024
ec2a67a
chore: go mod tidy
2color Nov 27, 2024
fe68140
feat: log probe duration
2color Nov 27, 2024
06c2d0c
chore: log in probe loop
2color Nov 27, 2024
fc76783
fix: update peer state if doesn't exist
2color Nov 27, 2024
e904c3e
fix: add addresses to cached address book
2color Nov 27, 2024
814ae58
fix: wrap with cached router only if available
2color Nov 27, 2024
a4d6456
feat: make everything a little bit better
2color Nov 27, 2024
81feca7
chore: small refinements
2color Nov 28, 2024
e75992f
test: add test for cached addr book
2color Nov 28, 2024
a20a4c3
chore: rename files
2color Nov 28, 2024
c5f1d62
feat: add options to cached addr book
2color Nov 28, 2024
e678be8
feat: add instrumentation
2color Nov 28, 2024
a0965bc
fix: thread safety
2color Nov 28, 2024
d82ad0f
docs: update changelog
2color Nov 28, 2024
a84d5f6
fix: small fixes
2color Nov 28, 2024
9ab02e1
fix: simplify cached router
2color Nov 28, 2024
9658af8
feat(metric): cached_router_peer_addr_lookups
lidel Nov 28, 2024
7cdb5be
Apply suggestions from code review
2color Nov 29, 2024
762136e
Update CHANGELOG.md
2color Nov 29, 2024
2cf46d4
chore: use service name for namespace
2color Nov 29, 2024
a0d5c62
fix: type errors and missing imports
2color Nov 29, 2024
75f1bf2
feat: add queue probe
2color Nov 29, 2024
4cbaa91
Revert "feat: add queue probe"
2color Nov 29, 2024
d038301
chore: simplify composite literal
2color Dec 2, 2024
796e94f
fix: implement custom cache fallback iterator
2color Dec 3, 2024
2e4d12c
fix: add cancel and simplify
2color Dec 3, 2024
811dce8
fix: move select to Val function
2color Dec 3, 2024
b4da9cd
fix: concurrency bug from the ongoingLookups
2color Dec 4, 2024
d00fcb4
chore: clean up comments
2color Dec 4, 2024
6219804
fix: add lint ignores
2color Dec 4, 2024
662f0d4
docs: update changelog
2color Dec 4, 2024
c812cf4
fix: increase bucket sizes for probe duration
2color Dec 4, 2024
8646f38
chore: remove unused peer state fields
2color Dec 4, 2024
46a74a3
feat: enable caching for FindPeer in cached router
2color Dec 4, 2024
d9601e4
fix: handle peer not found case
2color Dec 4, 2024
986b010
Apply suggestions from code review
2color Dec 5, 2024
ecd0757
fix: wait longer during cleanup function
2color Dec 5, 2024
a0443d0
test: remove bitswap record test
2color Dec 5, 2024
22aacd7
refactor: extract connectedness checks to a func
2color Dec 5, 2024
fe372ac
fix: set ttl for both signed and unsigned addrs
2color Dec 5, 2024
03a4078
fix: prevent race condition
2color Dec 5, 2024
84393fd
feat: use 2q-lru cache for peer state
2color Dec 5, 2024
d466dc7
chore: remove return count
2color Dec 5, 2024
8078cb5
test: improve reliability of tests
2color Dec 5, 2024
7decf6c
fix: record failed connections
2color Dec 5, 2024
b536e82
feat: add exponential backoff for probes/peer lookups
2color Dec 5, 2024
7182699
fix: return peers with no addrs that wont probe
2color Dec 5, 2024
b0b24e0
fix: brittle test
2color Dec 6, 2024
697457d
feat: add probed peers counter
2color Dec 6, 2024
7fcf45f
fix: adjust probe duration metric buckets
2color Dec 6, 2024
1718215
fix: prevent race conditions
2color Dec 6, 2024
dc57e9f
feat: increase cache size and add max backoff
2color Dec 6, 2024
c5abeec
fix: omit providers whose peer cannot be found
2color Dec 9, 2024
0cc76f9
chore: remove unused function
2color Dec 10, 2024
f0e0bd4
deps: upgrade go-libp2p
2color Dec 10, 2024
2211aae
fix: avoid using the cache in FindPeers
2color Dec 10, 2024
be5958a
fix: do not return cached results for FindPeers
2color Dec 11, 2024
af7c3a8
refactor: small optimisation
2color Dec 11, 2024
62c0d9f
chore: re-add comment
2color Dec 11, 2024
8b36b0c
Apply suggestions from code review
2color Dec 16, 2024
b58b50d
Apply suggestions from code review
2color Dec 16, 2024
41922af
fix: use separate context for dispatched jobs
2color Dec 16, 2024
06cef21
fix: ensure proper cleanup of cache fallback iter
2color Dec 16, 2024
7a2160a
Update main.go
2color Dec 16, 2024
84bc4f7
fix: formatting
2color Dec 16, 2024
0c28c6b
fix: let consumer handle cleanup
2color Dec 16, 2024
e0a601f
fix: remove from address book when removed from peer state
2color Dec 17, 2024
7f0ec50
fix: use normal lru cache instead of 2Q
2color Dec 17, 2024
2e025eb
fix: update the metric when removing from the peer cache
2color Dec 17, 2024
6b4b40d
fix: increase max backoff to 48 hours
2color Dec 17, 2024
fe7ad54
feat: add env var for recently connected ttl
2color Dec 17, 2024
49efe9b
feat: add env var to control active probing
2color Dec 17, 2024
8ca4d19
fix: bug from closing the iterator twice
2color Dec 17, 2024
317ccb7
docs: update comment
2color Dec 17, 2024
327f9cb
docs: improve changelog
2color Dec 17, 2024
48e1943
test: fix background test
2color Dec 17, 2024
c1ac41b
feat(metrics): track online vs offline probe ratio
lidel Dec 18, 2024
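
Several commits in the list above (b536e82, dc57e9f, 6b4b40d) mention exponential backoff for probes with a maximum of 48 hours. The diff shown on this page predates those commits, so the following is only a minimal sketch of the general technique, not the code that was merged. The names `nextProbeDelay` and `baseBackoff`, and the one hour starting delay, are assumptions made for illustration; only the 48 hour cap comes from the commit message.

```go
package main

import (
	"fmt"
	"time"
)

const (
	// baseBackoff is a hypothetical starting delay (assumption, not from the PR).
	baseBackoff = time.Hour
	// maxBackoff mirrors the 48 hour cap named in commit 6b4b40d.
	maxBackoff = 48 * time.Hour
)

// nextProbeDelay doubles the delay once per consecutive connect failure
// and never returns more than maxBackoff.
func nextProbeDelay(failures int) time.Duration {
	delay := baseBackoff
	for i := 0; i < failures; i++ {
		delay *= 2
		if delay >= maxBackoff {
			// Returning early also prevents overflow for large failure counts.
			return maxBackoff
		}
	}
	return delay
}

func main() {
	for failures := 0; failures <= 7; failures++ {
		fmt.Printf("failures=%d delay=%s\n", failures, nextProbeDelay(failures))
	}
}
```

With these assumed values the delay grows as 1h, 2h, 4h, 8h, 16h, 32h and then stays at 48h, so a peer that keeps failing is probed less and less often instead of being retried on every probe interval.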
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -15,6 +15,9 @@ The following emojis are used to highlight certain changes:

### Added

- Added a new `cachedAddrBook` implementation that caches peer addresses and probes them in the background.
- Added a new `cachedRouter` that uses `cachedAddrBook` to retrieve cached addresses for peers without multiaddrs.

### Changed

### Removed
275 changes: 275 additions & 0 deletions cached_addr_book.go
@@ -0,0 +1,275 @@
package main

import (
"context"
"io"
"math"
"sync"
"sync/atomic"
"time"

"github.com/ipfs/boxo/routing/http/types"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
probeDurationHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "probe_duration_seconds",
Namespace: "someguy",
Subsystem: "cached_addr_book",
Help: "Duration of peer probing operations in seconds",
// Buckets span expected probe durations, from 0.5s up to 120s
Buckets: []float64{0.5, 1, 2, 5, 10, 30, 60, 120},
})

peerStateSize = promauto.NewGauge(prometheus.GaugeOpts{
Name: "peer_state_size",
Subsystem: "cached_addr_book",
Namespace: "someguy",
Help: "Number of peer objects currently in the peer state",
})
)

const (
// The TTL to keep recently connected peers for. Same as DefaultProviderAddrTTL in go-libp2p-kad-dht
RecentlyConnectedAddrTTL = time.Hour * 24

// Connected peers don't expire until they disconnect
ConnectedAddrTTL = math.MaxInt64

// How long to wait since last connection before probing a peer again
PeerProbeThreshold = time.Hour

// How often to run the probe peers function
ProbeInterval = time.Minute * 5

// How many concurrent probes to run at once
MaxConcurrentProbes = 20

// How many connect failures to tolerate before clearing a peer's addresses
MaxConnectFailures = 3

// How long to wait for a connect in a probe to complete
ConnectTimeout = time.Second * 10
)

type peerState struct {
lastConnTime time.Time // last time we successfully connected to this peer
lastConnAddr ma.Multiaddr // last address we connected to this peer on
returnCount int // number of times we've returned this peer from the cache
lastReturnTime time.Time // last time we returned this peer from the cache
connectFailures int // number of times we've failed to connect to this peer
}

type cachedAddrBook struct {
addrBook peerstore.AddrBook
peers map[peer.ID]*peerState
mu sync.RWMutex // guards peers
isProbing atomic.Bool
allowPrivateIPs bool // for testing
}

type AddrBookOption func(*cachedAddrBook) error

func WithAllowPrivateIPs() AddrBookOption {
return func(cab *cachedAddrBook) error {
cab.allowPrivateIPs = true
return nil
}
}

func newCachedAddrBook(opts ...AddrBookOption) (*cachedAddrBook, error) {
cab := &cachedAddrBook{
peers: make(map[peer.ID]*peerState),
addrBook: pstoremem.NewAddrBook(),
}

for _, opt := range opts {
err := opt(cab)
if err != nil {
return nil, err
}

}
return cab, nil
}

func (cab *cachedAddrBook) background(ctx context.Context, host host.Host) {
sub, err := host.EventBus().Subscribe([]interface{}{
&event.EvtPeerIdentificationCompleted{},
&event.EvtPeerConnectednessChanged{},
})
if err != nil {
logger.Errorf("failed to subscribe to peer identification events: %v", err)
return
}

defer sub.Close()

probeTicker := time.NewTicker(ProbeInterval)
defer probeTicker.Stop()

for {
select {
case <-ctx.Done():
cabCloser, ok := cab.addrBook.(io.Closer)
if ok {
errClose := cabCloser.Close()
if errClose != nil {
logger.Warnf("failed to close addr book: %v", errClose)
}

}
return
case ev := <-sub.Out():
switch ev := ev.(type) {
case event.EvtPeerIdentificationCompleted:
cab.mu.Lock()
pState, exists := cab.peers[ev.Peer]
if !exists {
pState = &peerState{}
cab.peers[ev.Peer] = pState
peerStateSize.Set(float64(len(cab.peers)))
}
pState.lastConnTime = time.Now()
pState.lastConnAddr = ev.Conn.RemoteMultiaddr()
pState.connectFailures = 0 // reset connect failures on successful connection
cab.mu.Unlock()

// Connected (or limited) peers keep their addresses until they disconnect;
// all other peers get the shorter "recently connected" TTL.
ttl := RecentlyConnectedAddrTTL
connectedness := host.Network().Connectedness(ev.Peer)
if connectedness == network.Connected || connectedness == network.Limited {
ttl = ConnectedAddrTTL
}

if ev.SignedPeerRecord != nil {
logger.Debug("Caching signed peer record")
// Named certAddrBook so it does not shadow the cab receiver
certAddrBook, ok := peerstore.GetCertifiedAddrBook(cab.addrBook)
if ok {
_, err := certAddrBook.ConsumePeerRecord(ev.SignedPeerRecord, ttl)
if err != nil {
logger.Warnf("failed to consume signed peer record: %v", err)
}
}
} else {
logger.Debug("No signed peer record, caching listen addresses")
// We don't have a signed peer record, so we use the listen addresses
cab.addrBook.AddAddrs(ev.Peer, ev.ListenAddrs, ttl)
}
case event.EvtPeerConnectednessChanged:
// The peer is neither connected nor limited, so downgrade its addresses from ConnectedAddrTTL to RecentlyConnectedAddrTTL
if ev.Connectedness != network.Connected && ev.Connectedness != network.Limited {
cab.addrBook.UpdateAddrs(ev.Peer, ConnectedAddrTTL, RecentlyConnectedAddrTTL)
}

}
case <-probeTicker.C:
if cab.isProbing.Load() {
logger.Debug("Skipping peer probe, still running")
continue

}
logger.Debug("Starting to probe peers")
go cab.probePeers(ctx, host)

}
// TODO: Add some cleanup logic to remove peers that haven't been returned from the cache in a while or have failed to connect too many times
}
}

// probePeers loops over all peers with cached addresses and probes those that are
// currently disconnected and were last connected more than PeerProbeThreshold ago
func (cab *cachedAddrBook) probePeers(ctx context.Context, host host.Host) {
cab.isProbing.Store(true)
defer cab.isProbing.Store(false)

start := time.Now()
defer func() {
duration := time.Since(start).Seconds()
probeDurationHistogram.Observe(duration)
logger.Debugf("Finished probing peers in %.2fs", duration) // duration is a float64 number of seconds
}()

wg := sync.WaitGroup{}
// semaphore channel to limit the number of concurrent probes
semaphore := make(chan struct{}, MaxConcurrentProbes)

for i, p := range cab.addrBook.PeersWithAddrs() {
connectedness := host.Network().Connectedness(p)
if connectedness == network.Connected || connectedness == network.Limited {
continue // don't probe connected peers

}

cab.mu.RLock()
pState, exists := cab.peers[p] // may be missing; avoid a nil dereference
var lastConnTime time.Time
var connectFailures int
if exists {
lastConnTime = pState.lastConnTime
connectFailures = pState.connectFailures
}
cab.mu.RUnlock()

if time.Since(lastConnTime) < PeerProbeThreshold {
continue // don't probe peers below the probe threshold
}
if connectFailures > MaxConnectFailures {
cab.addrBook.ClearAddrs(p) // clear the peer's addresses
continue // don't probe this peer
}
addrs := cab.addrBook.Addrs(p)

if !cab.allowPrivateIPs {
addrs = ma.FilterAddrs(addrs, manet.IsPublicAddr)
}

if len(addrs) == 0 {
continue // no addresses to probe

}

wg.Add(1)
go func() {
semaphore <- struct{}{}
defer func() {
<-semaphore // Release semaphore
wg.Done()
}()

ctx, cancel := context.WithTimeout(ctx, ConnectTimeout)
defer cancel()
logger.Debugf("Probe %d: PeerID: %s, Addrs: %v", i+1, p, addrs)
// if connect succeeds and identify runs, the background loop will take care of updating the peer state and cache
err := host.Connect(ctx, peer.AddrInfo{
ID: p,
// TODO: Should we probe the last connected address or all addresses?
Addrs: addrs,
})
if err != nil {
logger.Debugf("failed to connect to peer %s: %v", p, err)
cab.mu.Lock() // Lock before accessing shared state
if pState, exists := cab.peers[p]; exists { // the entry may be missing; avoid a nil dereference
pState.connectFailures++
}
cab.mu.Unlock()
}
}()
}
wg.Wait()
}

// GetCachedAddrs returns the cached addresses for a peer and records the lookup
// by incrementing the return count and updating the last return time
func (cab *cachedAddrBook) GetCachedAddrs(p *peer.ID) []types.Multiaddr {
cachedAddrs := cab.addrBook.Addrs(*p)

if len(cachedAddrs) == 0 {
return nil
}


cab.mu.Lock()
// Initialize peer state if it doesn't exist
if _, exists := cab.peers[*p]; !exists {
cab.peers[*p] = &peerState{}
peerStateSize.Set(float64(len(cab.peers)))
}

cab.peers[*p].returnCount++
cab.peers[*p].lastReturnTime = time.Now()
cab.mu.Unlock()

var result []types.Multiaddr // convert to local Multiaddr type 🙃
for _, addr := range cachedAddrs {
result = append(result, types.Multiaddr{Multiaddr: addr})
}
return result
}
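
`probePeers` above starts one goroutine per peer and acquires the semaphore inside that goroutine, so the number of running probes is capped at `MaxConcurrentProbes` but the number of waiting goroutines is not. One possible alternative, sketched below under stated assumptions, acquires the slot before spawning so that goroutine count is bounded as well. Everything here is hypothetical and uses only the standard library: `probeAll`, `runDemo`, `demoLimit`, the string peer names, and the fake probe function stand in for `host.Connect` and real peer IDs.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// demoLimit is a hypothetical stand-in for MaxConcurrentProbes.
const demoLimit = 4

// probeAll runs probe for every peer with at most limit probes in flight
// and returns how many probes failed.
func probeAll(ctx context.Context, peers []string, limit int, timeout time.Duration,
	probe func(context.Context, string) error) int {
	var (
		wg       sync.WaitGroup
		failures atomic.Int64
	)
	sem := make(chan struct{}, limit)

loop:
	for _, p := range peers {
		// Acquire a slot before spawning, so at most limit goroutines exist.
		select {
		case sem <- struct{}{}:
		case <-ctx.Done():
			break loop
		}
		wg.Add(1)
		go func(p string) {
			defer func() {
				<-sem // release the slot
				wg.Done()
			}()
			probeCtx, cancel := context.WithTimeout(ctx, timeout)
			defer cancel()
			if err := probe(probeCtx, p); err != nil {
				failures.Add(1)
			}
		}(p)
	}
	wg.Wait()
	return int(failures.Load())
}

// runDemo probes ten fake peers and reports the failure count together
// with the highest number of probes observed running at the same time.
func runDemo() (failed int, peak int64) {
	var inFlight, maxSeen atomic.Int64
	probe := func(ctx context.Context, p string) error {
		n := inFlight.Add(1)
		defer inFlight.Add(-1)
		for { // record the peak concurrency seen so far
			old := maxSeen.Load()
			if n <= old || maxSeen.CompareAndSwap(old, n) {
				break
			}
		}
		select {
		case <-time.After(5 * time.Millisecond): // simulated dial time
		case <-ctx.Done():
			return ctx.Err()
		}
		if strings.HasPrefix(p, "offline") {
			return errors.New("peer unreachable")
		}
		return nil
	}
	peers := []string{
		"online-1", "offline-1", "online-2", "online-3", "offline-2",
		"online-4", "online-5", "offline-3", "online-6", "online-7",
	}
	failed = probeAll(context.Background(), peers, demoLimit, time.Second, probe)
	return failed, maxSeen.Load()
}

func main() {
	failed, peak := runDemo()
	fmt.Printf("failed=%d peak=%d (limit %d)\n", failed, peak, demoLimit)
}
```

The trade-off is that the loop itself now blocks while all slots are busy, which is usually acceptable for a background job like this one and also gives the loop a natural place to observe context cancellation.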