Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move providing responsabilities from bitswap to blockservice #534

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ The following emojis are used to highlight certain changes:

### Added

- `blockservice` now has `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context. Future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will use the session in the context.
- `blockservice.NewWritethrough` deprecated function has been removed, instead you can do `blockservice.New(..., ..., WriteThrough())` like previously.
- `gateway`: a new header configuration middleware has been added to replace the existing header configuration, which can be used more generically.
- `namesys` now has a `WithMaxCacheTTL` option, which allows you to define a maximum TTL that will be used for caching IPNS entries.
- `blockservice` now have a `WithProvider` option, this allows to recreate the behavior of advertising added blocks the bitswap server used to do.

### Changed

Expand All @@ -29,6 +29,7 @@ The following emojis are used to highlight certain changes:

- 🛠 `boxo/gateway`: when making a trustless CAR request with the "entity-bytes" parameter, using a negative index greater than the underlying entity length could trigger reading more data than intended
- 🛠 `boxo/gateway`: the header configuration `Config.Headers` and `AddAccessControlHeaders` has been replaced by the new middleware provided by `NewHeaders`.
- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany.

### Security

Expand Down
12 changes: 2 additions & 10 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/internal/defaults"
"github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/server"
Expand Down Expand Up @@ -45,9 +44,8 @@ type bitswap interface {
}

var (
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
HasBlockBufferSize = defaults.HasBlockBufferSize
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
)

type Bitswap struct {
Expand Down Expand Up @@ -85,10 +83,6 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
serverOptions = append(serverOptions, server.WithTracer(tracer))
}

if HasBlockBufferSize != defaults.HasBlockBufferSize {
serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize))
}

ctx = metrics.CtxSubScope(ctx, "bitswap")

bs.Server = server.New(ctx, net, bstore, serverOptions...)
Expand All @@ -115,7 +109,6 @@ type Stat struct {
MessagesReceived uint64
BlocksSent uint64
DataSent uint64
ProvideBufLen int
}

func (bs *Bitswap) Stat() (*Stat, error) {
Expand All @@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) {
Peers: ss.Peers,
BlocksSent: ss.BlocksSent,
DataSent: ss.DataSent,
ProvideBufLen: ss.ProvideBufLen,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)}
bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)}
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
defer ig.Close()

Expand Down
4 changes: 4 additions & 0 deletions bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
if err != nil {
t.Fatal(err)
}
err = inst.Adapter.Provide(ctx, blk.Cid())
if err != nil {
t.Fatal(err)
}
}

func TestBasicSessions(t *testing.T) {
Expand Down
5 changes: 0 additions & 5 deletions bitswap/internal/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ const (
BitswapMaxOutstandingBytesPerPeer = 1 << 20
// the number of bytes we attempt to make each outgoing bitswap message
BitswapEngineTargetMessageSize = 16 * 1024
// HasBlockBufferSize is the buffer size of the channel for new blocks
// that need to be provided. They should get pulled over by the
// provideCollector even before they are actually provided.
// TODO: Does this need to be this large givent that?
HasBlockBufferSize = 256

// Maximum size of the wantlist we are willing to keep in memory.
MaxQueuedWantlistEntiresPerPeer = 1024
Expand Down
4 changes: 0 additions & 4 deletions bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ func TaskWorkerCount(count int) Option {
return Option{server.TaskWorkerCount(count)}
}

func ProvideEnabled(enabled bool) Option {
return Option{server.ProvideEnabled(enabled)}
}

func SetSendDontHaves(send bool) Option {
return Option{server.SetSendDontHaves(send)}
}
Expand Down
166 changes: 8 additions & 158 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,15 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap"
)

var provideKeysBufferSize = 2048

var (
log = logging.Logger("bitswap-server")
sflog = log.Desugar()
)

const provideWorkerMax = 6

type Option func(*Server)

type Server struct {
Expand All @@ -59,20 +54,8 @@ type Server struct {

process process.Process

// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
newBlocks chan cid.Cid
// provideKeys directly feeds provide workers
provideKeys chan cid.Cid

// Extra options to pass to the decision manager
engineOptions []decision.Option

// the size of channel buffer to use
hasBlockBufferSize int
// whether or not to make provide announcements
provideEnabled bool
}

func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server {
Expand All @@ -87,16 +70,12 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl
}()

s := &Server{
sentHistogram: bmetrics.SentHist(ctx),
sendTimeHistogram: bmetrics.SendTimeHist(ctx),
taskWorkerCount: defaults.BitswapTaskWorkerCount,
network: network,
process: px,
provideEnabled: true,
hasBlockBufferSize: defaults.HasBlockBufferSize,
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
sentHistogram: bmetrics.SentHist(ctx),
sendTimeHistogram: bmetrics.SendTimeHist(ctx),
taskWorkerCount: defaults.BitswapTaskWorkerCount,
network: network,
process: px,
}
s.newBlocks = make(chan cid.Cid, s.hasBlockBufferSize)

for _, o := range options {
o(s)
Expand Down Expand Up @@ -131,13 +110,6 @@ func WithTracer(tap tracer.Tracer) Option {
}
}

// ProvideEnabled is an option for enabling/disabling provide announcements
func ProvideEnabled(enabled bool) Option {
return func(bs *Server) {
bs.provideEnabled = enabled
}
}

func WithPeerBlockRequestFilter(pbrf decision.PeerBlockRequestFilter) Option {
o := decision.WithPeerBlockRequestFilter(pbrf)
return func(bs *Server) {
Expand Down Expand Up @@ -233,16 +205,6 @@ func MaxCidSize(n uint) Option {
}
}

// HasBlockBufferSize configure how big the new blocks buffer should be.
func HasBlockBufferSize(count int) Option {
if count < 0 {
panic("cannot have negative buffer size")
}
return func(bs *Server) {
bs.hasBlockBufferSize = count
}
}

// WantlistForPeer returns the currently understood list of blocks requested by a
// given peer.
func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid {
Expand All @@ -263,18 +225,6 @@ func (bs *Server) startWorkers(ctx context.Context, px process.Process) {
bs.taskWorker(ctx, i)
})
}

if bs.provideEnabled {
// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
})

// Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks
// file transfers
px.Go(bs.provideWorker)
}
}

func (bs *Server) taskWorker(ctx context.Context, id int) {
Expand Down Expand Up @@ -382,18 +332,16 @@ func (bs *Server) sendBlocks(ctx context.Context, env *decision.Envelope) {
}

type Stat struct {
Peers []string
ProvideBufLen int
BlocksSent uint64
DataSent uint64
Peers []string
BlocksSent uint64
DataSent uint64
}

// Stat returns aggregated statistics about bitswap operations
func (bs *Server) Stat() (Stat, error) {
bs.counterLk.Lock()
s := bs.counters
bs.counterLk.Unlock()
s.ProvideBufLen = len(bs.newBlocks)

peers := bs.engine.Peers()
peersStr := make([]string, len(peers))
Expand All @@ -420,107 +368,9 @@ func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
// Send wanted blocks to decision engine
bs.engine.NotifyNewBlocks(blks)

// If the reprovider is enabled, send block to reprovider
if bs.provideEnabled {
for _, blk := range blks {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
}
}

return nil
}

func (bs *Server) provideCollector(ctx context.Context) {
defer close(bs.provideKeys)
var toProvide []cid.Cid
var nextKey cid.Cid
var keysOut chan cid.Cid

for {
select {
case blkey, ok := <-bs.newBlocks:
if !ok {
log.Debug("newBlocks channel closed")
return
}

if keysOut == nil {
nextKey = blkey
keysOut = bs.provideKeys
} else {
toProvide = append(toProvide, blkey)
}
case keysOut <- nextKey:
if len(toProvide) > 0 {
nextKey = toProvide[0]
toProvide = toProvide[1:]
} else {
keysOut = nil
}
case <-ctx.Done():
return
}
}
}

func (bs *Server) provideWorker(px process.Process) {
// FIXME: OnClosingContext returns a _custom_ context type.
// Unfortunately, deriving a new cancelable context from this custom
// type fires off a goroutine. To work around this, we create a single
// cancelable context up-front and derive all sub-contexts from that.
//
// See: https://github.com/ipfs/go-ipfs/issues/5810
ctx := procctx.OnClosingContext(px)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

limit := make(chan struct{}, provideWorkerMax)

limitedGoProvide := func(k cid.Cid, wid int) {
defer func() {
// replace token when done
<-limit
}()

log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k)
defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k)

ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx
defer cancel()

if err := bs.network.Provide(ctx, k); err != nil {
log.Warn(err)
}
}

// worker spawner, reads from bs.provideKeys until it closes, spawning a
// _ratelimited_ number of workers to handle each key.
for wid := 2; ; wid++ {
log.Debug("Bitswap.ProvideWorker.Loop")

select {
case <-px.Closing():
return
case k, ok := <-bs.provideKeys:
if !ok {
log.Debug("provideKeys channel closed")
return
}
select {
case <-px.Closing():
return
case limit <- struct{}{}:
go limitedGoProvide(k, wid)
}
}
}
}

func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
Expand Down
Loading
Loading