Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autorelay: send addresses on eventbus; dont wrap address factory #3071

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 8 additions & 26 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,8 @@ func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus) (*bhost.B
DisableIdentifyAddressDiscovery: cfg.DisableIdentifyAddressDiscovery,
EnableAutoNATv2: cfg.EnableAutoNATv2,
AutoNATv2Dialer: autonatv2Dialer,
EnableAutoRelay: cfg.EnableAutoRelay,
AutoRelayOpts: cfg.AutoRelayOpts,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -518,28 +520,6 @@ func (cfg *Config) NewNode() (host.Host, error) {
)
}

// enable autorelay
fxopts = append(fxopts,
fx.Invoke(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) error {
if cfg.EnableAutoRelay {
if !cfg.DisableMetrics {
mt := autorelay.WithMetricsTracer(
autorelay.NewMetricsTracer(autorelay.WithRegisterer(cfg.PrometheusRegisterer)))
mtOpts := []autorelay.Option{mt}
cfg.AutoRelayOpts = append(mtOpts, cfg.AutoRelayOpts...)
}

ar, err := autorelay.NewAutoRelay(h, cfg.AutoRelayOpts...)
if err != nil {
return err
}
lifecycle.Append(fx.StartStopHook(ar.Start, ar.Close))
return nil
}
return nil
}),
)

