From f212a0f823017c60e30bd284f8f20be19873e882 Mon Sep 17 00:00:00 2001 From: sukun Date: Wed, 5 Jun 2024 22:08:11 +0530 Subject: [PATCH] review comments --- p2p/host/basic/basic_host.go | 6 +++ p2p/net/swarm/black_hole_detector.go | 2 +- p2p/protocol/autonatv2/autonat.go | 71 +++++++++++++------------- p2p/protocol/autonatv2/autonat_test.go | 20 ++++---- p2p/protocol/autonatv2/client.go | 33 +++++++++--- p2p/protocol/autonatv2/options.go | 8 +-- p2p/protocol/autonatv2/server.go | 58 ++++++++++----------- p2p/protocol/autonatv2/server_test.go | 23 ++++----- 8 files changed, 119 insertions(+), 102 deletions(-) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index e5b1ae0b34..766b1b13bc 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -410,6 +410,12 @@ func (h *BasicHost) Start() { h.psManager.Start() h.refCount.Add(1) h.ids.Start() + if h.autonatv2 != nil { + err := h.autonatv2.Start() + if err != nil { + log.Errorf("autonat v2 failed to start: %s", err) + } + } go h.background() } diff --git a/p2p/net/swarm/black_hole_detector.go b/p2p/net/swarm/black_hole_detector.go index 918ee5b07e..0aec8e6696 100644 --- a/p2p/net/swarm/black_hole_detector.go +++ b/p2p/net/swarm/black_hole_detector.go @@ -39,7 +39,7 @@ func (st blackHoleState) String() string { type BlackHoleFilter struct { // N is // 1. The minimum number of completed dials required before evaluating black hole state - // 2. the minimum number of requests after which we probe the state of the black hole in + // 2. the minimum number of requests after which we probe the state of the black hole in // blocked state N int // MinSuccesses is the minimum number of Success required in the last n dials diff --git a/p2p/protocol/autonatv2/autonat.go b/p2p/protocol/autonatv2/autonat.go index 3c699c171f..1ae11edbfa 100644 --- a/p2p/protocol/autonatv2/autonat.go +++ b/p2p/protocol/autonatv2/autonat.go @@ -55,8 +55,6 @@ type Request struct { // Result is the result of the CheckReachability call type Result struct { - // Idx is the index of the dialed address - Idx int // Addr is the dialed address Addr ma.Multiaddr // Reachability of the dialed address @@ -70,7 +68,6 @@ type Result struct { // The server provides amplification attack prevention and rate limiting. type AutoNAT struct { host host.Host - sub event.Subscription // for cleanly closing ctx context.Context @@ -82,10 +79,9 @@ type AutoNAT struct { mx sync.Mutex peers *peersMap - - // allowAllAddrs enables using private and localhost addresses for reachability checks. + // allowPrivateAddrs enables using private and localhost addresses for reachability checks. // This is only useful for testing. - allowAllAddrs bool + allowPrivateAddrs bool } // New returns a new AutoNAT instance. @@ -98,47 +94,28 @@ func New(host host.Host, dialerHost host.Host, opts ...AutoNATOption) (*AutoNAT, return nil, fmt.Errorf("failed to apply option: %w", err) } } - // Listen on event.EvtPeerProtocolsUpdated, event.EvtPeerConnectednessChanged - // event.EvtPeerIdentificationCompleted to maintain our set of autonat supporting peers. - sub, err := host.EventBus().Subscribe([]interface{}{ - new(event.EvtPeerProtocolsUpdated), - new(event.EvtPeerConnectednessChanged), - new(event.EvtPeerIdentificationCompleted), - }) - if err != nil { - return nil, fmt.Errorf("event subscription failed: %w", err) - } ctx, cancel := context.WithCancel(context.Background()) an := &AutoNAT{ - host: host, - ctx: ctx, - cancel: cancel, - sub: sub, - srv: newServer(host, dialerHost, s), - cli: newClient(host), - allowAllAddrs: s.allowAllAddrs, - peers: newPeersMap(), + host: host, + ctx: ctx, + cancel: cancel, + srv: newServer(host, dialerHost, s), + cli: newClient(host), + allowPrivateAddrs: s.allowPrivateAddrs, + peers: newPeersMap(), } - an.cli.RegisterDialBack() - an.srv.Enable() - - an.wg.Add(1) - go an.background() return an, nil } -func (an *AutoNAT) background() { +func (an *AutoNAT) background(sub event.Subscription) { for { select { case <-an.ctx.Done(): - an.srv.Disable() - an.srv.Close() - an.sub.Close() - an.peers = nil + sub.Close() an.wg.Done() return - case e := <-an.sub.Out(): + case e := <-sub.Out(): switch evt := e.(type) { case event.EvtPeerProtocolsUpdated: an.updatePeer(evt.Peer) @@ -151,14 +128,36 @@ func (an *AutoNAT) background() { } } +func (an *AutoNAT) Start() error { + // Listen on event.EvtPeerProtocolsUpdated, event.EvtPeerConnectednessChanged + // event.EvtPeerIdentificationCompleted to maintain our set of autonat supporting peers. + sub, err := an.host.EventBus().Subscribe([]interface{}{ + new(event.EvtPeerProtocolsUpdated), + new(event.EvtPeerConnectednessChanged), + new(event.EvtPeerIdentificationCompleted), + }) + if err != nil { + return fmt.Errorf("event subscription failed: %w", err) + } + an.cli.Start() + an.srv.Start() + + an.wg.Add(1) + go an.background(sub) + return nil +} + func (an *AutoNAT) Close() { an.cancel() an.wg.Wait() + an.srv.Close() + an.cli.Close() + an.peers = nil } // GetReachability makes a single dial request for checking reachability for requested addresses func (an *AutoNAT) GetReachability(ctx context.Context, reqs []Request) (Result, error) { - if !an.allowAllAddrs { + if !an.allowPrivateAddrs { for _, r := range reqs { if !manet.IsPublicAddr(r.Addr) { return Result{}, fmt.Errorf("private address cannot be verified by autonatv2: %s", r.Addr) diff --git a/p2p/protocol/autonatv2/autonat_test.go b/p2p/protocol/autonatv2/autonat_test.go index 147cce735d..4fb6a7e0ea 100644 --- a/p2p/protocol/autonatv2/autonat_test.go +++ b/p2p/protocol/autonatv2/autonat_test.go @@ -40,8 +40,8 @@ func newAutoNAT(t testing.TB, dialer host.Host, opts ...AutoNATOption) *AutoNAT if err != nil { t.Error(err) } - an.srv.Enable() - an.cli.RegisterDialBack() + an.Start() + t.Cleanup(an.Close) return an } @@ -92,7 +92,7 @@ func TestAutoNATPrivateAddr(t *testing.T) { } func TestClientRequest(t *testing.T) { - an := newAutoNAT(t, nil, allowAllAddrs) + an := newAutoNAT(t, nil, allowPrivateAddrs) defer an.Close() defer an.host.Close() @@ -127,7 +127,7 @@ func TestClientRequest(t *testing.T) { } func TestClientServerError(t *testing.T) { - an := newAutoNAT(t, nil, allowAllAddrs) + an := newAutoNAT(t, nil, allowPrivateAddrs) defer an.Close() defer an.host.Close() @@ -184,7 +184,7 @@ func TestClientServerError(t *testing.T) { } func TestClientDataRequest(t *testing.T) { - an := newAutoNAT(t, nil, allowAllAddrs) + an := newAutoNAT(t, nil, allowPrivateAddrs) defer an.Close() defer an.host.Close() @@ -299,7 +299,7 @@ func TestClientDataRequest(t *testing.T) { } func TestClientDialBacks(t *testing.T) { - an := newAutoNAT(t, nil, allowAllAddrs) + an := newAutoNAT(t, nil, allowPrivateAddrs) defer an.Close() defer an.host.Close() @@ -605,10 +605,10 @@ func TestAreAddrsConsistency(t *testing.T) { success: true, }, { - name: "nat64 match", + name: "nat64", localAddr: ma.StringCast("/ip6/1::1/tcp/12345"), dialAddr: ma.StringCast("/ip4/1.2.3.4/tcp/23232"), - success: true, + success: false, }, { name: "simple mismatch", @@ -623,8 +623,8 @@ func TestAreAddrsConsistency(t *testing.T) { success: false, }, { - name: "nat64 mismatch", - localAddr: ma.StringCast("/ip4/192.168.0.1/udp/12345/quic-v1"), + name: "dns", + localAddr: ma.StringCast("/dns/lib.p2p/udp/12345/quic-v1"), dialAddr: ma.StringCast("/ip6/1::1/udp/123/quic-v1/"), success: false, }, diff --git a/p2p/protocol/autonatv2/client.go b/p2p/protocol/autonatv2/client.go index 62f14d99c1..947815ea4b 100644 --- a/p2p/protocol/autonatv2/client.go +++ b/p2p/protocol/autonatv2/client.go @@ -33,11 +33,14 @@ func newClient(h host.Host) *client { return &client{host: h, dialData: make([]byte, 4000), dialBackQueues: make(map[uint64]chan ma.Multiaddr)} } -// RegisterDialBack registers the client to receive DialBack streams initiated by the server to send the nonce. -func (ac *client) RegisterDialBack() { +func (ac *client) Start() { ac.host.SetStreamHandler(DialBackProtocol, ac.handleDialBack) } +func (ac *client) Close() { + ac.host.RemoveStreamHandler(DialBackProtocol) +} + // GetReachability verifies address reachability with a AutoNAT v2 server p. func (ac *client) GetReachability(ctx context.Context, p peer.ID, reqs []Request) (Result, error) { ctx, cancel := context.WithTimeout(ctx, streamTimeout) @@ -166,7 +169,7 @@ func (ac *client) newResult(resp *pb.DialResponse, reqs []Request, dialBackAddr // the server is misinforming us about the address it successfully dialed // either we received no dialback or the address on the dialback is inconsistent with // what the server is telling us - return Result{}, fmt.Errorf("invalid repsonse: dialBackAddr: %s, respAddr: %s", dialBackAddr, addr) + return Result{}, fmt.Errorf("invalid response: dialBackAddr: %s, respAddr: %s", dialBackAddr, addr) } rch = network.ReachabilityPublic case pb.DialStatus_E_DIAL_ERROR: @@ -187,7 +190,6 @@ func (ac *client) newResult(resp *pb.DialResponse, reqs []Request, dialBackAddr } return Result{ - Idx: idx, Addr: addr, Reachability: rch, Status: resp.DialStatus, @@ -293,11 +295,26 @@ func areAddrsConsistent(local, external ma.Multiaddr) bool { } for i := 0; i < len(localProtos); i++ { if i == 0 { - if localProtos[i].Code == externalProtos[i].Code || - localProtos[i].Code == ma.P_IP6 && externalProtos[i].Code == ma.P_IP4 /* NAT64 */ { - continue + switch externalProtos[i].Code { + case ma.P_DNS, ma.P_DNSADDR: + if localProtos[i].Code == ma.P_IP4 || localProtos[i].Code == ma.P_IP6 { + continue + } + return false + case ma.P_DNS4: + if localProtos[i].Code == ma.P_IP4 { + continue + } + return false + case ma.P_DNS6: + if localProtos[i].Code == ma.P_IP6 { + continue + } + return false + } + if localProtos[i].Code != externalProtos[i].Code { + return false } - return false } else { if localProtos[i].Code != externalProtos[i].Code { return false diff --git a/p2p/protocol/autonatv2/options.go b/p2p/protocol/autonatv2/options.go index 3a59d8d823..0bbe012920 100644 --- a/p2p/protocol/autonatv2/options.go +++ b/p2p/protocol/autonatv2/options.go @@ -4,7 +4,7 @@ import "time" // autoNATSettings is used to configure AutoNAT type autoNATSettings struct { - allowAllAddrs bool + allowPrivateAddrs bool serverRPM int serverPerPeerRPM int serverDialDataRPM int @@ -14,7 +14,7 @@ type autoNATSettings struct { func defaultSettings() *autoNATSettings { return &autoNATSettings{ - allowAllAddrs: false, + allowPrivateAddrs: false, // TODO: confirm rate limiting defaults serverRPM: 20, serverPerPeerRPM: 2, @@ -42,7 +42,7 @@ func withDataRequestPolicy(drp dataRequestPolicyFunc) AutoNATOption { } } -func allowAllAddrs(s *autoNATSettings) error { - s.allowAllAddrs = true +func allowPrivateAddrs(s *autoNATSettings) error { + s.allowPrivateAddrs = true return nil } diff --git a/p2p/protocol/autonatv2/server.go b/p2p/protocol/autonatv2/server.go index 4f165d0ec7..4dbe8525f5 100644 --- a/p2p/protocol/autonatv2/server.go +++ b/p2p/protocol/autonatv2/server.go @@ -42,7 +42,7 @@ func newServer(host, dialer host.Host, s *autoNATSettings) *server { dialerHost: dialer, host: host, dialDataRequestPolicy: s.dataRequestPolicy, - allowPrivateAddrs: s.allowAllAddrs, + allowPrivateAddrs: s.allowPrivateAddrs, limiter: &rateLimiter{ RPM: s.serverRPM, PerPeerRPM: s.serverPerPeerRPM, @@ -54,16 +54,12 @@ func newServer(host, dialer host.Host, s *autoNATSettings) *server { } // Enable attaches the stream handler to the host. -func (as *server) Enable() { +func (as *server) Start() { as.host.SetStreamHandler(DialProtocol, as.handleDialRequest) } -// Disable removes the stream handles from the host. -func (as *server) Disable() { - as.host.RemoveStreamHandler(DialProtocol) -} - func (as *server) Close() { + as.host.RemoveStreamHandler(DialProtocol) as.dialerHost.Close() } @@ -120,28 +116,7 @@ func (as *server) handleDialRequest(s network.Stream) { return } - nonce := msg.GetDialRequest().Nonce - // parse peer's addresses - var dialAddr ma.Multiaddr - var addrIdx int - for i, ab := range msg.GetDialRequest().GetAddrs() { - if i >= maxPeerAddresses { - break - } - a, err := ma.NewMultiaddrBytes(ab) - if err != nil { - continue - } - if !as.allowPrivateAddrs && !manet.IsPublicAddr(a) { - continue - } - if !as.dialerHost.Network().CanDial(p, a) { - continue - } - dialAddr = a - addrIdx = i - break - } + addrIdx, dialAddr := as.validateRequestAndGetAddr(p, &msg) // No dialable address if dialAddr == nil { msg = pb.Message{ @@ -185,6 +160,7 @@ func (as *server) handleDialRequest(s network.Stream) { } } + nonce := msg.GetDialRequest().Nonce dialStatus := as.dialBack(s.Conn().RemotePeer(), dialAddr, nonce) msg = pb.Message{ Msg: &pb.Message_DialResponse{ @@ -202,6 +178,30 @@ func (as *server) handleDialRequest(s network.Stream) { } } +func (as *server) validateRequestAndGetAddr(p peer.ID, msg *pb.Message) (int, ma.Multiaddr) { + var addrIdx int + var dialAddr ma.Multiaddr + for i, ab := range msg.GetDialRequest().GetAddrs() { + if i >= maxPeerAddresses { + break + } + a, err := ma.NewMultiaddrBytes(ab) + if err != nil { + continue + } + if !as.allowPrivateAddrs && !manet.IsPublicAddr(a) { + continue + } + if !as.dialerHost.Network().CanDial(p, a) { + continue + } + dialAddr = a + addrIdx = i + break + } + return addrIdx, dialAddr +} + // getDialData gets data from the client for dialing the address func getDialData(w pbio.Writer, r pbio.Reader, msg *pb.Message, addrIdx int) error { numBytes := minHandshakeSizeBytes + rand.Intn(maxHandshakeSizeBytes-minHandshakeSizeBytes) diff --git a/p2p/protocol/autonatv2/server_test.go b/p2p/protocol/autonatv2/server_test.go index 750bbfc096..d23d02c210 100644 --- a/p2p/protocol/autonatv2/server_test.go +++ b/p2p/protocol/autonatv2/server_test.go @@ -28,13 +28,13 @@ func newTestRequests(addrs []ma.Multiaddr, sendDialData bool) (reqs []Request) { } func TestServerInvalidAddrsRejected(t *testing.T) { - c := newAutoNAT(t, nil, allowAllAddrs) + c := newAutoNAT(t, nil, allowPrivateAddrs) defer c.Close() defer c.host.Close() t.Run("no transport", func(t *testing.T) { dialer := bhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptDisableQUIC, swarmt.OptDisableTCP)) - an := newAutoNAT(t, dialer, allowAllAddrs) + an := newAutoNAT(t, dialer, allowPrivateAddrs) defer an.Close() defer an.host.Close() @@ -96,7 +96,7 @@ func TestServerInvalidAddrsRejected(t *testing.T) { t.Run("too many address", func(t *testing.T) { dialer := bhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptDisableTCP)) - an := newAutoNAT(t, dialer, allowAllAddrs) + an := newAutoNAT(t, dialer, allowPrivateAddrs) defer an.Close() defer an.host.Close() @@ -115,7 +115,7 @@ func TestServerInvalidAddrsRejected(t *testing.T) { t.Run("msg too large", func(t *testing.T) { dialer := bhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptDisableTCP)) - an := newAutoNAT(t, dialer, allowAllAddrs) + an := newAutoNAT(t, dialer, allowPrivateAddrs) defer an.Close() defer an.host.Close() @@ -138,7 +138,7 @@ func TestServerDataRequest(t *testing.T) { // server will skip all tcp addresses dialer := bhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptDisableTCP)) // ask for dial data for quic address - an := newAutoNAT(t, dialer, allowAllAddrs, withDataRequestPolicy( + an := newAutoNAT(t, dialer, allowPrivateAddrs, withDataRequestPolicy( func(s network.Stream, dialAddr ma.Multiaddr) bool { if _, err := dialAddr.ValueForProtocol(ma.P_QUIC_V1); err == nil { return true @@ -150,7 +150,7 @@ func TestServerDataRequest(t *testing.T) { defer an.Close() defer an.host.Close() - c := newAutoNAT(t, nil, allowAllAddrs) + c := newAutoNAT(t, nil, allowPrivateAddrs) defer c.Close() defer c.host.Close() @@ -172,7 +172,6 @@ func TestServerDataRequest(t *testing.T) { require.NoError(t, err) require.Equal(t, Result{ - Idx: 0, Addr: quicAddr, Reachability: network.ReachabilityPublic, Status: pb.DialStatus_OK, @@ -185,11 +184,11 @@ func TestServerDataRequest(t *testing.T) { } func TestServerDial(t *testing.T) { - an := newAutoNAT(t, nil, WithServerRateLimit(10, 10, 10), allowAllAddrs) + an := newAutoNAT(t, nil, WithServerRateLimit(10, 10, 10), allowPrivateAddrs) defer an.Close() defer an.host.Close() - c := newAutoNAT(t, nil, allowAllAddrs) + c := newAutoNAT(t, nil, allowPrivateAddrs) defer c.Close() defer c.host.Close() @@ -203,7 +202,6 @@ func TestServerDial(t *testing.T) { append([]Request{{Addr: unreachableAddr, SendDialData: true}}, newTestRequests(hostAddrs, false)...)) require.NoError(t, err) require.Equal(t, Result{ - Idx: 0, Addr: unreachableAddr, Reachability: network.ReachabilityPrivate, Status: pb.DialStatus_E_DIAL_ERROR, @@ -214,7 +212,6 @@ func TestServerDial(t *testing.T) { res, err := c.GetReachability(context.Background(), newTestRequests(c.host.Addrs(), false)) require.NoError(t, err) require.Equal(t, Result{ - Idx: 0, Addr: hostAddrs[0], Reachability: network.ReachabilityPublic, Status: pb.DialStatus_OK, @@ -223,7 +220,6 @@ func TestServerDial(t *testing.T) { res, err := c.GetReachability(context.Background(), newTestRequests([]ma.Multiaddr{addr}, false)) require.NoError(t, err) require.Equal(t, Result{ - Idx: 0, Addr: addr, Reachability: network.ReachabilityPublic, Status: pb.DialStatus_OK, @@ -236,7 +232,6 @@ func TestServerDial(t *testing.T) { res, err := c.GetReachability(context.Background(), newTestRequests(c.host.Addrs(), false)) require.NoError(t, err) require.Equal(t, Result{ - Idx: 0, Addr: hostAddrs[0], Reachability: network.ReachabilityUnknown, Status: pb.DialStatus_E_DIAL_BACK_ERROR, @@ -327,7 +322,7 @@ func TestRateLimiterStress(t *testing.T) { } func FuzzServerDialRequest(f *testing.F) { - a := newAutoNAT(f, nil, allowAllAddrs, WithServerRateLimit(math.MaxInt32, math.MaxInt32, math.MaxInt32)) + a := newAutoNAT(f, nil, allowPrivateAddrs, WithServerRateLimit(math.MaxInt32, math.MaxInt32, math.MaxInt32)) c := newAutoNAT(f, nil) idAndWait(f, c, a) // reduce the streamTimeout before running this. TODO: fix this