Skip to content

Commit

Permalink
apply review suggestions #2
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Jul 17, 2024
1 parent 6106c5d commit a66f38b
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 120 deletions.
142 changes: 62 additions & 80 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
)

// TODO(@walldiss):
// - add method to list files in store
// - persist store stats like
// - amount of files
// - file types hist (ods/q1q4)
Expand Down Expand Up @@ -78,7 +79,7 @@ func NewStore(params *Parameters, basePath string) (*Store, error) {
return nil, fmt.Errorf("ensure heights folder '%s': %w", heightsFolderPath, err)
}

err := createEmptyFile(basePath)
err := ensureEmptyFile(basePath)
if err != nil {
return nil, fmt.Errorf("creating empty file: %w", err)
}
Expand Down Expand Up @@ -122,24 +123,17 @@ func (s *Store) Put(
}

// short circuit if file exists
if has, _ := s.hasByHeight(height); has {
f, err := s.getByHeight(height)
if err == nil {
s.metrics.observePutExist(ctx)
return s.getByHeight(height)
return f, nil
}

filePath := s.basepath + blocksPath + datahash.String()
f, err := s.createFile(filePath, datahash, square)
f, err = s.createFile(datahash, height, square)
if err != nil {
s.metrics.observePut(ctx, time.Since(tNow), square.Width(), true)
return nil, fmt.Errorf("creating file: %w", err)
}

// create hard link with height as name
err = s.createHeightLink(datahash, height)
if err != nil {
s.metrics.observePut(ctx, time.Since(tNow), square.Width(), false)
return nil, fmt.Errorf("linking height: %w", err)
}
s.metrics.observePut(ctx, time.Since(tNow), square.Width(), false)

// put file in recent cache
Expand All @@ -151,25 +145,21 @@ func (s *Store) Put(
}

func (s *Store) createFile(
filePath string,
datahash share.DataHash,
height uint64,
square *rsmt2d.ExtendedDataSquare,
) (eds.AccessorStreamer, error) {
// check if file with the same hash already exists
f, err := s.getByHash(datahash)
if err == nil {
return f, nil
}

if !errors.Is(err, ErrNotFound) {
return nil, fmt.Errorf("getting by hash: %w", err)
}

// create Q1Q4 file
f, err = file.CreateQ1Q4File(filePath, datahash, square)
filePath := s.basepath + blocksPath + datahash.String()
f, err := file.CreateQ1Q4File(filePath, datahash, square)
if err != nil {
return nil, fmt.Errorf("creating ODS file: %w", err)
}
// create hard link with height as name
err = s.createHeightLink(datahash, height)
if err != nil {
return nil, fmt.Errorf("creating hard link: %w", err)
}
return f, nil
}

Expand Down Expand Up @@ -269,75 +259,71 @@ func (s *Store) hasByHeight(height uint64) (bool, error) {
return pathExists(path)
}

func (s *Store) Remove(ctx context.Context, height uint64) error {
lock := s.stripLock.byHeight(height)
lock.Lock()
defer lock.Unlock()

func (s *Store) Remove(ctx context.Context, height uint64, dataRoot share.DataHash) error {
tNow := time.Now()
err := s.remove(ctx, height)
err := s.remove(height, dataRoot)
s.metrics.observeRemove(ctx, time.Since(tNow), err != nil)
return err
}

func (s *Store) remove(ctx context.Context, height uint64) error {
f, err := s.getByHeight(height)
if err != nil {
// short circuit if file not exists
if errors.Is(err, ErrNotFound) {
return nil
}
return fmt.Errorf("getting by height: %w", err)
func (s *Store) openFile(path string) (eds.AccessorStreamer, error) {
f, err := file.OpenQ1Q4File(path)
if err == nil {
return wrappedFile(f), nil
}
if os.IsNotExist(err) {
return nil, ErrNotFound
}
if errors.Is(err, file.ErrFileIsEmpty) {
return emptyAccessor, nil
}
return nil, fmt.Errorf("opening file: %w", err)
}

hash, err := f.DataRoot(ctx)
if err != nil {
return fmt.Errorf("getting data hash: %w", err)
func (s *Store) remove(height uint64, dataRoot share.DataHash) error {
if err := s.removeLink(height); err != nil {
return fmt.Errorf("removing link: %w", err)
}
// close file to release the reference in the cache
if err = f.Close(); err != nil {
return fmt.Errorf("closing file on removal: %w", err)
if err := s.removeFile(dataRoot); err != nil {
return fmt.Errorf("removing file: %w", err)
}
return nil
}

// lock by datahash to prevent concurrent access to the same underlying file
// by GetByHash
dlock := s.stripLock.byDatahash(hash)
dlock.Lock()
defer dlock.Unlock()
func (s *Store) removeLink(height uint64) error {
lock := s.stripLock.byHeight(height)
lock.Lock()
defer lock.Unlock()

if err = s.cache.Remove(height); err != nil {
if err := s.cache.Remove(height); err != nil {
return fmt.Errorf("removing from cache: %w", err)
}

// remove hard link by height
heightPath := s.basepath + heightsPath + fmt.Sprintf("%d", height)
if err = os.Remove(heightPath); err != nil {
return fmt.Errorf("removing by height: %w", err)
}

// remove file if not empty root
if !hash.IsEmptyRoot() {
hashPath := s.basepath + blocksPath + hash.String()
err = os.Remove(hashPath)
if err != nil {
return fmt.Errorf("removing by hash: %w", err)
}
err := os.Remove(heightPath)
if err != nil && !os.IsNotExist(err) {
return err
}
return nil
}

func (s *Store) openFile(path string) (eds.AccessorStreamer, error) {
f, err := file.OpenQ1Q4File(path)
if err == nil {
return wrappedFile(f), nil
}
if os.IsNotExist(err) {
return nil, ErrNotFound
func (s *Store) removeFile(hash share.DataHash) error {
// we don't need to remove the empty file, it should always be there
if hash.IsEmptyRoot() {
return nil
}
if errors.Is(err, file.ErrFileIsEmpty) {
return emptyAccessor, nil

dlock := s.stripLock.byDatahash(hash)
dlock.Lock()
defer dlock.Unlock()

hashPath := s.basepath + blocksPath + hash.String()
err := os.Remove(hashPath)
if err != nil && !os.IsNotExist(err) {
return err
}
return nil, fmt.Errorf("opening file: %w", err)
return nil
}

func fileLoader(f eds.AccessorStreamer) cache.OpenAccessorFn {
Expand Down Expand Up @@ -384,18 +370,14 @@ func pathExists(path string) (bool, error) {
}

func (s *Store) addEmptyHeight(height uint64) error {
// short circuit if link exists
has, err := s.hasByHeight(height)
if err != nil {
return err
}
if has {
return nil
err := s.createHeightLink(share.EmptyRoot().Hash(), height)
if err != nil && !errors.Is(err, os.ErrExist) {
return fmt.Errorf("creating empty height link: %w", err)
}
return s.createHeightLink(share.EmptyRoot().Hash(), height)
return nil
}

func createEmptyFile(basepath string) error {
func ensureEmptyFile(basepath string) error {
path := basepath + blocksPath + share.DataHash(share.EmptyRoot().Hash()).String()
ok, err := pathExists(path)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions store/store_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ func DefaultParameters() *Parameters {

func (p *Parameters) Validate() error {
if p.RecentBlocksCacheSize < 1 {
return errors.New("recent blocks cache size must be positive")
return errors.New("recent eds cache size must be positive")
}

if p.AvailabilityCacheSize < 1 {
return errors.New("blockstore cache size must be positive")
return errors.New("availability cache size must be positive")
}
return nil
}
47 changes: 9 additions & 38 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,41 +82,8 @@ func TestEDSStore(t *testing.T) {
f, err = edsStore.Put(ctx, dah.Hash(), height, eds)
require.NoError(t, err)
require.NoError(t, f.Close())
})

t.Run("Put eds with same hash for different height", func(t *testing.T) {
eds, dah := randomEDS(t)
h1 := height.Add(1)

f, err := edsStore.Put(ctx, dah.Hash(), h1, eds)
require.NoError(t, err)
require.NoError(t, f.Close())

h2 := height.Add(1)
f, err = edsStore.Put(ctx, dah.Hash(), h2, eds)
require.NoError(t, err)
require.NoError(t, f.Close())

// both heights should be available
has, err := edsStore.HasByHeight(ctx, h1)
require.NoError(t, err)
require.True(t, has)

has, err = edsStore.HasByHeight(ctx, h2)
require.NoError(t, err)
require.True(t, has)

// removing one height should not affect the other
err = edsStore.Remove(ctx, h1)
require.NoError(t, err)

has, err = edsStore.HasByHeight(ctx, h1)
require.NoError(t, err)
require.False(t, has)

has, err = edsStore.HasByHeight(ctx, h2)
require.NoError(t, err)
require.True(t, has)
// TODO: check amount of files in the store after the second Put
// after store supports listing
})

t.Run("GetByHeight", func(t *testing.T) {
Expand Down Expand Up @@ -177,9 +144,9 @@ func TestEDSStore(t *testing.T) {
})

t.Run("Remove", func(t *testing.T) {
// removing file that not exists should be noop
// removing file that does not exist should be noop
missingHeight := height.Add(1)
err := edsStore.Remove(ctx, missingHeight)
err := edsStore.Remove(ctx, missingHeight, share.DataHash{0x01, 0x02})
require.NoError(t, err)

eds, dah := randomEDS(t)
Expand All @@ -188,7 +155,7 @@ func TestEDSStore(t *testing.T) {
require.NoError(t, err)
require.NoError(t, f.Close())

err = edsStore.Remove(ctx, height)
err = edsStore.Remove(ctx, height, dah.Hash())
require.NoError(t, err)

// file should be removed from cache
Expand All @@ -200,6 +167,10 @@ func TestEDSStore(t *testing.T) {
require.NoError(t, err)
require.False(t, has)

// subsequent remove should be noop
err = edsStore.Remove(ctx, height, dah.Hash())
require.NoError(t, err)

// file should not be accessible by height
has, err = edsStore.HasByHeight(ctx, height)
require.NoError(t, err)
Expand Down

0 comments on commit a66f38b

Please sign in to comment.