var bh *bhost.BasicHost
fxopts = append(fxopts, fx.Invoke(func(bho *bhost.BasicHost) { bh = bho }))
fxopts = append(fxopts, fx.Invoke(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) {
Expand All @@ -554,12 +534,10 @@ func (cfg *Config) NewNode() (host.Host, error) {
fxopts = append(fxopts, cfg.UserFxOptions...)

app := fx.New(fxopts...)
if err := app.Start(context.Background()); err != nil {
return nil, err
if app.Err() != nil {
return nil, fmt.Errorf("failed to create host: %w", app.Err())
}

if err := cfg.addAutoNAT(bh); err != nil {
app.Stop(context.Background())
if cfg.Routing != nil {
rh.Close()
} else {
Expand All @@ -568,6 +546,10 @@ func (cfg *Config) NewNode() (host.Host, error) {
return nil, err
}

if err := app.Start(context.Background()); err != nil {
return nil, err
}

if cfg.Routing != nil {
return &closableRoutedHost{App: app, RoutedHost: rh}, nil
}
Expand Down
5 changes: 5 additions & 0 deletions core/event/addrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,8 @@ type EvtLocalAddressesUpdated struct {
// wrapped in a record.Envelope and signed by the Host's private key.
SignedPeerRecord *record.Envelope
}

// EvtAutoRelayAddrsUpdated is sent by the autorelay when the node's relay addresses are updated
type EvtAutoRelayAddrs struct {
RelayAddrs []ma.Multiaddr
}
15 changes: 4 additions & 11 deletions p2p/host/autorelay/addrsplosion.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (

// This function cleans up a relay's address set to remove private addresses and curtail
// addrsplosion.
// TODO: Remove this, we don't need this. The current method tries to select the
// best address for the relay. Instead we should rely on the addresses provided by the
// relay in response to the reservation request.
func cleanupAddressSet(addrs []ma.Multiaddr) []ma.Multiaddr {
var public, private []ma.Multiaddr

Expand All @@ -17,7 +20,7 @@ func cleanupAddressSet(addrs []ma.Multiaddr) []ma.Multiaddr {
continue
}

if manet.IsPublicAddr(a) || isDNSAddr(a) {
if manet.IsPublicAddr(a) {
public = append(public, a)
continue
}
Expand Down Expand Up @@ -51,16 +54,6 @@ func isRelayAddr(a ma.Multiaddr) bool {
return isRelay
}

func isDNSAddr(a ma.Multiaddr) bool {
if first, _ := ma.SplitFirst(a); first != nil {
switch first.Protocol().Code {
case ma.P_DNS, ma.P_DNS4, ma.P_DNS6, ma.P_DNSADDR:
return true
}
}
return false
}

// we have addrsplosion if for some protocol we advertise multiple ports on
// the same base address.
func hasAddrsplosion(addrs []ma.Multiaddr) bool {
Expand Down
37 changes: 13 additions & 24 deletions p2p/host/autorelay/autorelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package autorelay
import (
"context"
"errors"
"fmt"
"sync"

"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
basic "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
ma "github.com/multiformats/go-multiaddr"

logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
)

var log = logging.Logger("autorelay")
Expand All @@ -22,8 +22,6 @@ type AutoRelay struct {
ctx context.Context
ctxCancel context.CancelFunc

conf *config

mx sync.Mutex
status network.Reachability

Expand All @@ -34,9 +32,9 @@ type AutoRelay struct {
metricsTracer MetricsTracer
}

func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
func NewAutoRelay(host host.Host, opts ...Option) (*AutoRelay, error) {
r := &AutoRelay{
host: bhost,
host: host,
status: network.ReachabilityUnknown,
}
conf := defaultConfig
Expand All @@ -46,29 +44,20 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
}
}
r.ctx, r.ctxCancel = context.WithCancel(context.Background())
r.conf = &conf
r.relayFinder = newRelayFinder(bhost, conf.peerSource, &conf)
r.metricsTracer = &wrappedMetricsTracer{conf.metricsTracer}

// Update the host address factory to use autorelay addresses if we're private
//
// TODO: Don't update host address factory. Instead send our relay addresses on the eventbus.
// The host can decide how to handle those.
addrF := bhost.AddrsFactory
bhost.AddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr {
addrs = addrF(addrs)
r.mx.Lock()
defer r.mx.Unlock()

if r.status != network.ReachabilityPrivate {
return addrs
}
return r.relayFinder.relayAddrs(addrs)
rf, err := newRelayFinder(host, &conf)
if err != nil {
return nil, fmt.Errorf("failed to create autorelay: %w", err)
}
r.relayFinder = rf
r.metricsTracer = &wrappedMetricsTracer{conf.metricsTracer}

return r, nil
}

func (r *AutoRelay) RelayAddrs() []ma.Multiaddr {
return r.relayFinder.RelayAddrs()
}

func (r *AutoRelay) Start() {
r.refCount.Add(1)
go func() {
Expand Down
70 changes: 69 additions & 1 deletion p2p/host/autorelay/autorelay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package autorelay_test
import (
"context"
"fmt"
"slices"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -96,7 +98,10 @@ func newRelay(t *testing.T) host.Host {
saddr := addr.String()
if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") {
addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1")
addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP)
// .internal is classified as a public address as users
// are free to map this dns to a public ip address for
// use within a LAN
addrs[i] = ma.StringCast("/dns/libp2p.internal" + addrNoIP)
}
}
return addrs
Expand Down Expand Up @@ -517,3 +522,66 @@ func TestNoBusyLoop0MinInterval(t *testing.T) {
val := atomic.LoadUint64(&calledTimes)
require.Less(t, val, uint64(2))
}
func TestAutoRelayAddrsEvent(t *testing.T) {
cl := newMockClock()
r1, r2 := newRelay(t), newRelay(t)
t.Cleanup(func() {
r1.Close()
r2.Close()
})

relayFromP2PAddr := func(a ma.Multiaddr) peer.ID {
r, c := ma.SplitLast(a)
if c.Protocol().Code != ma.P_CIRCUIT {
return ""
}
if id, err := peer.IDFromP2PAddr(r); err == nil {
return id
}
return ""
}

checkPeersExist := func(addrs []ma.Multiaddr, peers ...peer.ID) bool {
for _, p := range peers {
if !slices.ContainsFunc(addrs, func(a ma.Multiaddr) bool { return relayFromP2PAddr(a) == p }) {
return false
}
}
return true
}
peerChan := make(chan peer.AddrInfo, 3)
h := newPrivateNode(t,
func(context.Context, int) <-chan peer.AddrInfo {
return peerChan
},
autorelay.WithClock(cl),
autorelay.WithMinCandidates(1),
autorelay.WithMaxCandidates(10),
autorelay.WithNumRelays(3),
autorelay.WithBootDelay(1*time.Second),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()

sub, err := h.EventBus().Subscribe(new(event.EvtAutoRelayAddrs))
require.NoError(t, err)

peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()}
cl.AdvanceBy(time.Second)

require.Eventually(t, func() bool {
e := <-sub.Out()
if !checkPeersExist(e.(event.EvtAutoRelayAddrs).RelayAddrs, r1.ID()) {
return false
}
if checkPeersExist(e.(event.EvtAutoRelayAddrs).RelayAddrs, r2.ID()) {
return false
}
return true
}, 5*time.Second, 50*time.Millisecond)
peerChan <- peer.AddrInfo{ID: r2.ID(), Addrs: r2.Addrs()}
require.Eventually(t, func() bool {
e := <-sub.Out()
return checkPeersExist(e.(event.EvtAutoRelayAddrs).RelayAddrs, r1.ID(), r2.ID())
}, 5*time.Second, 50*time.Millisecond)
}
19 changes: 0 additions & 19 deletions p2p/host/autorelay/relay.go

This file was deleted.

Loading
Loading