Skip to content

Commit

Permalink
httpnet: an Exchange network layer for http-retrieval
Browse files Browse the repository at this point in the history
This and subsequent commits introduce an httpnet module at what is known as
the "bitswap network layer". The bitswap network layer connects bitswap-peers,
sends bitswap messages and receives responses.

Bitswap messages are basically a wantlist, a list of CIDs that should be sent
if available.

httpnet does the same, except instead of sending the bitswap message over
bitswap, it triggers http requests for the requested blocks. httpnet is a
drop-in addon so that we can request blocks over http, and not only via bitswap.

As httpnet is a network, it benefits from all existing wantlist management
logic. Any http/2 endpoint should benefit from streamlined requests on a
single http connection. A router-network ensures that messages are correctly
handled by bitswap or by http requests depending on what the peers are
advertising. HTTP requests are given priority in the presence of both.

Here are some of the httpnet features:

* Peers are marked as Connected when they are able to handle http requets.
* Peers are marked as Disconnected when http requests fail repeatedly (MaxRetries).
* Server errors trigger backoffs preventing more requests to happen to the same
  url for a period (Retry-After header or configuration value)
* We support several urls per peer, meaning a peer can provide alternative
  http endpoints which are tried based on number of failures or existing cooldowns.
* We translate HAVE requests to HTTP-HEAD requests and BLOCK requests to HTTP-GETs
* We support cancellations: ongoing or soon to happen requests for a CID
  can be cancelled using a "cancel" entry in the wantlist.
* We record latency information for peers by pinging regularly.
* We discriminate between different errors so that we know whether to
  move to the next block in a wantlist, or to retry with a different url,
  or to completely abort.
* Options to configure user-agent, max retries etc. are supported.
  • Loading branch information
hsanjuan committed Jan 13, 2025
1 parent 6397847 commit 20171a9
Show file tree
Hide file tree
Showing 29 changed files with 2,330 additions and 105 deletions.
2 changes: 1 addition & 1 deletion bitswap/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"time"

"github.com/ipfs/boxo/bitswap"
bsnet "github.com/ipfs/boxo/bitswap/network"
bsnet "github.com/ipfs/boxo/bitswap/network/bsnet"
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
tn "github.com/ipfs/boxo/bitswap/testnet"
mockrouting "github.com/ipfs/boxo/routing/mock"
Expand Down
6 changes: 3 additions & 3 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package bitswap implements the IPFS exchange interface with the BitSwap
// Package client implements the IPFS exchange interface with the BitSwap
// bilateral exchange protocol.
package client

Expand Down Expand Up @@ -191,7 +191,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr

sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
pm := bspm.New(ctx, peerQueueFactory)

if bs.providerFinder != nil && bs.defaultProviderQueryManager {
// network can do dialing.
Expand Down Expand Up @@ -232,7 +232,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr
return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
return bsspm.New(id, network)
}
notif := notifications.New()
sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
Expand Down
5 changes: 1 addition & 4 deletions bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,17 @@ type PeerManager struct {
psLk sync.RWMutex
sessions map[uint64]Session
peerSessions map[peer.ID]map[uint64]struct{}

self peer.ID
}

