Skip to content

Commit

Permalink
fix: handle (and test) WholeCID vs not; fast Has() path for storage
Browse files Browse the repository at this point in the history
This commit was moved from ipld/go-car@c58b233
  • Loading branch information
rvagg committed Mar 20, 2023
1 parent 0405449 commit ca32fef
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 23 deletions.
24 changes: 8 additions & 16 deletions ipld/car/v2/blockstore/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,29 +344,21 @@ func (b *ReadWrite) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
}

func (b *ReadWrite) Has(ctx context.Context, key cid.Cid) (bool, error) {
if !b.opts.StoreIdentityCIDs {
// If we don't store identity CIDs then we can return them straight away as if they are here,
// otherwise we need to check for their existence.
// Note, we do this without locking, since there is no shared information to lock for in order to perform the check.
if _, ok, err := store.IsIdentity(key); err != nil {
return false, err
} else if ok {
return true, nil
}
}

if ctx.Err() != nil {
return false, ctx.Err()
}

b.ronly.mu.Lock()
defer b.ronly.mu.Unlock()

if b.ronly.closed {
return false, errClosed
}

return b.idx.HasMultihash(key.Hash())
return store.Has(
b.idx,
key,
b.opts.MaxIndexCidSize,
b.opts.StoreIdentityCIDs,
b.opts.BlockstoreAllowDuplicatePuts,
b.opts.BlockstoreUseWholeCIDs,
)
}

