Skip to content

Commit

Permalink
blockservice: add session workaround to work with wrapped blockservices
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Feb 16, 2024
1 parent 047fce0 commit 4060154
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 20 deletions.
23 changes: 15 additions & 8 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option)

// Blockstore returns the blockstore behind this blockservice.
func (s *blockService) Blockstore() blockstore.Blockstore {
if s.provider != nil {
// FIXME: this is a hack remove once ipfs/boxo#567 is solved.
return providingBlockstore{s.blockstore, s.provider}
}

return s.blockstore
}

Expand Down Expand Up @@ -275,7 +280,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
return nil, err
}

blockstore := bs.Blockstore()
provider, blockstore := grabProviderAndBlockstoreFromBlockservice(bs)

block, err := blockstore.Get(ctx, c)
switch {
Expand Down Expand Up @@ -309,7 +314,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
return nil, err
}
}
if provider := grabProviderFromBlockservice(bs); provider != nil {
if provider != nil {
err = provider.Provide(blk.Cid())
if err != nil {
return nil, err
Expand Down Expand Up @@ -360,7 +365,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
ks = ks2
}

bs := blockservice.Blockstore()
provider, bs := grabProviderAndBlockstoreFromBlockservice(blockservice)

var misses []cid.Cid
for _, c := range ks {
Expand Down Expand Up @@ -388,7 +393,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
}

ex := blockservice.Exchange()
provider := grabProviderFromBlockservice(blockservice)
var cache [1]blocks.Block // preallocate once for all iterations
for {
var b blocks.Block
Expand Down Expand Up @@ -515,10 +519,13 @@ func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist {
return verifcid.DefaultAllowlist
}

// grabProviderFromBlockservice can return nil if no provider is used.
func grabProviderFromBlockservice(bs BlockService) provider.Provider {
// grabProviderAndBlockstoreFromBlockservice can return nil if no provider is used.
func grabProviderAndBlockstoreFromBlockservice(bs BlockService) (provider.Provider, blockstore.Blockstore) {
if bbs, ok := bs.(*blockService); ok {
return bbs.provider, bbs.blockstore
}
if bbs, ok := bs.(ProvidingBlockService); ok {
return bbs.Provider()
return bbs.Provider(), bbs.Blockstore()
}

Check warning on line 529 in blockservice/blockservice.go

View check run for this annotation

Codecov / codecov/patch

blockservice/blockservice.go#L528-L529

Added lines #L528 - L529 were not covered by tests
return nil
return nil, bs.Blockstore()
}
56 changes: 44 additions & 12 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,18 +289,26 @@ func TestAllowlist(t *testing.T) {
check(NewSession(ctx, blockservice).GetBlock)
}

type wrappedBlockservice struct {
BlockService
}

type mockProvider []cid.Cid

func (p *mockProvider) Provide(c cid.Cid) error {
*p = append(*p, c)
return nil
}

func TestProviding(t *testing.T) {
t.Parallel()
a := assert.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bgen := butil.NewBlockGenerator()
blocks := bgen.Blocks(9)
blocks := bgen.Blocks(12)

exchange := blockstore.NewBlockstore(ds.NewMapDatastore())

Expand All @@ -309,51 +317,75 @@ func TestProviding(t *testing.T) {
var added []cid.Cid

// Adding one block provide it.
a.NoError(blockservice.AddBlock(context.Background(), blocks[0]))
a.NoError(blockservice.AddBlock(ctx, blocks[0]))
added = append(added, blocks[0].Cid())
blocks = blocks[1:]

// Adding multiple blocks provide them.
a.NoError(blockservice.AddBlocks(context.Background(), blocks[0:2]))
a.NoError(blockservice.AddBlocks(ctx, blocks[0:2]))
added = append(added, blocks[0].Cid(), blocks[1].Cid())
blocks = blocks[2:]

// Downloading one block provide it.
a.NoError(exchange.Put(context.Background(), blocks[0]))
_, err := blockservice.GetBlock(context.Background(), blocks[0].Cid())
a.NoError(exchange.Put(ctx, blocks[0]))
_, err := blockservice.GetBlock(ctx, blocks[0].Cid())
a.NoError(err)
added = append(added, blocks[0].Cid())
blocks = blocks[1:]

// Downloading multiple blocks provide them.
a.NoError(exchange.PutMany(context.Background(), blocks[0:2]))
a.NoError(exchange.PutMany(ctx, blocks[0:2]))
cids := []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
var got []cid.Cid
for b := range blockservice.GetBlocks(context.Background(), cids) {
for b := range blockservice.GetBlocks(ctx, cids) {
got = append(got, b.Cid())
}
added = append(added, cids...)
a.ElementsMatch(cids, got)
blocks = blocks[2:]

session := NewSession(context.Background(), blockservice)
session := NewSession(ctx, blockservice)

// Downloading one block over a session provide it.
a.NoError(exchange.Put(context.Background(), blocks[0]))
_, err = session.GetBlock(context.Background(), blocks[0].Cid())
a.NoError(exchange.Put(ctx, blocks[0]))
_, err = session.GetBlock(ctx, blocks[0].Cid())
a.NoError(err)
added = append(added, blocks[0].Cid())
blocks = blocks[1:]

// Downloading multiple blocks over a session provide them.
a.NoError(exchange.PutMany(context.Background(), blocks[0:2]))
a.NoError(exchange.PutMany(ctx, blocks[0:2]))
cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
got = nil
for b := range session.GetBlocks(ctx, cids) {
got = append(got, b.Cid())
}
a.ElementsMatch(cids, got)
added = append(added, cids...)
blocks = blocks[2:]

// Test wrapping the blockservice like nopfs does.
session = NewSession(ctx, wrappedBlockservice{blockservice})

// Downloading one block over a wrapped blockservice session provide it.
a.NoError(exchange.Put(ctx, blocks[0]))
_, err = session.GetBlock(ctx, blocks[0].Cid())
a.NoError(err)
added = append(added, blocks[0].Cid())
blocks = blocks[1:]

// Downloading multiple blocks over a wrapped blockservice session provide them.
a.NoError(exchange.PutMany(ctx, blocks[0:2]))
cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
got = nil
for b := range session.GetBlocks(context.Background(), cids) {
for b := range session.GetBlocks(ctx, cids) {
got = append(got, b.Cid())
}
a.ElementsMatch(cids, got)
added = append(added, cids...)
blocks = blocks[2:]

a.Empty(blocks)

a.ElementsMatch(added, []cid.Cid(prov))
}
37 changes: 37 additions & 0 deletions blockservice/providing_blockstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package blockservice

import (
"context"

"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/provider"
blocks "github.com/ipfs/go-block-format"
)

var _ blockstore.Blockstore = providingBlockstore{}

type providingBlockstore struct {
blockstore.Blockstore
provider provider.Provider
}

func (pbs providingBlockstore) Put(ctx context.Context, b blocks.Block) error {
if err := pbs.Blockstore.Put(ctx, b); err != nil {
return err
}

Check warning on line 21 in blockservice/providing_blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockservice/providing_blockstore.go#L20-L21

Added lines #L20 - L21 were not covered by tests

return pbs.provider.Provide(b.Cid())
}

func (pbs providingBlockstore) PutMany(ctx context.Context, b []blocks.Block) error {
if err := pbs.Blockstore.PutMany(ctx, b); err != nil {
return err // what are the semantics here, did some blocks were put ? assume PutMany is atomic
}

Check warning on line 29 in blockservice/providing_blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockservice/providing_blockstore.go#L26-L29

Added lines #L26 - L29 were not covered by tests

for _, b := range b {
if err := pbs.provider.Provide(b.Cid()); err != nil {
return err // this can only error if the whole provider is done for
}

Check warning on line 34 in blockservice/providing_blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockservice/providing_blockstore.go#L31-L34

Added lines #L31 - L34 were not covered by tests
}
return nil

Check warning on line 36 in blockservice/providing_blockstore.go

View check run for this annotation

Codecov / codecov/patch

blockservice/providing_blockstore.go#L36

Added line #L36 was not covered by tests
}

0 comments on commit 4060154

Please sign in to comment.