// New creates a new PeerManager, given a context and a peerQueueFactory.
func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *PeerManager {
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge()
wantBlockGauge := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge()
return &PeerManager{
peerQueues: make(map[peer.ID]PeerQueue),
pwm: newPeerWantManager(wantGauge, wantBlockGauge),
createPeerQueue: createPeerQueue,
ctx: ctx,
self: self,

sessions: make(map[uint64]Session),
peerSessions: make(map[peer.ID]map[uint64]struct{}),
Expand Down
27 changes: 13 additions & 14 deletions bitswap/client/internal/peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func TestAddingAndRemovingPeers(t *testing.T) {
peerQueueFactory := makePeerQueueFactory(msgs)

tp := random.Peers(6)
self, peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4], tp[5]
peerManager := New(ctx, peerQueueFactory, self)
peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
peerManager := New(ctx, peerQueueFactory)

peerManager.Connected(peer1)
peerManager.Connected(peer2)
Expand Down Expand Up @@ -128,8 +128,8 @@ func TestBroadcastOnConnect(t *testing.T) {
msgs := make(chan msg, 16)
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(2)
self, peer1 := tp[0], tp[1]
peerManager := New(ctx, peerQueueFactory, self)
peer1 := tp[0]
peerManager := New(ctx, peerQueueFactory)

cids := random.Cids(2)
peerManager.BroadcastWantHaves(ctx, cids)
Expand All @@ -149,8 +149,8 @@ func TestBroadcastWantHaves(t *testing.T) {
msgs := make(chan msg, 16)
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(3)
self, peer1, peer2 := tp[0], tp[1], tp[2]
peerManager := New(ctx, peerQueueFactory, self)
peer1, peer2 := tp[0], tp[1]
peerManager := New(ctx, peerQueueFactory)

cids := random.Cids(3)

Expand Down Expand Up @@ -190,8 +190,8 @@ func TestSendWants(t *testing.T) {
msgs := make(chan msg, 16)
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(2)
self, peer1 := tp[0], tp[1]
peerManager := New(ctx, peerQueueFactory, self)
peer1 := tp[0]
peerManager := New(ctx, peerQueueFactory)
cids := random.Cids(4)

peerManager.Connected(peer1)
Expand Down Expand Up @@ -224,8 +224,8 @@ func TestSendCancels(t *testing.T) {
msgs := make(chan msg, 16)
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(3)
self, peer1, peer2 := tp[0], tp[1], tp[2]
peerManager := New(ctx, peerQueueFactory, self)
peer1, peer2 := tp[0], tp[1]
peerManager := New(ctx, peerQueueFactory)
cids := random.Cids(4)

// Connect to peer1 and peer2
Expand Down Expand Up @@ -285,8 +285,8 @@ func TestSessionRegistration(t *testing.T) {
peerQueueFactory := makePeerQueueFactory(msgs)

tp := random.Peers(3)
self, p1, p2 := tp[0], tp[1], tp[2]
peerManager := New(ctx, peerQueueFactory, self)
p1, p2 := tp[0], tp[1]
peerManager := New(ctx, peerQueueFactory)

id := uint64(1)
s := newSess(id)
Expand Down Expand Up @@ -344,9 +344,8 @@ func BenchmarkPeerManager(b *testing.B) {
return &benchPeerQueue{}
}

self := random.Peers(1)[0]
peers := random.Peers(500)
peerManager := New(ctx, peerQueueFactory, self)
peerManager := New(ctx, peerQueueFactory)

// Create a bunch of connections
connected := 0
Expand Down
14 changes: 14 additions & 0 deletions bitswap/network/bsnet/bsnet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package bsnet

import "github.com/ipfs/boxo/bitswap/network/bsnet/internal"

var (
// ProtocolBitswapNoVers is equivalent to the legacy bitswap protocol
ProtocolBitswapNoVers = internal.ProtocolBitswapNoVers
// ProtocolBitswapOneZero is the prefix for the legacy bitswap protocol
ProtocolBitswapOneZero = internal.ProtocolBitswapOneZero
// ProtocolBitswapOneOne is the prefix for version 1.1.0
ProtocolBitswapOneOne = internal.ProtocolBitswapOneOne
// ProtocolBitswap is the current version of the bitswap protocol: 1.2.0
ProtocolBitswap = internal.ProtocolBitswap
)
File renamed without changes.
60 changes: 42 additions & 18 deletions bitswap/network/ipfs_impl.go → bitswap/network/bsnet/ipfs_impl.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package network
package bsnet

import (
"context"
Expand All @@ -9,10 +9,10 @@ import (
"time"

bsmsg "github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/network/internal"
iface "github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/network/bsnet/internal"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -23,7 +23,7 @@ import (
"github.com/multiformats/go-multistream"
)

var log = logging.Logger("bitswap/network")
var log = logging.Logger("bitswap/bsnet")

var (
maxSendTimeout = 2 * time.Minute
Expand All @@ -33,7 +33,7 @@ var (
)

// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host.
func NewFromIpfsHost(host host.Host, opts ...NetOpt) BitSwapNetwork {
func NewFromIpfsHost(host host.Host, opts ...NetOpt) iface.BitSwapNetwork {
s := processSettings(opts...)

bitswapNetwork := impl{
Expand Down Expand Up @@ -66,10 +66,10 @@ func processSettings(opts ...NetOpt) Settings {
type impl struct {
// NOTE: Stats must be at the top of the heap allocation to ensure 64bit
// alignment.
stats Stats
stats iface.Stats

host host.Host
connectEvtMgr *connectEventManager
connectEvtMgr *iface.ConnectEventManager

protocolBitswapNoVers protocol.ID
protocolBitswapOneZero protocol.ID
Expand All @@ -79,7 +79,7 @@ type impl struct {
supportedProtocols []protocol.ID

// inbound messages from the network are forwarded to the receiver
receivers []Receiver
receivers []iface.Receiver
}

// interfaceWrapper is concrete type that wraps an interface. Necessary because
Expand Down Expand Up @@ -109,7 +109,7 @@ type streamMessageSender struct {
to peer.ID
stream atomicInterface[network.Stream]
bsnet *impl
opts *MessageSenderOpts
opts *iface.MessageSenderOpts
}

type HasContext interface {
Expand Down Expand Up @@ -317,7 +317,7 @@ func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.
return nil
}

func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *MessageSenderOpts) (MessageSender, error) {
func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *iface.MessageSenderOpts) (iface.MessageSender, error) {
opts = setDefaultOpts(opts)

sender := &streamMessageSender{
Expand All @@ -337,7 +337,7 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *Messag
return sender, nil
}

func setDefaultOpts(opts *MessageSenderOpts) *MessageSenderOpts {
func setDefaultOpts(opts *iface.MessageSenderOpts) *iface.MessageSenderOpts {
copy := *opts
if opts.MaxRetries == 0 {
copy.MaxRetries = 3
Expand Down Expand Up @@ -385,14 +385,14 @@ func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stre
return bsnet.host.NewStream(ctx, p, bsnet.supportedProtocols...)
}

func (bsnet *impl) Start(r ...Receiver) {
func (bsnet *impl) Start(r ...iface.Receiver) {
bsnet.receivers = r
{
connectionListeners := make([]ConnectionListener, len(r))
connectionListeners := make([]iface.ConnectionListener, len(r))
for i, v := range r {
connectionListeners[i] = v
}
bsnet.connectEvtMgr = newConnectEventManager(connectionListeners...)
bsnet.connectEvtMgr = iface.NewConnectEventManager(connectionListeners...)
}
for _, proto := range bsnet.supportedProtocols {
bsnet.host.SetStreamHandler(proto, bsnet.handleNewStream)
Expand Down Expand Up @@ -451,12 +451,36 @@ func (bsnet *impl) handleNewStream(s network.Stream) {
}
}

func (bsnet *impl) ConnectionManager() connmgr.ConnManager {
return bsnet.host.ConnManager()
func (bsnet *impl) TagPeer(p peer.ID, tag string, w int) {
if bsnet.host == nil {
return
}
bsnet.host.ConnManager().TagPeer(p, tag, w)
}

func (bsnet *impl) UntagPeer(p peer.ID, tag string) {
if bsnet.host == nil {
return
}
bsnet.host.ConnManager().UntagPeer(p, tag)
}

func (bsnet *impl) Protect(p peer.ID, tag string) {
if bsnet.host == nil {
return
}
bsnet.host.ConnManager().Protect(p, tag)
}

func (bsnet *impl) Unprotect(p peer.ID, tag string) bool {
if bsnet.host == nil {
return false
}
return bsnet.host.ConnManager().Unprotect(p, tag)
}

func (bsnet *impl) Stats() Stats {
return Stats{
func (bsnet *impl) Stats() iface.Stats {
return iface.Stats{
MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd),
MessagesSent: atomic.LoadUint64(&bsnet.stats.MessagesSent),
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package network_test
package bsnet_test

import (
"context"
Expand All @@ -10,13 +10,14 @@ import (

bsmsg "github.com/ipfs/boxo/bitswap/message"
pb "github.com/ipfs/boxo/bitswap/message/pb"
bsnet "github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/network/internal"
network "github.com/ipfs/boxo/bitswap/network"
bsnet "github.com/ipfs/boxo/bitswap/network/bsnet"
"github.com/ipfs/boxo/bitswap/network/bsnet/internal"
tn "github.com/ipfs/boxo/bitswap/testnet"
"github.com/ipfs/go-test/random"
tnet "github.com/libp2p/go-libp2p-testing/net"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
p2pnet "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
Expand All @@ -30,7 +31,7 @@ type receiver struct {
connectionEvent chan bool
lastMessage bsmsg.BitSwapMessage
lastSender peer.ID
listener network.Notifiee
listener p2pnet.Notifiee
}

func newReceiver() *receiver {
Expand Down Expand Up @@ -71,7 +72,7 @@ func (r *receiver) PeerDisconnected(p peer.ID) {
var errMockNetErr = errors.New("network err")

type ErrStream struct {
network.Stream
p2pnet.Stream
lk sync.Mutex
err error
timingOut bool
Expand Down Expand Up @@ -120,7 +121,7 @@ func (eh *ErrHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
return eh.Host.Connect(ctx, pi)
}

func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error) {
func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (p2pnet.Stream, error) {
eh.lk.Lock()
defer eh.lk.Unlock()

Expand Down Expand Up @@ -268,7 +269,7 @@ func TestMessageSendAndReceive(t *testing.T) {
}
}

func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *receiver, p2 tnet.Identity, r2 *receiver) (*ErrHost, bsnet.BitSwapNetwork, *ErrHost, bsnet.BitSwapNetwork, bsmsg.BitSwapMessage) {
func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *receiver, p2 tnet.Identity, r2 *receiver) (*ErrHost, network.BitSwapNetwork, *ErrHost, network.BitSwapNetwork, bsmsg.BitSwapMessage) {
// create network
mn := mocknet.New()
defer mn.Close()
Expand Down Expand Up @@ -337,7 +338,7 @@ func TestMessageResendAfterError(t *testing.T) {
eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)

testSendErrorBackoff := 100 * time.Millisecond
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{
MaxRetries: 3,
SendTimeout: 100 * time.Millisecond,
SendErrorBackoff: testSendErrorBackoff,
Expand Down Expand Up @@ -382,7 +383,7 @@ func TestMessageSendTimeout(t *testing.T) {

eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)

ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{
MaxRetries: 3,
SendTimeout: 100 * time.Millisecond,
SendErrorBackoff: 100 * time.Millisecond,
Expand Down Expand Up @@ -424,7 +425,7 @@ func TestMessageSendNotSupportedResponse(t *testing.T) {
eh, bsnet1, _, _, _ := prepareNetwork(t, ctx, p1, r1, p2, r2)

eh.setError(multistream.ErrNotSupported[protocol.ID]{})
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{
MaxRetries: 3,
SendTimeout: 100 * time.Millisecond,
SendErrorBackoff: 100 * time.Millisecond,
Expand Down Expand Up @@ -482,7 +483,7 @@ func TestSupportsHave(t *testing.T) {
t.Fatal(err)
}

senderCurrent, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{})
senderCurrent, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -532,7 +533,7 @@ func testNetworkCounters(t *testing.T, n1 int, n2 int) {
}

if n2 > 0 {
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{})
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{})
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 20171a9

Please sign in to comment.