Skip to content

Commit

Permalink
httpnet: an Exchange network layer for http-retrieval
Browse files Browse the repository at this point in the history
This and subsequent commits introduce an httpnet module at what is known as
the "bitswap network layer". The bitswap network layer connects bitswap-peers,
sends bitswap messages and receives responses.

Bitswap messages are basically a wantlist, a list of CIDs that should be sent
if available.

httpnet does the same, except instead of sending the bitswap message over
bitswap, it triggers http requests for the requested blocks. httpnet is a
drop-in addon so that we can request blocks over http, and not only via bitswap.

As httpnet is a network, it benefits from all existing wantlist management
logic. Any http/2 endpoint should benefit from streamlined requests on a
single http connection. A router-network ensures that messages are correctly
handled by bitswap or by http requests depending on what the peers are
advertising. HTTP requests are given priority in the presence of both.

Here are some of the httpnet features:

* Peers are marked as Connected when they are able to handle http requets.
* Peers are marked as Disconnected when http requests fail repeatedly (MaxRetries).
* Server errors trigger backoffs preventing more requests to happen to the same
  url for a period (Retry-After header or configuration value)
* We support several urls per peer, meaning a peer can provide alternative
  http endpoints which are tried based on number of failures or existing cooldowns.
* We translate HAVE requests to HTTP-HEAD requests and BLOCK requests to HTTP-GETs
* We support cancellations: ongoing or soon to happen requests for a CID
  can be cancelled using a "cancel" entry in the wantlist.
* We record latency information for peers by pinging regularly.
* We discriminate between different errors so that we know whether to
  move to the next block in a wantlist, or to retry with a different url,
  or to completely abort.
* Options to configure user-agent, max retries etc. are supported.
  • Loading branch information
hsanjuan committed Jan 13, 2025
1 parent 6397847 commit 08e6e2e
Show file tree
Hide file tree
Showing 26 changed files with 2,301 additions and 75 deletions.
6 changes: 3 additions & 3 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package bitswap implements the IPFS exchange interface with the BitSwap
// Package client implements the IPFS exchange interface with the BitSwap
// bilateral exchange protocol.
package client

Expand Down Expand Up @@ -191,7 +191,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr

sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
pm := bspm.New(ctx, peerQueueFactory)

if bs.providerFinder != nil && bs.defaultProviderQueryManager {
// network can do dialing.
Expand Down Expand Up @@ -232,7 +232,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr
return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
return bsspm.New(id, network)
}
notif := notifications.New()
sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
Expand Down
5 changes: 1 addition & 4 deletions bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,17 @@ type PeerManager struct {
psLk sync.RWMutex
sessions map[uint64]Session
peerSessions map[peer.ID]map[uint64]struct{}

self peer.ID
}

