Skip to content

Commit

Permalink
feat(raw): validate block data before writting
Browse files Browse the repository at this point in the history
fix
  • Loading branch information
Tangui-Bitfly committed Jan 22, 2025
1 parent 8d66b70 commit b85737f
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 214 deletions.
244 changes: 79 additions & 165 deletions backend/cmd/evm_node_indexer/main.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/golang-jwt/jwt/v4 v4.5.1
github.com/gomodule/redigo v1.9.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/gorilla/csrf v1.7.2
github.com/gorilla/handlers v1.5.2
Expand Down
2 changes: 2 additions & 0 deletions backend/pkg/commons/chain/chainID.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type IDGetter struct {
Sepolia *big.Int
Gnosis *big.Int
Optimistic *big.Int
Arbitrum *big.Int
Holesky *big.Int
}

Expand All @@ -17,6 +18,7 @@ var DefaultIDs = IDGetter{
Sepolia: big.NewInt(11155111),
Gnosis: big.NewInt(100),
Optimistic: big.NewInt(10),
Arbitrum: big.NewInt(42161),
Holesky: big.NewInt(17000),
}

Expand Down
75 changes: 53 additions & 22 deletions backend/pkg/commons/db2/database/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,35 +43,65 @@ func Wrap(db *BigTable, table string) TableWrapper {
}

func (w TableWrapper) Add(key string, item Item, allowDuplicate bool) error {
return w.BigTable.Add(w.table, key, item, allowDuplicate)
if err := w.BigTable.Add(w.table, key, item, allowDuplicate); err != nil {
return fmt.Errorf("table %s: %w", w.table, err)
}
return nil
}

func (w TableWrapper) Read(prefix string) ([]Row, error) {
return w.BigTable.Read(w.table, prefix)
res, err := w.BigTable.Read(w.table, prefix)
if err != nil {
return nil, fmt.Errorf("table %s: %w", w.table, err)
}
return res, nil
}

func (w TableWrapper) GetLatestValue(key string) (*Row, error) {
return w.BigTable.GetLatestValue(w.table, key)
res, err := w.BigTable.GetLatestValue(w.table, key)
if err != nil {
return nil, fmt.Errorf("table %s: %w", w.table, err)
}
return res, nil
}

func (w TableWrapper) GetRow(key string) (*Row, error) {
return w.BigTable.GetRow(w.table, key)
res, err := w.BigTable.GetRow(w.table, key)
if err != nil {
return nil, fmt.Errorf("table %s: %w", w.table, err)
}
return res, nil
}

func (w TableWrapper) GetRowKeys(prefix string, opts ...Option) ([]string, error) {
return w.BigTable.GetRowKeys(w.table, prefix, opts...)
res, err := w.BigTable.GetRowKeys(w.table, prefix, opts...)
if err != nil {
return nil, fmt.Errorf("table %s: %w", w.table, err)
}
return res, nil
}

func (w TableWrapper) BulkAdd(itemsByKey map[string][]Item, opts ...Option) error {
return w.BigTable.BulkAdd(w.table, itemsByKey, opts...)
if err := w.BigTable.BulkAdd(w.table, itemsByKey, opts...); err != nil {
return fmt.Errorf("table %s: %w", w.table, err)
}
return nil
}

func (w TableWrapper) GetRowsRange(high, low string, opts ...Option) ([]Row, error) {
return w.BigTable.GetRowsRange(w.table, high, low, opts...)
res, err := w.BigTable.GetRowsRange(w.table, high, low, opts...)
if err != nil {
return nil, fmt.Errorf("table %s: %w", w.table, err)
}
return res, nil
}

func (w TableWrapper) GetRowsWithKeys(keys []string) ([]Row, error) {
return w.BigTable.GetRowsWithKeys(w.table, keys)
res, err := w.BigTable.GetRowsWithKeys(w.table, keys)
if err != nil {
return nil, fmt.Errorf("table %s: %w", w.table, err)
}
return res, nil
}

