diff --git a/CHANGELOG.md b/CHANGELOG.md
index d0a64b4e7..ad9097666 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,10 +16,10 @@ The following emojis are used to highlight certain changes:

 ### Added

-- `blockservice` now has `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context. Future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will use the session in the context.
 - `blockservice.NewWritethrough` deprecated function has been removed, instead you can do `blockservice.New(..., ..., WriteThrough())` like previously.
 - `gateway`: a new header configuration middleware has been added to replace the existing header configuration, which can be used more generically.
 - `namesys` now has a `WithMaxCacheTTL` option, which allows you to define a maximum TTL that will be used for caching IPNS entries.
+- `blockservice` now has a `WithProvider` option, which lets you recreate the bitswap server's former behavior of advertising added blocks.

 ### Changed

@@ -29,6 +29,7 @@ The following emojis are used to highlight certain changes:

 - 🛠 `boxo/gateway`: when making a trustless CAR request with the "entity-bytes" parameter, using a negative index greater than the underlying entity length could trigger reading more data than intended
 - 🛠 `boxo/gateway`: the header configuration `Config.Headers` and `AddAccessControlHeaders` has been replaced by the new middleware provided by `NewHeaders`.
+- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers; use the `provider` package instead, which uses a datastore queue and batches calls to `ProvideMany`.

 ### Security

diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go
index 393ab96ad..90c8690b7 100644
--- a/bitswap/bitswap.go
+++ b/bitswap/bitswap.go
@@ -5,7 +5,6 @@ import (
 	"fmt"

 	"github.com/ipfs/boxo/bitswap/client"
-	"github.com/ipfs/boxo/bitswap/internal/defaults"
 	"github.com/ipfs/boxo/bitswap/message"
 	"github.com/ipfs/boxo/bitswap/network"
 	"github.com/ipfs/boxo/bitswap/server"
@@ -45,9 +44,8 @@ type bitswap interface {
 }

 var (
-	_ exchange.SessionExchange = (*Bitswap)(nil)
-	_ bitswap                  = (*Bitswap)(nil)
-	HasBlockBufferSize         = defaults.HasBlockBufferSize
+	_ exchange.SessionExchange = (*Bitswap)(nil)
+	_ bitswap                  = (*Bitswap)(nil)
 )

 type Bitswap struct {
@@ -85,10 +83,6 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
 		serverOptions = append(serverOptions, server.WithTracer(tracer))
 	}

-	if HasBlockBufferSize != defaults.HasBlockBufferSize {
-		serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize))
-	}
-
 	ctx = metrics.CtxSubScope(ctx, "bitswap")

 	bs.Server = server.New(ctx, net, bstore, serverOptions...)
@@ -115,7 +109,6 @@ type Stat struct {
 	MessagesReceived uint64
 	BlocksSent       uint64
 	DataSent         uint64
-	ProvideBufLen    int
 }

 func (bs *Bitswap) Stat() (*Stat, error) {
@@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) {
 		Peers:            ss.Peers,
 		BlocksSent:       ss.BlocksSent,
 		DataSent:         ss.DataSent,
-		ProvideBufLen:    ss.ProvideBufLen,
 	}, nil
 }
diff --git a/bitswap/bitswap_test.go b/bitswap/bitswap_test.go
index 505871d6e..7d2b1f924 100644
--- a/bitswap/bitswap_test.go
+++ b/bitswap/bitswap_test.go
@@ -120,7 +120,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

 func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
 	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
 	block := blocks.NewBlock([]byte("block"))
-	bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)}
+	bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)}
 	ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
 	defer ig.Close()
diff --git a/bitswap/client/bitswap_with_sessions_test.go b/bitswap/client/bitswap_with_sessions_test.go
index 0baede658..a3174d0a4 100644
--- a/bitswap/client/bitswap_with_sessions_test.go
+++ b/bitswap/client/bitswap_with_sessions_test.go
@@ -37,6 +37,10 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
 	if err != nil {
 		t.Fatal(err)
 	}
+	err = inst.Adapter.Provide(ctx, blk.Cid())
+	if err != nil {
+		t.Fatal(err)
+	}
 }

 func TestBasicSessions(t *testing.T) {
diff --git a/bitswap/internal/defaults/defaults.go b/bitswap/internal/defaults/defaults.go
index f5511cc7a..a06f5c8b9 100644
--- a/bitswap/internal/defaults/defaults.go
+++ b/bitswap/internal/defaults/defaults.go
@@ -20,11 +20,6 @@ const (
 	BitswapMaxOutstandingBytesPerPeer = 1 << 20
 	// the number of bytes we attempt to make each outgoing bitswap message
 	BitswapEngineTargetMessageSize = 16 * 1024
-	// HasBlockBufferSize is the buffer size of the channel for new blocks
-	// that need to be provided. They should get pulled over by the
-	// provideCollector even before they are actually provided.
-	// TODO: Does this need to be this large givent that?
-	HasBlockBufferSize = 256

 	// Maximum size of the wantlist we are willing to keep in memory.
 	MaxQueuedWantlistEntiresPerPeer = 1024
diff --git a/bitswap/options.go b/bitswap/options.go
index da759dfe2..9bea0b637 100644
--- a/bitswap/options.go
+++ b/bitswap/options.go
@@ -43,10 +43,6 @@ func TaskWorkerCount(count int) Option {
 	return Option{server.TaskWorkerCount(count)}
 }

-func ProvideEnabled(enabled bool) Option {
-	return Option{server.ProvideEnabled(enabled)}
-}
-
 func SetSendDontHaves(send bool) Option {
 	return Option{server.SetSendDontHaves(send)}
 }
diff --git a/bitswap/server/server.go b/bitswap/server/server.go
index 7feffd093..913fb83fb 100644
--- a/bitswap/server/server.go
+++ b/bitswap/server/server.go
@@ -21,20 +21,15 @@ import (
 	logging "github.com/ipfs/go-log/v2"
 	"github.com/ipfs/go-metrics-interface"
 	process "github.com/jbenet/goprocess"
-	procctx "github.com/jbenet/goprocess/context"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"go.uber.org/zap"
 )

-var provideKeysBufferSize = 2048
-
 var (
 	log   = logging.Logger("bitswap-server")
 	sflog = log.Desugar()
 )

-const provideWorkerMax = 6
-
 type Option func(*Server)

 type Server struct {
@@ -59,20 +54,8 @@ type Server struct {

 	process process.Process

-	// newBlocks is a channel for newly added blocks to be provided to the
-	// network. blocks pushed down this channel get buffered and fed to the
-	// provideKeys channel later on to avoid too much network activity
-	newBlocks chan cid.Cid
-	// provideKeys directly feeds provide workers
-	provideKeys chan cid.Cid
-
 	// Extra options to pass to the decision manager
 	engineOptions []decision.Option
-
-	// the size of channel buffer to use
-	hasBlockBufferSize int
-	// whether or not to make provide announcements
-	provideEnabled bool
 }

 func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server {
@@ -87,16 +70,12 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl
 	}()

 	s := &Server{
-		sentHistogram:      bmetrics.SentHist(ctx),
-		sendTimeHistogram:  bmetrics.SendTimeHist(ctx),
-		taskWorkerCount:    defaults.BitswapTaskWorkerCount,
-		network:            network,
-		process:            px,
-		provideEnabled:     true,
-		hasBlockBufferSize: defaults.HasBlockBufferSize,
-		provideKeys:        make(chan cid.Cid, provideKeysBufferSize),
+		sentHistogram:     bmetrics.SentHist(ctx),
+		sendTimeHistogram: bmetrics.SendTimeHist(ctx),
+		taskWorkerCount:   defaults.BitswapTaskWorkerCount,
+		network:           network,
+		process:           px,
 	}
-	s.newBlocks = make(chan cid.Cid, s.hasBlockBufferSize)

 	for _, o := range options {
 		o(s)
@@ -131,13 +110,6 @@ func WithTracer(tap tracer.Tracer) Option {
 	}
 }

-// ProvideEnabled is an option for enabling/disabling provide announcements
-func ProvideEnabled(enabled bool) Option {
-	return func(bs *Server) {
-		bs.provideEnabled = enabled
-	}
-}
-
 func WithPeerBlockRequestFilter(pbrf decision.PeerBlockRequestFilter) Option {
 	o := decision.WithPeerBlockRequestFilter(pbrf)
 	return func(bs *Server) {
@@ -233,16 +205,6 @@ func MaxCidSize(n uint) Option {
 	}
 }

-// HasBlockBufferSize configure how big the new blocks buffer should be.
-func HasBlockBufferSize(count int) Option {
-	if count < 0 {
-		panic("cannot have negative buffer size")
-	}
-	return func(bs *Server) {
-		bs.hasBlockBufferSize = count
-	}
-}
-
 // WantlistForPeer returns the currently understood list of blocks requested by a
 // given peer.
 func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid {
@@ -263,18 +225,6 @@ func (bs *Server) startWorkers(ctx context.Context, px process.Process) {
 			bs.taskWorker(ctx, i)
 		})
 	}
-
-	if bs.provideEnabled {
-		// Start up a worker to manage sending out provides messages
-		px.Go(func(px process.Process) {
-			bs.provideCollector(ctx)
-		})
-
-		// Spawn up multiple workers to handle incoming blocks
-		// consider increasing number if providing blocks bottlenecks
-		// file transfers
-		px.Go(bs.provideWorker)
-	}
 }

 func (bs *Server) taskWorker(ctx context.Context, id int) {
@@ -382,10 +332,9 @@ func (bs *Server) sendBlocks(ctx context.Context, env *decision.Envelope) {
 }

 type Stat struct {
-	Peers         []string
-	ProvideBufLen int
-	BlocksSent    uint64
-	DataSent      uint64
+	Peers      []string
+	BlocksSent uint64
+	DataSent   uint64
 }

 // Stat returns aggregated statistics about bitswap operations
@@ -393,7 +342,6 @@ func (bs *Server) Stat() (Stat, error) {
 	bs.counterLk.Lock()
 	s := bs.counters
 	bs.counterLk.Unlock()
-	s.ProvideBufLen = len(bs.newBlocks)

 	peers := bs.engine.Peers()
 	peersStr := make([]string, len(peers))
@@ -420,107 +368,9 @@ func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
 	// Send wanted blocks to decision engine
 	bs.engine.NotifyNewBlocks(blks)

-	// If the reprovider is enabled, send block to reprovider
-	if bs.provideEnabled {
-		for _, blk := range blks {
-			select {
-			case bs.newBlocks <- blk.Cid():
-				// send block off to be reprovided
-			case <-bs.process.Closing():
-				return bs.process.Close()
-			}
-		}
-	}
-
 	return nil
 }

-func (bs *Server) provideCollector(ctx context.Context) {
-	defer close(bs.provideKeys)
-	var toProvide []cid.Cid
-	var nextKey cid.Cid
-	var keysOut chan cid.Cid
-
-	for {
-		select {
-		case blkey, ok := <-bs.newBlocks:
-			if !ok {
-				log.Debug("newBlocks channel closed")
-				return
-			}
-
-			if keysOut == nil {
-				nextKey = blkey
-				keysOut = bs.provideKeys
-			} else {
-				toProvide = append(toProvide, blkey)
-			}
-		case keysOut <- nextKey:
-			if len(toProvide) > 0 {
-				nextKey = toProvide[0]
-				toProvide = toProvide[1:]
-			} else {
-				keysOut = nil
-			}
-		case <-ctx.Done():
-			return
-		}
-	}
-}
-
-func (bs *Server) provideWorker(px process.Process) {
-	// FIXME: OnClosingContext returns a _custom_ context type.
-	// Unfortunately, deriving a new cancelable context from this custom
-	// type fires off a goroutine. To work around this, we create a single
-	// cancelable context up-front and derive all sub-contexts from that.
-	//
-	// See: https://github.com/ipfs/go-ipfs/issues/5810
-	ctx := procctx.OnClosingContext(px)
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	limit := make(chan struct{}, provideWorkerMax)
-
-	limitedGoProvide := func(k cid.Cid, wid int) {
-		defer func() {
-			// replace token when done
-			<-limit
-		}()
-
-		log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k)
-		defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k)
-
-		ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx
-		defer cancel()
-
-		if err := bs.network.Provide(ctx, k); err != nil {
-			log.Warn(err)
-		}
-	}
-
-	// worker spawner, reads from bs.provideKeys until it closes, spawning a
-	// _ratelimited_ number of workers to handle each key.
-	for wid := 2; ; wid++ {
-		log.Debug("Bitswap.ProvideWorker.Loop")
-
-		select {
-		case <-px.Closing():
-			return
-		case k, ok := <-bs.provideKeys:
-			if !ok {
-				log.Debug("provideKeys channel closed")
-				return
-			}
-			select {
-			case <-px.Closing():
-				return
-			case limit <- struct{}{}:
-				go limitedGoProvide(k, wid)
-			}
-		}
-	}
-}
-
 func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
 	// This call records changes to wantlists, blocks received,
 	// and number of bytes transfered.
diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go
index 353be00f8..aac14ba84 100644
--- a/blockservice/blockservice.go
+++ b/blockservice/blockservice.go
@@ -13,6 +13,7 @@ import (

 	"github.com/ipfs/boxo/blockstore"
 	"github.com/ipfs/boxo/exchange"
+	"github.com/ipfs/boxo/provider"
 	"github.com/ipfs/boxo/verifcid"
 	blocks "github.com/ipfs/go-block-format"
 	"github.com/ipfs/go-cid"
@@ -73,10 +74,21 @@ type BoundedBlockService interface {

 var _ BoundedBlockService = (*blockService)(nil)

+// ProvidingBlockService is a BlockService that announces new blocks to a provider.
+type ProvidingBlockService interface {
+	BlockService
+
+	// Provider may return nil, in which case no provider is used.
+	Provider() provider.Provider
+}
+
+var _ ProvidingBlockService = (*blockService)(nil)
+
 type blockService struct {
 	allowlist  verifcid.Allowlist
 	blockstore blockstore.Blockstore
 	exchange   exchange.Interface
+	provider   provider.Provider
 	// If checkFirst is true then first check that a block doesn't
 	// already exist to avoid republishing the block on the exchange.
 	checkFirst bool
@@ -99,6 +111,13 @@ func WithAllowlist(allowlist verifcid.Allowlist) Option {
 	}
 }

+// WithProvider announces everything added through the blockservice to the given provider.
+func WithProvider(prov provider.Provider) Option {
+	return func(bs *blockService) {
+		bs.provider = prov
+	}
+}
+
 // New creates a BlockService with given datastore instance.
 func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) BlockService {
 	if exchange == nil {
@@ -121,6 +140,11 @@ func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option)

 // Blockstore returns the blockstore behind this blockservice.
 func (s *blockService) Blockstore() blockstore.Blockstore {
+	if s.provider != nil {
+		// FIXME: this is a hack; remove once ipfs/boxo#567 is solved.
+		return providingBlockstore{s.blockstore, s.provider}
+	}
+
 	return s.blockstore
 }
@@ -133,6 +157,10 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
 	return s.allowlist
 }

+func (s *blockService) Provider() provider.Provider {
+	return s.provider
+}
+
 // NewSession creates a new session that allows for
 // controlled exchange of wantlists to decrease the bandwidth overhead.
 // If the current exchange is a SessionExchange, a new exchange
@@ -140,16 +168,6 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
 // directly.
 // Sessions are lazily setup, this is cheap.
 func NewSession(ctx context.Context, bs BlockService) *Session {
-	ses := grabSessionFromContext(ctx, bs)
-	if ses != nil {
-		return ses
-	}
-
-	return newSession(ctx, bs)
-}
-
-// newSession is like [NewSession] but it does not attempt to reuse session from the existing context.
-func newSession(ctx context.Context, bs BlockService) *Session {
 	return &Session{bs: bs, sesctx: ctx}
 }
@@ -180,6 +198,11 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
 			logger.Errorf("NotifyNewBlocks: %s", err.Error())
 		}
 	}
+	if s.provider != nil {
+		if err := s.provider.Provide(o.Cid()); err != nil {
+			logger.Errorf("Provide: %s", err.Error())
+		}
+	}

 	return nil
 }
@@ -226,16 +249,20 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
 			logger.Errorf("NotifyNewBlocks: %s", err.Error())
 		}
 	}
+	if s.provider != nil {
+		for _, o := range toput {
+			if err := s.provider.Provide(o.Cid()); err != nil {
+				logger.Errorf("Provide: %s", err.Error())
+			}
+		}
+	}
+
 	return nil
 }

 // GetBlock retrieves a particular block from the service,
 // Getting it from the datastore using the key (hash).
 func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
-	if ses := grabSessionFromContext(ctx, s); ses != nil {
-		return ses.GetBlock(ctx, c)
-	}
-
 	ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
 	defer span.End()
@@ -253,7 +280,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
 		return nil, err
 	}

-	blockstore := bs.Blockstore()
+	provider, blockstore := grabProviderAndBlockstoreFromBlockservice(bs)

 	block, err := blockstore.Get(ctx, c)
 	switch {
@@ -287,6 +314,12 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
 			return nil, err
 		}
 	}
+	if provider != nil {
+		err = provider.Provide(blk.Cid())
+		if err != nil {
+			return nil, err
+		}
+	}
 	logger.Debugf("BlockService.BlockFetched %s", c)
 	return blk, nil
 }
@@ -295,10 +328,6 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
 // the returned channel.
 // NB: No guarantees are made about order.
 func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
-	if ses := grabSessionFromContext(ctx, s); ses != nil {
-		return ses.GetBlocks(ctx, ks)
-	}
-
 	ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
 	defer span.End()
@@ -336,7 +365,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
 		ks = ks2
 	}

-	bs := blockservice.Blockstore()
+	provider, bs := grabProviderAndBlockstoreFromBlockservice(blockservice)

 	var misses []cid.Cid
 	for _, c := range ks {
@@ -395,6 +424,14 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
 				cache[0] = nil // early gc
 			}

+			if provider != nil {
+				err = provider.Provide(b.Cid())
+				if err != nil {
+					logger.Errorf("could not tell the provider about new blocks: %s", err)
+					return
+				}
+			}
+
 			select {
 			case out <- b:
 			case <-ctx.Done():
@@ -474,43 +511,6 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo

 var _ BlockGetter = (*Session)(nil)

-// ContextWithSession is a helper which creates a context with an embded session,
-// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService]
-// will be redirected to this same session instead.
-// Sessions are lazily setup, this is cheap.
-// It wont make a new session if one exists already in the context.
-func ContextWithSession(ctx context.Context, bs BlockService) context.Context {
-	if grabSessionFromContext(ctx, bs) != nil {
-		return ctx
-	}
-	return EmbedSessionInContext(ctx, newSession(ctx, bs))
-}
-
-// EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session.
-func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context {
-	// use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice.
-	return context.WithValue(ctx, ses.bs, ses)
-}
-
-// grabSessionFromContext returns nil if the session was not found
-// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety,
-// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app.
-// By having this private we allow consumers to follow the trace of where the blockservice is passed and used.
-func grabSessionFromContext(ctx context.Context, bs BlockService) *Session {
-	s := ctx.Value(bs)
-	if s == nil {
-		return nil
-	}
-
-	ss, ok := s.(*Session)
-	if !ok {
-		// idk what to do here, that kinda sucks, giveup
-		return nil
-	}
-
-	return ss
-}
-
 // grabAllowlistFromBlockservice never returns nil
 func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist {
 	if bbs, ok := bs.(BoundedBlockService); ok {
@@ -518,3 +518,14 @@ func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist {
 	}
 	return verifcid.DefaultAllowlist
 }
+
+// grabProviderAndBlockstoreFromBlockservice returns a nil provider if no provider is used.
+func grabProviderAndBlockstoreFromBlockservice(bs BlockService) (provider.Provider, blockstore.Blockstore) {
+	if bbs, ok := bs.(*blockService); ok {
+		return bbs.provider, bbs.blockstore
+	}
+	if bbs, ok := bs.(ProvidingBlockService); ok {
+		return bbs.Provider(), bbs.Blockstore()
+	}
+	return nil, bs.Blockstore()
+}
diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go
index 53fd725f3..b04c3df8d 100644
--- a/blockservice/blockservice_test.go
+++ b/blockservice/blockservice_test.go
@@ -289,35 +289,18 @@ func TestAllowlist(t *testing.T) {
 	check(NewSession(ctx, blockservice).GetBlock)
 }

-type fakeIsNewSessionCreateExchange struct {
-	ses                 exchange.Fetcher
-	newSessionWasCalled bool
+type wrappedBlockservice struct {
+	BlockService
 }

-var _ exchange.SessionExchange = (*fakeIsNewSessionCreateExchange)(nil)
+type mockProvider []cid.Cid

-func (*fakeIsNewSessionCreateExchange) Close() error {
+func (p *mockProvider) Provide(c cid.Cid) error {
+	*p = append(*p, c)
 	return nil
 }

-func (*fakeIsNewSessionCreateExchange) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
-	panic("should call on the session")
-}
-
-func (*fakeIsNewSessionCreateExchange) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) {
-	panic("should call on the session")
-}
-
-func (f *fakeIsNewSessionCreateExchange) NewSession(context.Context) exchange.Fetcher {
-	f.newSessionWasCalled = true
-	return f.ses
-}
-
-func (*fakeIsNewSessionCreateExchange) NotifyNewBlocks(context.Context, ...blocks.Block) error {
-	return nil
-}
-
-func TestContextSession(t *testing.T) {
+func TestProviding(t *testing.T) {
 	t.Parallel()

 	a := assert.New(t)
@@ -325,31 +308,84 @@ func TestProviding(t *testing.T) {
 	defer cancel()

 	bgen := butil.NewBlockGenerator()
-	block1 := bgen.Next()
-	block2 := bgen.Next()
+	blocks := bgen.Blocks(12)
+
+	exchange := blockstore.NewBlockstore(ds.NewMapDatastore())

-	bs := blockstore.NewBlockstore(ds.NewMapDatastore())
-	a.NoError(bs.Put(ctx, block1))
-	a.NoError(bs.Put(ctx, block2))
-	sesEx := &fakeIsNewSessionCreateExchange{ses: offline.Exchange(bs)}
+	prov := mockProvider{}
+	blockservice := New(blockstore.NewBlockstore(ds.NewMapDatastore()), offline.Exchange(exchange), WithProvider(&prov))
+	var added []cid.Cid

-	service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx)
+	// Adding one block provides it.
+	a.NoError(blockservice.AddBlock(ctx, blocks[0]))
+	added = append(added, blocks[0].Cid())
+	blocks = blocks[1:]

-	ctx = ContextWithSession(ctx, service)
+	// Adding multiple blocks provides them.
+	a.NoError(blockservice.AddBlocks(ctx, blocks[0:2]))
+	added = append(added, blocks[0].Cid(), blocks[1].Cid())
+	blocks = blocks[2:]

-	b, err := service.GetBlock(ctx, block1.Cid())
+	// Downloading one block provides it.
+	a.NoError(exchange.Put(ctx, blocks[0]))
+	_, err := blockservice.GetBlock(ctx, blocks[0].Cid())
 	a.NoError(err)
-	a.Equal(b.RawData(), block1.RawData())
-	a.True(sesEx.newSessionWasCalled, "new session from context should be created")
-	sesEx.newSessionWasCalled = false
-
-	bchan := service.GetBlocks(ctx, []cid.Cid{block2.Cid()})
-	a.Equal((<-bchan).RawData(), block2.RawData())
-	a.False(sesEx.newSessionWasCalled, "session should be reused in context")
-
-	a.Equal(
-		NewSession(ctx, service),
-		NewSession(ContextWithSession(ctx, service), service),
-		"session must be deduped in all invocations on the same context",
-	)
+	added = append(added, blocks[0].Cid())
+	blocks = blocks[1:]
+
+	// Downloading multiple blocks provides them.
+	a.NoError(exchange.PutMany(ctx, blocks[0:2]))
+	cids := []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
+	var got []cid.Cid
+	for b := range blockservice.GetBlocks(ctx, cids) {
+		got = append(got, b.Cid())
+	}
+	added = append(added, cids...)
+	a.ElementsMatch(cids, got)
+	blocks = blocks[2:]
+
+	session := NewSession(ctx, blockservice)
+
+	// Downloading one block over a session provides it.
+	a.NoError(exchange.Put(ctx, blocks[0]))
+	_, err = session.GetBlock(ctx, blocks[0].Cid())
+	a.NoError(err)
+	added = append(added, blocks[0].Cid())
+	blocks = blocks[1:]
+
+	// Downloading multiple blocks over a session provides them.
+	a.NoError(exchange.PutMany(ctx, blocks[0:2]))
+	cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
+	got = nil
+	for b := range session.GetBlocks(ctx, cids) {
+		got = append(got, b.Cid())
+	}
+	a.ElementsMatch(cids, got)
+	added = append(added, cids...)
+	blocks = blocks[2:]
+
+	// Test wrapping the blockservice like nopfs does.
+	session = NewSession(ctx, wrappedBlockservice{blockservice})
+
+	// Downloading one block over a wrapped blockservice session provides it.
+	a.NoError(exchange.Put(ctx, blocks[0]))
+	_, err = session.GetBlock(ctx, blocks[0].Cid())
+	a.NoError(err)
+	added = append(added, blocks[0].Cid())
+	blocks = blocks[1:]
+
+	// Downloading multiple blocks over a wrapped blockservice session provides them.
+	a.NoError(exchange.PutMany(ctx, blocks[0:2]))
+	cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
+	got = nil
+	for b := range session.GetBlocks(ctx, cids) {
+		got = append(got, b.Cid())
+	}
+	a.ElementsMatch(cids, got)
+	added = append(added, cids...)
+	blocks = blocks[2:]
+
+	a.Empty(blocks)
+
+	a.ElementsMatch(added, []cid.Cid(prov))
 }
diff --git a/blockservice/providing_blockstore.go b/blockservice/providing_blockstore.go
new file mode 100644
index 000000000..7435f8ae2
--- /dev/null
+++ b/blockservice/providing_blockstore.go
@@ -0,0 +1,37 @@
+package blockservice
+
+import (
+	"context"
+
+	"github.com/ipfs/boxo/blockstore"
+	"github.com/ipfs/boxo/provider"
+	blocks "github.com/ipfs/go-block-format"
+)
+
+var _ blockstore.Blockstore = providingBlockstore{}
+
+type providingBlockstore struct {
+	blockstore.Blockstore
+	provider provider.Provider
+}
+
+func (pbs providingBlockstore) Put(ctx context.Context, b blocks.Block) error {
+	if err := pbs.Blockstore.Put(ctx, b); err != nil {
+		return err
+	}
+
+	return pbs.provider.Provide(b.Cid())
+}
+
+func (pbs providingBlockstore) PutMany(ctx context.Context, b []blocks.Block) error {
+	if err := pbs.Blockstore.PutMany(ctx, b); err != nil {
+		return err // unclear semantics: were some blocks put? Assume PutMany is atomic.
+	}
+
+	for _, blk := range b {
+		if err := pbs.provider.Provide(blk.Cid()); err != nil {
+			return err // this can only fail if the provider itself is shutting down
+		}
+	}
+	return nil
+}
diff --git a/examples/go.mod b/examples/go.mod
index 6c0630543..7b91ade2a 100644
--- a/examples/go.mod
+++ b/examples/go.mod
@@ -60,6 +60,7 @@ require (
 	github.com/huin/goupnp v1.3.0 // indirect
 	github.com/ipfs/bbloom v0.0.4 // indirect
 	github.com/ipfs/go-bitfield v1.1.0 // indirect
+	github.com/ipfs/go-cidutil v0.1.0 // indirect
 	github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
 	github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
 	github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect
diff --git a/examples/go.sum b/examples/go.sum
index cfb0944d0..725c2e244 100644
--- a/examples/go.sum
+++ b/examples/go.sum
@@ -167,6 +167,8 @@ github.com/ipfs/go-blockservice v0.5.0 h1:B2mwhhhVQl2ntW2EIpaWPwSCxSuqr5fFA93Ms4
 github.com/ipfs/go-cid v0.0.6/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
 github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
 github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk=
+github.com/ipfs/go-cidutil v0.1.0 h1:RW5hO7Vcf16dplUU60Hs0AKDkQAVPVplr7lk97CFL+Q=
+github.com/ipfs/go-cidutil v0.1.0/go.mod h1:e7OEVBMIv9JaOxt9zaGEmAoSlXW9jdFZ5lP/0PwcfpA=
 github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk=
 github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8=
 github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
diff --git a/gateway/blocks_backend.go b/gateway/blocks_backend.go
index d85c2846b..99a762c80 100644
--- a/gateway/blocks_backend.go
+++ b/gateway/blocks_backend.go
@@ -689,12 +689,6 @@ func (bb *BlocksBackend) IsCached(ctx context.Context, p path.Path) bool {
 	return has
 }

-var _ WithContextHint = (*BlocksBackend)(nil)
-
-func (bb *BlocksBackend) WrapContextForRequest(ctx context.Context) context.Context {
-	return blockservice.ContextWithSession(ctx, bb.blockService)
-}
-
 func (bb *BlocksBackend) ResolvePath(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, error) {
 	roots, lastSeg, remainder, err := bb.getPathRoots(ctx, path)
 	if err != nil {
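
To make the migration concrete, here is a minimal usage sketch assuming only the APIs shown in this patch. The `loggingProvider` type is a hypothetical stand-in: anything with a `Provide(cid.Cid) error` method satisfies the `provider.Provider` interface that `WithProvider` accepts, exactly like `mockProvider` in the test above. In a real node you would instead pass the queue-backed system from the `provider` package, which batches announcements through `ProvideMany` as the changelog notes.

package main

import (
	"context"
	"fmt"

	"github.com/ipfs/boxo/blockservice"
	"github.com/ipfs/boxo/blockstore"
	"github.com/ipfs/boxo/exchange/offline"
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
	ds "github.com/ipfs/go-datastore"
)

// loggingProvider is a hypothetical stand-in for a real providing system;
// it only has to implement Provide(cid.Cid) error.
type loggingProvider struct{}

func (loggingProvider) Provide(c cid.Cid) error {
	fmt.Println("providing", c)
	return nil
}

func main() {
	bstore := blockstore.NewBlockstore(ds.NewMapDatastore())

	// With the bitswap server no longer announcing blocks itself, the
	// blockservice hands every added block to the configured provider.
	bsvc := blockservice.New(bstore, offline.Exchange(bstore),
		blockservice.WithProvider(loggingProvider{}))

	blk := blocks.NewBlock([]byte("hello"))
	if err := bsvc.AddBlock(context.Background(), blk); err != nil {
		panic(err)
	}
}

Because the patch wires the provider into both the add path (`AddBlock`, `AddBlocks`) and the fetch path (`GetBlock`, `GetBlocks`, including sessions), blocks obtained from the exchange are announced the same way as locally added ones.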