// New creates a new PeerManager, given a context and a peerQueueFactory.
func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *PeerManager {
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge()
wantBlockGauge := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge()
return &PeerManager{
peerQueues: make(map[peer.ID]PeerQueue),
pwm: newPeerWantManager(wantGauge, wantBlockGauge),
createPeerQueue: createPeerQueue,
ctx: ctx,
self: self,

sessions: make(map[uint64]Session),
peerSessions: make(map[peer.ID]map[uint64]struct{}),
Expand Down
14 changes: 14 additions & 0 deletions bitswap/network/bsnet/bsnet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package bsnet

import "github.com/ipfs/boxo/bitswap/network/bsnet/internal"

var (
// ProtocolBitswapNoVers is equivalent to the legacy bitswap protocol
ProtocolBitswapNoVers = internal.ProtocolBitswapNoVers
// ProtocolBitswapOneZero is the prefix for the legacy bitswap protocol
ProtocolBitswapOneZero = internal.ProtocolBitswapOneZero
// ProtocolBitswapOneOne is the prefix for version 1.1.0
ProtocolBitswapOneOne = internal.ProtocolBitswapOneOne
// ProtocolBitswap is the current version of the bitswap protocol: 1.2.0
ProtocolBitswap = internal.ProtocolBitswap
)
File renamed without changes.
61 changes: 43 additions & 18 deletions bitswap/network/ipfs_impl.go → bitswap/network/bsnet/ipfs_impl.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package network
package bsnet

import (
"context"
Expand All @@ -9,10 +9,10 @@ import (
"time"

bsmsg "github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/network/internal"
iface "github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/network/bsnet/internal"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -23,7 +23,7 @@ import (
"github.com/multiformats/go-multistream"
)

var log = logging.Logger("bitswap/network")
var log = logging.Logger("bitswap/bsnet")

var (
maxSendTimeout = 2 * time.Minute
Expand All @@ -33,7 +33,7 @@ var (
)

// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host.
func NewFromIpfsHost(host host.Host, opts ...NetOpt) BitSwapNetwork {
func NewFromIpfsHost(host host.Host, opts ...NetOpt) iface.BitSwapNetwork {
s := processSettings(opts...)

bitswapNetwork := impl{
Expand Down Expand Up @@ -66,10 +66,10 @@ func processSettings(opts ...NetOpt) Settings {
type impl struct {
// NOTE: Stats must be at the top of the heap allocation to ensure 64bit
// alignment.
stats Stats
stats iface.Stats

host host.Host
connectEvtMgr *connectEventManager
connectEvtMgr *iface.ConnectEventManager

protocolBitswapNoVers protocol.ID
protocolBitswapOneZero protocol.ID
Expand All @@ -79,7 +79,7 @@ type impl struct {
supportedProtocols []protocol.ID

// inbound messages from the network are forwarded to the receiver
receivers []Receiver
receivers []iface.Receiver
}

// interfaceWrapper is concrete type that wraps an interface. Necessary because
Expand Down Expand Up @@ -108,8 +108,9 @@ func (a *atomicInterface[T]) Store(v T) {
type streamMessageSender struct {
to peer.ID
stream atomicInterface[network.Stream]

Check failure on line 110 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go next)

other declaration of stream

Check failure on line 110 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go next)

other declaration of stream

Check failure on line 110 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go this)

other declaration of stream

Check failure on line 110 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go this)

other declaration of stream
stream network.Stream

Check failure on line 111 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go next)

stream redeclared

Check failure on line 111 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go next)

stream redeclared

Check failure on line 111 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go this)

stream redeclared

Check failure on line 111 in bitswap/network/bsnet/ipfs_impl.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go this)

stream redeclared
bsnet *impl
opts *MessageSenderOpts
opts *iface.MessageSenderOpts
}

type HasContext interface {
Expand Down Expand Up @@ -317,7 +318,7 @@ func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.
return nil
}

func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *MessageSenderOpts) (MessageSender, error) {
func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *iface.MessageSenderOpts) (iface.MessageSender, error) {
opts = setDefaultOpts(opts)

sender := &streamMessageSender{
Expand All @@ -337,7 +338,7 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *Messag
return sender, nil
}

func setDefaultOpts(opts *MessageSenderOpts) *MessageSenderOpts {
func setDefaultOpts(opts *iface.MessageSenderOpts) *iface.MessageSenderOpts {
copy := *opts
if opts.MaxRetries == 0 {
copy.MaxRetries = 3
Expand Down Expand Up @@ -385,14 +386,14 @@ func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stre
return bsnet.host.NewStream(ctx, p, bsnet.supportedProtocols...)
}

func (bsnet *impl) Start(r ...Receiver) {
func (bsnet *impl) Start(r ...iface.Receiver) {
bsnet.receivers = r
{
connectionListeners := make([]ConnectionListener, len(r))
connectionListeners := make([]iface.ConnectionListener, len(r))
for i, v := range r {
connectionListeners[i] = v
}
bsnet.connectEvtMgr = newConnectEventManager(connectionListeners...)
bsnet.connectEvtMgr = iface.NewConnectEventManager(connectionListeners...)
}
for _, proto := range bsnet.supportedProtocols {
bsnet.host.SetStreamHandler(proto, bsnet.handleNewStream)
Expand Down Expand Up @@ -451,12 +452,36 @@ func (bsnet *impl) handleNewStream(s network.Stream) {
}
}

func (bsnet *impl) ConnectionManager() connmgr.ConnManager {
return bsnet.host.ConnManager()
func (bsnet *impl) TagPeer(p peer.ID, tag string, w int) {
if bsnet.host == nil {
return
}
bsnet.host.ConnManager().TagPeer(p, tag, w)
}

func (bsnet *impl) UntagPeer(p peer.ID, tag string) {
if bsnet.host == nil {
return
}
bsnet.host.ConnManager().UntagPeer(p, tag)
}

func (bsnet *impl) Protect(p peer.ID, tag string) {
if bsnet.host == nil {
return
}
bsnet.host.ConnManager().Protect(p, tag)
}

func (bsnet *impl) Unprotect(p peer.ID, tag string) bool {
if bsnet.host == nil {
return false
}
return bsnet.host.ConnManager().Unprotect(p, tag)
}

func (bsnet *impl) Stats() Stats {
return Stats{
func (bsnet *impl) Stats() iface.Stats {
return iface.Stats{
MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd),
MessagesSent: atomic.LoadUint64(&bsnet.stats.MessagesSent),
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package network_test
package bsnet_test

import (
"context"
Expand All @@ -11,7 +11,7 @@ import (
bsmsg "github.com/ipfs/boxo/bitswap/message"
pb "github.com/ipfs/boxo/bitswap/message/pb"
bsnet "github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/network/internal"
"github.com/ipfs/boxo/bitswap/network/bsnet/internal"
tn "github.com/ipfs/boxo/bitswap/testnet"
"github.com/ipfs/go-test/random"
tnet "github.com/libp2p/go-libp2p-testing/net"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package network
package bsnet

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package network
package bsnet

import "github.com/libp2p/go-libp2p/core/protocol"

Expand Down
29 changes: 16 additions & 13 deletions bitswap/network/connecteventmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"sync"

"github.com/gammazero/deque"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/peer"
)

var log = logging.Logger("bitswap/connevtman")

type ConnectionListener interface {
PeerConnected(peer.ID)
PeerDisconnected(peer.ID)
Expand All @@ -20,7 +23,7 @@ const (
stateUnresponsive
)

type connectEventManager struct {
type ConnectEventManager struct {
connListeners []ConnectionListener
lk sync.RWMutex
cond sync.Cond
Expand All @@ -36,8 +39,8 @@ type peerState struct {
pending bool
}

func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager {
evtManager := &connectEventManager{
func NewConnectEventManager(connListeners ...ConnectionListener) *ConnectEventManager {
evtManager := &ConnectEventManager{
connListeners: connListeners,
peers: make(map[peer.ID]*peerState),
done: make(chan struct{}),
Expand All @@ -46,11 +49,11 @@ func newConnectEventManager(connListeners ...ConnectionListener) *connectEventMa
return evtManager
}

func (c *connectEventManager) Start() {
func (c *ConnectEventManager) Start() {
go c.worker()
}

func (c *connectEventManager) Stop() {
func (c *ConnectEventManager) Stop() {
c.lk.Lock()
c.stop = true
c.lk.Unlock()
Expand All @@ -59,15 +62,15 @@ func (c *connectEventManager) Stop() {
<-c.done
}

func (c *connectEventManager) getState(p peer.ID) state {
func (c *ConnectEventManager) getState(p peer.ID) state {
if state, ok := c.peers[p]; ok {
return state.newState
} else {
return stateDisconnected
}
}

func (c *connectEventManager) setState(p peer.ID, newState state) {
func (c *ConnectEventManager) setState(p peer.ID, newState state) {
state, ok := c.peers[p]
if !ok {
state = new(peerState)
Expand All @@ -83,14 +86,14 @@ func (c *connectEventManager) setState(p peer.ID, newState state) {

// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the
// connect event manager has been stopped.
func (c *connectEventManager) waitChange() bool {
func (c *ConnectEventManager) waitChange() bool {
for !c.stop && c.changeQueue.Len() == 0 {
c.cond.Wait()
}
return !c.stop
}

func (c *connectEventManager) worker() {
func (c *ConnectEventManager) worker() {
c.lk.Lock()
defer c.lk.Unlock()
defer close(c.done)
Expand Down Expand Up @@ -145,7 +148,7 @@ func (c *connectEventManager) worker() {
}

// Called whenever we receive a new connection. May be called many times.
func (c *connectEventManager) Connected(p peer.ID) {
func (c *ConnectEventManager) Connected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

Expand All @@ -158,7 +161,7 @@ func (c *connectEventManager) Connected(p peer.ID) {
}

// Called when we drop the final connection to a peer.
func (c *connectEventManager) Disconnected(p peer.ID) {
func (c *ConnectEventManager) Disconnected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

Expand All @@ -172,7 +175,7 @@ func (c *connectEventManager) Disconnected(p peer.ID) {
}

// Called whenever a peer is unresponsive.
func (c *connectEventManager) MarkUnresponsive(p peer.ID) {
func (c *ConnectEventManager) MarkUnresponsive(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

Expand All @@ -191,7 +194,7 @@ func (c *connectEventManager) MarkUnresponsive(p peer.ID) {
// - When not connected, we ignore this call. Unfortunately, a peer may disconnect before we process
//
// the "on message" event, so we can't treat this as evidence of a connection.
func (c *connectEventManager) OnMessage(p peer.ID) {
func (c *ConnectEventManager) OnMessage(p peer.ID) {
c.lk.RLock()
unresponsive := c.getState(p) == stateUnresponsive
c.lk.RUnlock()
Expand Down
Loading

0 comments on commit 08e6e2e

Please sign in to comment.