// BigTable is a wrapper around Google Cloud Bigtable for storing and retrieving data
Expand All @@ -91,6 +121,7 @@ func NewBigTableWithClient(ctx context.Context, client *bigtable.Client, adminCl

// NewBigTable initializes a new BigTable
// It returns a BigTable and an error if any part of the setup fails
// if tablesAndFamilies is not nil it will try to create the associated tables and families if not already presents
func NewBigTable(project, instance string, tablesAndFamilies map[string][]string, options ...option.ClientOption) (*BigTable, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
Expand Down Expand Up @@ -124,27 +155,27 @@ func createTableAndFamilies(ctx context.Context, admin *bigtable.AdminClient, ta
// Get the list of existing tables
tables, err := admin.Tables(ctx)
if err != nil {
return fmt.Errorf("could not fetch table list: %v", err)
return fmt.Errorf("could not fetch table list: %w", err)
}

// Create the table if it doesn't exist
if !slices.Contains(tables, tableName) {
if err := admin.CreateTable(ctx, tableName); err != nil {
return fmt.Errorf("could not create table %s: %v", tableName, err)
return fmt.Errorf("could not create table %s: %w", tableName, err)
}
}

// Retrieve information about the table
tblInfo, err := admin.TableInfo(ctx, tableName)
if err != nil {
return fmt.Errorf("could not read info for table %s: %v", tableName, err)
return fmt.Errorf("could not read info for table %s: %w", tableName, err)
}

for _, familyName := range familyNames {
// Create the column family if it doesn't exist
if !slices.Contains(tblInfo.Families, familyName) {
if err := admin.CreateColumnFamily(ctx, tableName, familyName); err != nil {
return fmt.Errorf("could not create column family %s: %v", familyName, err)
return fmt.Errorf("could not create column family %s: %w", familyName, err)
}
}
}
Expand Down Expand Up @@ -211,7 +242,7 @@ func (b BigTable) Add(table, key string, item Item, allowDuplicate bool) error {
}
// Apply the mutation to the table using the given key
if err := tbl.Apply(ctx, key, mut); err != nil {
return fmt.Errorf("could not apply row mutation: %v", err)
return fmt.Errorf("could not apply row mutation: %w", err)
}
return nil
}
Expand Down Expand Up @@ -240,7 +271,7 @@ func (b BigTable) Read(table, prefix string) ([]Row, error) {
return true
})
if err != nil {
return nil, fmt.Errorf("could not read rows: %v", err)
return nil, fmt.Errorf("could not read rows: %w", err)
}

return rows, nil
Expand Down Expand Up @@ -268,7 +299,7 @@ func (b BigTable) GetLatestValue(table, key string) (*Row, error) {
})

if err != nil {
return nil, fmt.Errorf("could not read rows: %v", err)
return nil, fmt.Errorf("could not read rows: %w", err)
}

return &data, nil
Expand All @@ -283,7 +314,7 @@ func (b BigTable) GetRow(table, key string) (*Row, error) {
var data *Row
row, err := tbl.ReadRow(ctx, key)
if err != nil {
return nil, fmt.Errorf("could not read row: %v", err)
return nil, fmt.Errorf("could not read row: %w", err)
}
if row == nil {
return nil, ErrNotFound
Expand Down Expand Up @@ -338,7 +369,7 @@ func (b BigTable) GetRowsRange(table, high, low string, opts ...Option) ([]Row,
return true
}, readOptions...)
if err != nil {
return nil, fmt.Errorf("could not read rows: %v", err)
return nil, fmt.Errorf("could not read rows: %w", err)
}
if len(data) == 0 {
return nil, ErrNotFound
Expand All @@ -362,7 +393,7 @@ func (b BigTable) GetRowKeys(table, prefix string, opts ...Option) ([]string, er
return true
}, readOptions...)
if err != nil {
return nil, fmt.Errorf("could not read rows: %v", err)
return nil, fmt.Errorf("could not read rows: %w", err)
}

return data, nil
Expand All @@ -389,7 +420,7 @@ func (b BigTable) GetRowsWithKeys(table string, keys []string) ([]Row, error) {
})