func (b *ReadWrite) Get(ctx context.Context, key cid.Cid) (blocks.Block, error) {
Expand Down
50 changes: 50 additions & 0 deletions ipld/car/v2/blockstore/readwrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,3 +1085,53 @@ func TestBlockstoreFinalizeReadOnly(t *testing.T) {
err = bs.Put(ctx, root)
require.Error(t, err)
}

func TestWholeCID(t *testing.T) {
for _, whole := range []bool{true, false} {
whole := whole
t.Run(fmt.Sprintf("whole=%t", whole), func(t *testing.T) {
t.Parallel()
ctx := context.Background()
path := filepath.Join(t.TempDir(), fmt.Sprintf("writable_%t.car", whole))
rw, err := blockstore.OpenReadWrite(path, []cid.Cid{}, carv2.UseWholeCIDs(whole))
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, rw.Finalize()) })

require.NoError(t, rw.Put(ctx, oneTestBlockWithCidV1))
has, err := rw.Has(ctx, oneTestBlockWithCidV1.Cid())
require.NoError(t, err)
require.True(t, has)

pref := oneTestBlockWithCidV1.Cid().Prefix()
pref.Codec = cid.DagCBOR
pref.Version = 1
cpb1, err := pref.Sum(oneTestBlockWithCidV1.RawData())
require.NoError(t, err)

has, err = rw.Has(ctx, cpb1)
require.NoError(t, err)
require.Equal(t, has, !whole)

require.NoError(t, rw.Put(ctx, anotherTestBlockWithCidV0))
has, err = rw.Has(ctx, anotherTestBlockWithCidV0.Cid())
require.NoError(t, err)
require.True(t, has)
has, err = rw.Has(ctx, cpb1)
require.NoError(t, err)
require.Equal(t, has, !whole)

pref = anotherTestBlockWithCidV0.Cid().Prefix()
pref.Codec = cid.DagJSON
pref.Version = 1
cpb2, err := pref.Sum(anotherTestBlockWithCidV0.RawData())
require.NoError(t, err)

has, err = rw.Has(ctx, cpb2)
require.NoError(t, err)
require.Equal(t, has, !whole)
has, err = rw.Has(ctx, cpb1)
require.NoError(t, err)
require.Equal(t, has, !whole)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,30 @@ func ShouldPut(

return true, nil
}

// Has returns true if the block exists in in the store according to the various
// rules associated with the options. Similar to ShouldPut, but for the simpler
// Has() case.
func Has(
idx *InsertionIndex,
c cid.Cid,
maxIndexCidSize uint64,
storeIdentityCIDs bool,
blockstoreAllowDuplicatePuts bool,
blockstoreUseWholeCIDs bool,
) (bool, error) {

// If StoreIdentityCIDs option is disabled then treat IDENTITY CIDs like IdStore.
if !storeIdentityCIDs {
if _, ok, err := IsIdentity(c); err != nil {
return false, err
} else if ok {
return true, nil
}
}

if blockstoreUseWholeCIDs {
return idx.HasExactCID(c)
}
return idx.HasMultihash(c.Hash())
}
26 changes: 19 additions & 7 deletions ipld/car/v2/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,25 @@ func (sc *StorageCar) Has(ctx context.Context, keyStr string) (bool, error) {
return false, fmt.Errorf("bad CID key: %w", err)
}

sc.mu.RLock()
defer sc.mu.RUnlock()

if sc.closed {
return false, errClosed
}

if idx, ok := sc.idx.(*store.InsertionIndex); ok && sc.writer != nil {
// writable CAR, fast path using InsertionIndex
return store.Has(
idx,
keyCid,
sc.opts.MaxIndexCidSize,
sc.opts.StoreIdentityCIDs,
sc.opts.BlockstoreAllowDuplicatePuts,
sc.opts.BlockstoreUseWholeCIDs,
)
}

if !sc.opts.StoreIdentityCIDs {
// If we don't store identity CIDs then we can return them straight away as if they are here,
// otherwise we need to check for their existence.
Expand All @@ -360,13 +379,6 @@ func (sc *StorageCar) Has(ctx context.Context, keyStr string) (bool, error) {
}
}

sc.mu.RLock()
defer sc.mu.RUnlock()

if sc.closed {
return false, errClosed
}

_, _, size, err := store.FindCid(
sc.reader,
sc.idx,
Expand Down
55 changes: 55 additions & 0 deletions ipld/car/v2/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,61 @@ func TestOperationsErrorWithBadCidStrings(t *testing.T) {
require.Nil(t, got)
}

func TestWholeCID(t *testing.T) {
for _, whole := range []bool{true, false} {
whole := whole
t.Run(fmt.Sprintf("whole=%t", whole), func(t *testing.T) {
t.Parallel()
ctx := context.Background()
path := filepath.Join(t.TempDir(), fmt.Sprintf("writable_%t.car", whole))
out, err := os.Create(path)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, out.Close()) })
store, err := storage.NewReadableWritable(out, []cid.Cid{}, carv2.UseWholeCIDs(whole))
require.NoError(t, err)

c1, b1 := randBlock()
c2, b2 := randBlock()

require.NoError(t, store.Put(ctx, c1.KeyString(), b1))
has, err := store.Has(ctx, c1.KeyString())
require.NoError(t, err)
require.True(t, has)

pref := c1.Prefix()
pref.Codec = cid.DagProtobuf
pref.Version = 1
cpb1, err := pref.Sum(b1)
require.NoError(t, err)

has, err = store.Has(ctx, cpb1.KeyString())
require.NoError(t, err)
require.Equal(t, has, !whole)

require.NoError(t, store.Put(ctx, c2.KeyString(), b1))
has, err = store.Has(ctx, c2.KeyString())
require.NoError(t, err)
require.True(t, has)
has, err = store.Has(ctx, cpb1.KeyString())
require.NoError(t, err)
require.Equal(t, has, !whole)

pref = c2.Prefix()
pref.Codec = cid.DagProtobuf
pref.Version = 1
cpb2, err := pref.Sum(b2)
require.NoError(t, err)

has, err = store.Has(ctx, cpb2.KeyString())
require.NoError(t, err)
require.Equal(t, has, !whole)
has, err = store.Has(ctx, cpb1.KeyString())
require.NoError(t, err)
require.Equal(t, has, !whole)
})
}
}

type writerOnly struct {
io.Writer
}
Expand Down

0 comments on commit ca32fef

Please sign in to comment.