Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: proximitytransport creation #106

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/proximitytransport/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func newConn(ctx context.Context, t *proximityTransport, remoteMa ma.Multiaddr,
return nil, fmt.Errorf("invalid network direction")
}

connScope, err := t.rcmgr.OpenConnection(netdir, false, remoteMa)
connScope, err := t.swarm.ResourceManager().OpenConnection(netdir, false, remoteMa)
if err != nil {
return nil, fmt.Errorf("resource manager blocked connection : %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/proximitytransport/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func newListener(ctx context.Context, localMa ma.Multiaddr, t *proximityTranspor
// Starts the native driver.
// If it failed, don't return a error because no other transport
// on the libp2p node will be created.
t.driver.Start(t.host.ID().String())
t.driver.Start(t.swarm.LocalPeer().String())

return listener
}
Expand Down
43 changes: 30 additions & 13 deletions pkg/proximitytransport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"fmt"
"sync"

host "github.com/libp2p/go-libp2p/core/host"
network "github.com/libp2p/go-libp2p/core/network"
peer "github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
tpt "github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
"github.com/pkg/errors"
Expand Down Expand Up @@ -50,9 +50,8 @@ type ProximityTransport interface {
}

type proximityTransport struct {
host host.Host
swarm *swarm.Swarm
upgrader tpt.Upgrader
rcmgr network.ResourceManager

connMap map[string]*Conn
connMapMutex sync.RWMutex
Expand All @@ -64,7 +63,7 @@ type proximityTransport struct {
ctx context.Context
}

func NewTransport(ctx context.Context, l *zap.Logger, driver ProximityDriver) func(h host.Host, u tpt.Upgrader, rcmgr network.ResourceManager) (*proximityTransport, error) {
func NewTransport(ctx context.Context, l *zap.Logger, driver ProximityDriver) func(swarm *swarm.Swarm, u tpt.Upgrader) (*proximityTransport, error) {
if l == nil {
l = zap.NewNop()
}
Expand All @@ -76,12 +75,12 @@ func NewTransport(ctx context.Context, l *zap.Logger, driver ProximityDriver) fu
driver = &NoopProximityDriver{}
}

return func(h host.Host, u tpt.Upgrader, rcmgr network.ResourceManager) (*proximityTransport, error) {
l.Debug("remi: transport.go: new Transport")
return func(swarm *swarm.Swarm, u tpt.Upgrader) (*proximityTransport, error) {
l.Debug("NewTransport called", zap.String("driver", driver.ProtocolName()))
transport := &proximityTransport{
host: h,
swarm: swarm,
upgrader: u,
rcmgr: rcmgr,
connMap: make(map[string]*Conn),
cache: NewRingBufferMap(l, 128),
driver: driver,
Expand Down Expand Up @@ -141,7 +140,7 @@ func (t *proximityTransport) CanDial(remoteMa ma.Multiaddr) bool {
func (t *proximityTransport) Listen(localMa ma.Multiaddr) (tpt.Listener, error) {
// localAddr is supposed to be equal to the localPID
// or to DefaultAddr since multiaddr == /<protocol>/<peerID>
localPID := t.host.ID().String()
localPID := t.swarm.LocalPeer().String()
localAddr, err := localMa.ValueForProtocol(t.driver.ProtocolCode())
if err != nil || (localMa.String() != t.driver.DefaultAddr() && localAddr != localPID) {
return nil, errors.Wrap(err, "error: proximityTransport.Listen: wrong multiaddr")
Expand Down Expand Up @@ -246,7 +245,7 @@ func (t *proximityTransport) HandleFoundPeer(sRemotePID string) bool {
t.lock.RUnlock()

// Adds peer to peerstore.
t.host.Peerstore().AddAddr(remotePID, remoteMa,
t.swarm.Peerstore().AddAddr(remotePID, remoteMa,
pstore.TempAddrTTL)

// Delete previous cache if it exists
Expand All @@ -259,13 +258,13 @@ func (t *proximityTransport) HandleFoundPeer(sRemotePID string) bool {
// Needed to read and write during the connect handshake.
go func() {
// Need to use listener than t.listener here to not have to check valid value of t.listener
err := t.host.Connect(listener.ctx, peer.AddrInfo{
err := t.connect(listener.ctx, peer.AddrInfo{
ID: remotePID,
Addrs: []ma.Multiaddr{remoteMa},
})
if err != nil {
t.logger.Error("HandleFoundPeer: async connect error", zap.Error(err))
t.host.Peerstore().SetAddr(remotePID, remoteMa, -1)
t.swarm.Peerstore().SetAddr(remotePID, remoteMa, -1)
t.driver.CloseConnWithPeer(sRemotePID)
}
}()
Expand All @@ -287,6 +286,24 @@ func (t *proximityTransport) HandleFoundPeer(sRemotePID string) bool {
}
}

// Adapted from https://github.com/libp2p/go-libp2p/blob/v0.38.1/p2p/host/basic/basic_host.go#L795
func (t *proximityTransport) connect(ctx context.Context, pi peer.AddrInfo) error {
// absorb addresses into peerstore
t.swarm.Peerstore().AddAddrs(pi.ID, pi.Addrs, pstore.TempAddrTTL)

forceDirect, _ := network.GetForceDirectDial(ctx)
canUseLimitedConn, _ := network.GetAllowLimitedConn(ctx)
if !forceDirect {
connectedness := t.swarm.Connectedness(pi.ID)
if connectedness == network.Connected || (canUseLimitedConn && connectedness == network.Limited) {
return nil
}
}

_, err := t.swarm.DialPeer(ctx, pi.ID)
return err
}

// HandleLostPeer is called by the native driver when the connection with the peer is lost.
// Closes connections with the peer.
func (t *proximityTransport) HandleLostPeer(sRemotePID string) {
Expand All @@ -304,10 +321,10 @@ func (t *proximityTransport) HandleLostPeer(sRemotePID string) {
}

// Remove peer's address to peerstore.
t.host.Peerstore().SetAddr(remotePID, remoteMa, -1)
t.swarm.Peerstore().SetAddr(remotePID, remoteMa, -1)

// Close the peer connection
conns := t.host.Network().ConnsToPeer(remotePID)
conns := t.swarm.ConnsToPeer(remotePID)
for _, conn := range conns {
if conn.RemoteMultiaddr().Equal(remoteMa) {
conn.Close()
Expand Down
Loading