if err != nil {
return nil, fmt.Errorf("could not read rows: %v", err)
return nil, fmt.Errorf("could not read rows: %w", err)
}
if len(data) == 0 {
return nil, ErrNotFound
Expand All @@ -408,7 +439,7 @@ func (b BigTable) Clear() error {
}
for _, table := range tables {
if err := b.admin.DropAllRows(ctx, table); err != nil {
return fmt.Errorf("could not drop all rows: %v", err)
return fmt.Errorf("could not drop all rows: %w", err)
}
}
return nil
Expand All @@ -421,11 +452,11 @@ func (b BigTable) Close() error {
return fmt.Errorf("cannot close client: bigtable client is nil")
}
if err := b.client.Close(); err != nil && status.Code(err) != codes.Canceled {
return fmt.Errorf("cannot close client: %v", err)
return fmt.Errorf("cannot close client: %w", err)
}
if b.admin != nil {
if err := b.admin.Close(); err != nil && status.Code(err) != codes.Canceled {
return fmt.Errorf("cannot close admin client: %v", err)
return fmt.Errorf("cannot close admin client: %w", err)
}
}
return nil
Expand Down
7 changes: 7 additions & 0 deletions backend/pkg/commons/db2/database/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package database
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -72,6 +73,9 @@ func (api RemoteServer) GetRowsRange(w http.ResponseWriter, r *http.Request) {
}
rows, err := api.db.GetRowsRange(args.High, args.Low, WithOpenRange(args.OpenRange), WithLimit(args.Limit))
if err != nil {
if errors.Is(err, ErrNotFound) {
err = ErrNotFound
}
respondWithErr(w, http.StatusInternalServerError, err)
return
}
Expand All @@ -90,6 +94,9 @@ func (api RemoteServer) GetRow(w http.ResponseWriter, r *http.Request) {
}
row, err := api.db.GetRow(args.Key)
if err != nil {
if errors.Is(err, ErrNotFound) {
err = ErrNotFound
}
respondWithErr(w, http.StatusInternalServerError, err)
return
}
Expand Down
6 changes: 3 additions & 3 deletions backend/pkg/commons/db2/raw/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestBigTableClientRealCondition(t *testing.T) {
}

rawStore := NewStore(database.Wrap(bt, Table))
rpcClient, err := rpc.DialOptions(context.Background(), "http://foo.bar", rpc.WithHTTPClient(&http.Client{
rpcClient, err := rpc.DialOptions(context.Background(), "https://foo.bar", rpc.WithHTTPClient(&http.Client{
Transport: NewBigTableEthRaw(rawStore, chainID),
}))
if err != nil {
Expand Down Expand Up @@ -131,7 +131,7 @@ func BenchmarkRawBigTable(b *testing.B) {
}

rawStore := WithCache(NewStore(database.Wrap(bt, Table)))
rpcClient, err := rpc.DialOptions(context.Background(), "http://foo.bar", rpc.WithHTTPClient(&http.Client{
rpcClient, err := rpc.DialOptions(context.Background(), "https://foo.bar", rpc.WithHTTPClient(&http.Client{
Transport: NewBigTableEthRaw(rawStore, chainID),
}))
if err != nil {
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestBigTableClient(t *testing.T) {
t.Fatal(err)
}

rpcClient, err := rpc.DialOptions(context.Background(), "http://foo.bar", rpc.WithHTTPClient(&http.Client{
rpcClient, err := rpc.DialOptions(context.Background(), "https://foo.bar", rpc.WithHTTPClient(&http.Client{
Transport: NewBigTableEthRaw(WithCache(rawStore), tt.block.ChainID),
}))
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions backend/pkg/commons/db2/raw/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/gobitfly/beaconchain/pkg/commons/db2/database"
"github.com/gobitfly/beaconchain/pkg/commons/hexutil"
"github.com/gobitfly/beaconchain/pkg/commons/log"
)

type compressor interface {
Expand All @@ -30,6 +29,9 @@ func NewStore(store database.Database) Store {
func (store Store) AddBlocks(blocks []FullBlockData) error {
itemsByKey := make(map[string][]database.Item)
for _, fullBlock := range blocks {
if err := validateBlock(fullBlock); err != nil {
return fmt.Errorf("block %d: %w", fullBlock.BlockNumber, err)
}
if len(fullBlock.Block) == 0 || len(fullBlock.BlockTxs) != 0 && len(fullBlock.Traces) == 0 {
return fmt.Errorf("block %d: empty data", fullBlock.BlockNumber)
}
Expand Down Expand Up @@ -64,10 +66,6 @@ func (store Store) AddBlocks(blocks []FullBlockData) error {
Data: traces,
},
}
if len(fullBlock.Receipts) == 0 {
// todo move that log higher up
log.Warn(fmt.Sprintf("empty receipts at block %d lRec %d lTxs %d", fullBlock.BlockNumber, len(fullBlock.Receipts), len(fullBlock.BlockTxs)))
}
if fullBlock.BlockUnclesCount > 0 {
uncles, err := store.compressor.compress(fullBlock.Uncles)
if err != nil {
Expand All @@ -80,7 +78,10 @@ func (store Store) AddBlocks(blocks []FullBlockData) error {
})
}
}
return store.db.BulkAdd(itemsByKey)
if err := store.db.BulkAdd(itemsByKey); err != nil {
return fmt.Errorf("cannot add blocks [%d-%d] to database: %w", blocks[0].BlockNumber, blocks[len(blocks)-1].BlockNumber, err)
}
return nil
}

func (store Store) ReadBlockByNumber(chainID uint64, number int64) (*FullBlockData, error) {
Expand Down
Loading

0 comments on commit b85737f

Please sign in to comment.