Skip to content

Commit

Permalink
fix: poximity transport creation (#106)
Browse files Browse the repository at this point in the history
Signed-off-by: D4ryl00 <[email protected]>
  • Loading branch information
D4ryl00 authored Jan 20, 2025
1 parent b0c9131 commit 4d1f5c3
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/proximitytransport/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func newConn(ctx context.Context, t *proximityTransport, remoteMa ma.Multiaddr,
return nil, fmt.Errorf("invalid network direction")
}

connScope, err := t.rcmgr.OpenConnection(netdir, false, remoteMa)
connScope, err := t.swarm.ResourceManager().OpenConnection(netdir, false, remoteMa)
if err != nil {
return nil, fmt.Errorf("resource manager blocked connection : %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/proximitytransport/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func newListener(ctx context.Context, localMa ma.Multiaddr, t *proximityTranspor
// Starts the native driver.
// If it failed, don't return a error because no other transport
// on the libp2p node will be created.
t.driver.Start(t.host.ID().String())
t.driver.Start(t.swarm.LocalPeer().String())

return listener
}
Expand Down
43 changes: 30 additions & 13 deletions pkg/proximitytransport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"fmt"
"sync"

host "github.com/libp2p/go-libp2p/core/host"
network "github.com/libp2p/go-libp2p/core/network"
peer "github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
tpt "github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
"github.com/pkg/errors"
Expand Down Expand Up @@ -50,9 +50,8 @@ type ProximityTransport interface {
}

type proximityTransport struct {
host host.Host
swarm *swarm.Swarm
upgrader tpt.Upgrader
rcmgr network.ResourceManager

connMap map[string]*Conn
connMapMutex sync.RWMutex
Expand All @@ -64,7 +63,7 @@ type proximityTransport struct {
ctx context.Context
}

func NewTransport(ctx context.Context, l *zap.Logger, driver ProximityDriver) func(h host.Host, u tpt.Upgrader, rcmgr network.ResourceManager) (*proximityTransport, error) {
func NewTransport(ctx context.Context, l *zap.Logger, driver ProximityDriver) func(swarm *swarm.Swarm, u tpt.Upgrader) (*proximityTransport, error) {
if l == nil {
l = zap.NewNop()
}
Expand All @@ -76,12 +75,12 @@ func NewTransport(ctx context.Context, l *zap.Logger, driver ProximityDriver) fu
driver = &NoopProximityDriver{}
}

return func(h host.Host, u tpt.Upgrader, rcmgr network.ResourceManager) (*proximityTransport, error) {
l.Debug("remi: transport.go: new Transport")
return func(swarm *swarm.Swarm, u tpt.Upgrader) (*proximityTransport, error) {
l.Debug("NewTransport called", zap.String("driver", driver.ProtocolName()))
transport := &proximityTransport{
host: h,
swarm: swarm,
upgrader: u,
rcmgr: rcmgr,
connMap: make(map[string]*Conn),
cache: NewRingBufferMap(l, 128),
driver: driver,
Expand Down Expand Up @@ -141,7 +140,7 @@ func (t *proximityTransport) CanDial(remoteMa ma.Multiaddr) bool {
func (t *proximityTransport) Listen(localMa ma.Multiaddr) (tpt.Listener, error) {
// localAddr is supposed to be equal to the localPID
// or to DefaultAddr since multiaddr == /<protocol>/<peerID>
localPID := t.host.ID().String()
localPID := t.swarm.LocalPeer().String()
localAddr, err := localMa.ValueForProtocol(t.driver.ProtocolCode())
if err != nil || (localMa.String() != t.driver.DefaultAddr() && localAddr != localPID) {
return nil, errors.Wrap(err, "error: proximityTransport.Listen: wrong multiaddr")
Expand Down Expand Up @@ -246,7 +245,7 @@ func (t *proximityTransport) HandleFoundPeer(sRemotePID string) bool {
t.lock.RUnlock()

// Adds peer to peerstore.
t.host.Peerstore().AddAddr(remotePID, remoteMa,
t.swarm.Peerstore().AddAddr(remotePID, remoteMa,
pstore.TempAddrTTL)

// Delete previous cache if it exists
Expand All @@ -259,13 +258,13 @@ func (t *proximityTransport) HandleFoundPeer(sRemotePID string) bool {
// Needed to read and write during the connect handshake.
go func() {
// Need to use listener than t.listener here to not have to check valid value of t.listener
err := t.host.Connect(listener.ctx, peer.AddrInfo{
err := t.connect(listener.ctx, peer.AddrInfo{
ID: remotePID,
Addrs: []ma.Multiaddr{remoteMa},
})
if err != nil {
t.logger.Error("HandleFoundPeer: async connect error", zap.Error(err))
t.host.Peerstore().SetAddr(remotePID, remoteMa, -1)
t.swarm.Peerstore().SetAddr(remotePID, remoteMa, -1)
t.driver.CloseConnWithPeer(sRemotePID)
}
}()
Expand All @@ -287,6 +286,24 @@ func (t *proximityTransport) HandleFoundPeer(sRemotePID string) bool {
}
}

// Adapted from https://github.com/libp2p/go-libp2p/blob/v0.38.1/p2p/host/basic/basic_host.go#L795
func (t *proximityTransport) connect(ctx context.Context, pi peer.AddrInfo) error {
// absorb addresses into peerstore
t.swarm.Peerstore().AddAddrs(pi.ID, pi.Addrs, pstore.TempAddrTTL)

forceDirect, _ := network.GetForceDirectDial(ctx)
canUseLimitedConn, _ := network.GetAllowLimitedConn(ctx)
if !forceDirect {
connectedness := t.swarm.Connectedness(pi.ID)
if connectedness == network.Connected || (canUseLimitedConn && connectedness == network.Limited) {
return nil
}
}

_, err := t.swarm.DialPeer(ctx, pi.ID)
return err
}

// HandleLostPeer is called by the native driver when the connection with the peer is lost.
// Closes connections with the peer.
func (t *proximityTransport) HandleLostPeer(sRemotePID string) {
Expand All @@ -304,10 +321,10 @@ func (t *proximityTransport) HandleLostPeer(sRemotePID string) {
}

// Remove peer's address to peerstore.
t.host.Peerstore().SetAddr(remotePID, remoteMa, -1)
t.swarm.Peerstore().SetAddr(remotePID, remoteMa, -1)

// Close the peer connection
conns := t.host.Network().ConnsToPeer(remotePID)
conns := t.swarm.ConnsToPeer(remotePID)
for _, conn := range conns {
if conn.RemoteMultiaddr().Equal(remoteMa) {
conn.Close()
Expand Down

0 comments on commit 4d1f5c3

Please sign in to comment.