Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(BEDS-536): add re-index-blocks cmd to fix internal transfers status #2965

Open
wants to merge 45 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
190f40e
fix(internal transaction): recursively revert traces
Tangui-Bitfly Oct 1, 2024
43144fb
fix: SQLReaderDb interface missing methods
Tangui-Bitfly Oct 2, 2024
24087a7
fix(eth1tx): remove transfers if reverted
Tangui-Bitfly Oct 2, 2024
5a4c542
fix(internal tx): only save the highest root revert
Tangui-Bitfly Oct 2, 2024
983aea0
ci lint
Tangui-Bitfly Oct 2, 2024
19e6108
feat: updated eth1 proto file
Monika-Bitfly Oct 4, 2024
a2d8210
feat: updated Eth1InternalTransaction in eth1.proto file
Monika-Bitfly Oct 4, 2024
542bc39
feat: added Reverted value to Eth1InternalTransactionIndexed
Monika-Bitfly Oct 4, 2024
22303e9
feat: add check for internal txs while querying the block from erigon…
Monika-Bitfly Oct 4, 2024
3ec42b9
feat: updated reverted internal txs check logic
Monika-Bitfly Oct 4, 2024
bb1d8ab
feat(types): add status enum and add status partially executed
Tangui-Bitfly Oct 7, 2024
bb5ec81
refactor(client/erigon): simplify transaction indexing flow
Tangui-Bitfly Oct 7, 2024
1127a6f
feat(client/erigon): rework geth traces
Tangui-Bitfly Oct 7, 2024
c1a27cd
fix: lint
Tangui-Bitfly Oct 7, 2024
2a0a2fa
feat: add status to Eth1TransactionIndexed msg and update hash format…
Monika-Bitfly Oct 7, 2024
87bc9a3
fix: updated internal tx handling in GetBlock func
Monika-Bitfly Oct 7, 2024
93e492a
fix: updated internal txs parsing in GetBlock
Monika-Bitfly Oct 8, 2024
793b638
fix: optimise the memory of internla tx parsing
Monika-Bitfly Oct 8, 2024
12aba2f
Merge branch 'BEDS-536/fix-internal-transfers-display' of github.com:…
Monika-Bitfly Oct 8, 2024
057c25d
lint
Tangui-Bitfly Oct 8, 2024
abaaae2
fix(internal): remove revert + status logic from client to transformers
Tangui-Bitfly Oct 9, 2024
0dcaa74
test(internal): test for revert transformer on tx and itx
Tangui-Bitfly Oct 9, 2024
9ad4dde
feat(cmd): add re-index-blocks cmd
Tangui-Bitfly Oct 10, 2024
acd149d
chore(proto): clean proto Eth1Transaction and Eth1TransactionIndexed
Tangui-Bitfly Oct 10, 2024
5c033fa
fix(TransformItx): empty revertSource + error before skipping
Tangui-Bitfly Oct 10, 2024
20a211e
feat(BEDS-536): implement misc `fix-internal-txs-from-node` cmd with …
Monika-Bitfly Oct 23, 2024
357057a
(BEDS-536) Use raw db for resync (#2972)
Tangui-Bitfly Oct 29, 2024
34ce72c
fix ci
Tangui-Bitfly Oct 29, 2024
02926ed
store/bigtable: fix range limits
Tangui-Bitfly Oct 30, 2024
e4883a5
store/bigtable: fix grpc error on close
Tangui-Bitfly Oct 30, 2024
e272839
updated Receipts len check
Monika-Bitfly Oct 30, 2024
335b29c
updated traceMode to geth
Monika-Bitfly Oct 30, 2024
33eb338
fix ci
Tangui-Bitfly Oct 30, 2024
c14af4b
clenup
Monika-Bitfly Oct 30, 2024
6bb6221
rpc/erigon: parse traces geth handle CALLCODE
Tangui-Bitfly Nov 4, 2024
7056058
fix(transform itx): allow internal index == ITX_PER_TX_LIMIT
Tangui-Bitfly Nov 18, 2024
caf4ce3
Merge remote-tracking branch 'origin/master' into BEDS-536/fix-intern…
guybrush Nov 19, 2024
5b7b9d2
fix(bigtable): retry on grpc internal err
Tangui-Bitfly Nov 26, 2024
da00e8d
fix(re index): log error rather than returning error and panicking
Tangui-Bitfly Nov 27, 2024
e8e2ee0
feat(re index): re print read error at the end
Tangui-Bitfly Nov 28, 2024
3e3f64d
fix(TransformEnsNameRegistered): return none nil if ignored chainID
Tangui-Bitfly Nov 29, 2024
0db781a
fix(db2/WithFallback): fallback on syscall.ECONNRESET
Tangui-Bitfly Nov 29, 2024
d542378
fix(blockHash): prevent wrong calculated hash
Tangui-Bitfly Dec 20, 2024
f65de95
fix: return error if mismatch between receipts and transactions length
Tangui-Bitfly Jan 13, 2025
3321f74
fix: blockhash read from node response
Tangui-Bitfly Jan 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion cache/tiered_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/sirupsen/logrus"
)

var _ TieredCacher = (*tieredCache)(nil)

// Tiered cache is a cache implementation combining a
type tieredCache struct {
localGoCache *freecache.Cache
Expand All @@ -31,7 +33,19 @@ type RemoteCache interface {
GetBool(ctx context.Context, key string) (bool, error)
}

var TieredCache *tieredCache
type TieredCacher interface {
Set(key string, value interface{}, expiration time.Duration) error
SetString(key string, value string, expiration time.Duration) error
SetUint64(key string, value uint64, expiration time.Duration) error
SetBool(key string, value bool, expiration time.Duration) error

GetStringWithLocalTimeout(key string, localExpiration time.Duration) (string, error)
GetUint64WithLocalTimeout(key string, localExpiration time.Duration) (uint64, error)
GetBoolWithLocalTimeout(key string, localExpiration time.Duration) (bool, error)
GetWithLocalTimeout(key string, localExpiration time.Duration, returnValue interface{}) (interface{}, error)
}

var TieredCache TieredCacher

func MustInitTieredCache(redisAddress string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
Expand Down
4 changes: 2 additions & 2 deletions cmd/eth1indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func main() {
offsetBlocks := flag.Int64("blocks.offset", 100, "Blocks offset")
checkBlocksGaps := flag.Bool("blocks.gaps", false, "Check for gaps in the blocks table")
checkBlocksGapsLookback := flag.Int("blocks.gaps.lookback", 1000000, "Lookback for gaps check of the blocks table")
traceMode := flag.String("blocks.tracemode", "parity/geth", "Trace mode to use, can bei either 'parity', 'geth' or 'parity/geth' for both")
traceMode := flag.String("blocks.tracemode", "geth", "Trace mode to use, can bei either 'parity', 'geth' or 'parity/geth' for both")

concurrencyData := flag.Int64("data.concurrency", 30, "Concurrency to use when indexing data from bigtable")
startData := flag.Int64("data.start", 0, "Block to start indexing")
Expand Down Expand Up @@ -187,7 +187,7 @@ func main() {
return
}

transforms := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
transforms := make([]db.TransformFunc, 0)
transforms = append(transforms,
bt.TransformBlock,
bt.TransformTx,
Expand Down
204 changes: 182 additions & 22 deletions cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package main
import (
"bytes"
"context"

"database/sql"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"math"
"math/big"
Expand All @@ -17,6 +17,7 @@ import (
"time"

"firebase.google.com/go/v4/messaging"

"github.com/gobitfly/eth2-beaconchain-explorer/cmd/misc/commands"
"github.com/gobitfly/eth2-beaconchain-explorer/db"
"github.com/gobitfly/eth2-beaconchain-explorer/exporter"
Expand All @@ -27,19 +28,15 @@ import (
"github.com/gobitfly/eth2-beaconchain-explorer/utils"
"github.com/gobitfly/eth2-beaconchain-explorer/version"

"github.com/Gurpartap/storekit-go"
"github.com/coocood/freecache"
"github.com/ethereum/go-ethereum/common"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/pkg/errors"
utilMath "github.com/protolambda/zrnt/eth2/util/math"
"github.com/sirupsen/logrus"
go_ens "github.com/wealdtech/go-ens/v3"
"golang.org/x/sync/errgroup"

"flag"

"github.com/Gurpartap/storekit-go"

"github.com/sirupsen/logrus"
)

var opts = struct {
Expand Down Expand Up @@ -77,7 +74,7 @@ func main() {
statsPartitionCommand := commands.StatsMigratorCommand{}

configPath := flag.String("config", "config/default.config.yml", "Path to the config file")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases, disable-user-per-email, validate-firebase-tokens")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, re-index-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases, disable-user-per-email, validate-firebase-tokens")
flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
flag.Uint64Var(&opts.User, "user", 0, "user id")
Expand Down Expand Up @@ -320,6 +317,8 @@ func main() {
exportHistoricPrices(opts.StartDay, opts.EndDay)
case "index-missing-blocks":
indexMissingBlocks(opts.StartBlock, opts.EndBlock, bt, erigonClient)
case "re-index-blocks":
reIndexBlocks(opts.StartBlock, opts.EndBlock, bt, erigonClient, opts.Transformers, opts.BatchSize, opts.DataConcurrency)
case "migrate-last-attestation-slot-bigtable":
migrateLastAttestationSlotToBigtable()
case "migrate-app-purchases":
Expand Down Expand Up @@ -440,6 +439,8 @@ func main() {
err = disableUserPerEmail()
case "fix-epochs":
err = fixEpochs()
case "fix-internal-txs-from-node":
fixInternalTxsFromNode(opts.StartBlock, opts.EndBlock, opts.BatchSize, opts.DataConcurrency, bt)
case "validate-firebase-tokens":
err = validateFirebaseTokens()
default:
Expand Down Expand Up @@ -544,6 +545,52 @@ func disableUserPerEmail() error {
return nil
}

func fixInternalTxsFromNode(startBlock, endBlock, batchSize, concurrency uint64, bt *db.Bigtable) {
if endBlock > 0 && endBlock < startBlock {
utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", endBlock, startBlock), 0)
return
}

if concurrency == 0 {
utils.LogError(nil, "concurrency must be greater than 0", 0)
return
}
if bt == nil {
utils.LogError(nil, "no bigtable provided", 0)
return
}

transformers := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
transformers = append(transformers, bt.TransformBlock, bt.TransformTx, bt.TransformItx)

to := endBlock
if endBlock == math.MaxInt64 {
lastBlockFromBlocksTable, err := bt.GetLastBlockInBlocksTable()
if err != nil {
utils.LogError(err, "error retrieving last blocks from blocks table", 0)
return
}

to = uint64(lastBlockFromBlocksTable)
}

cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit
blockCount := utilMath.MaxU64(1, batchSize)

logrus.Infof("Starting to reindex all txs for blocks ranging from %d to %d", startBlock, to)
for from := startBlock; from <= to; from = from + blockCount {
toBlock := utilMath.MinU64(to, from+blockCount-1)

logrus.Infof("reindexing txs for blocks from height %v to %v in data table ...", from, toBlock)
err := bt.ReindexITxsFromNode(int64(from), int64(toBlock), int64(batchSize), int64(concurrency), transformers, cache)
if err != nil {
utils.LogError(err, "error indexing from bigtable", 0)
}
cache.Clear()

}
}

func fixEns(erigonClient *rpc.ErigonClient) error {
logrus.WithField("dry", opts.DryRun).Infof("command: fix-ens")
addrs := []struct {
Expand Down Expand Up @@ -1082,7 +1129,7 @@ func debugBlocks() error {
}
// logrus.WithFields(logrus.Fields{"block": i, "data": fmt.Sprintf("%+v", b)}).Infof("block from bt")

elBlock, _, err := elClient.GetBlock(int64(i), "parity/geth")
elBlock, _, err := elClient.GetBlock(int64(i), "geth")
if err != nil {
return err
}
Expand Down Expand Up @@ -1550,7 +1597,7 @@ func indexMissingBlocks(start uint64, end uint64, bt *db.Bigtable, client *rpc.E
if _, err := db.BigtableClient.GetBlockFromBlocksTable(block); err != nil {
logrus.Infof("could not load [%v] from blocks table, will try to fetch it from the node and save it", block)

bc, _, err := client.GetBlock(int64(block), "parity/geth")
bc, _, err := client.GetBlock(int64(block), "geth")
if err != nil {
utils.LogError(err, fmt.Sprintf("error getting block %v from the node", block), 0)
return
Expand All @@ -1568,35 +1615,127 @@ func indexMissingBlocks(start uint64, end uint64, bt *db.Bigtable, client *rpc.E
}
}

func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, concurrency uint64, transformerFlag string, bt *db.Bigtable, client *rpc.ErigonClient) {
if endBlock > 0 && endBlock < startBlock {
utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", endBlock, startBlock), 0)
// Goes through the blocks in the given range from [start] to [end] and re indexes them with the provided transformers
//
// Both [start] and [end] are inclusive
// Pass math.MaxInt64 as [end] to export from [start] to the last block in the blocks table
func reIndexBlocks(start uint64, end uint64, bt *db.Bigtable, client *rpc.ErigonClient, transformerFlag string, batchSize uint64, concurrency uint64) {
if start > 0 && end < start {
utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", end, start), 0)
return
}
if concurrency == 0 {
utils.LogError(nil, "concurrency must be greater than 0", 0)
return
}
if bt == nil {
utils.LogError(nil, "no bigtable provided", 0)
if end == math.MaxInt64 {
lastBlockFromBlocksTable, err := bt.GetLastBlockInBlocksTable()
if err != nil {
logrus.Errorf("error retrieving last blocks from blocks table: %v", err)
return
}
end = uint64(lastBlockFromBlocksTable)
}
transformers, importENSChanges, err := getTransformers(transformerFlag, bt)
if err != nil {
utils.LogError(nil, err, 0)
return
}
if importENSChanges {
if err := bt.ImportEnsUpdates(client.GetNativeClient(), math.MaxInt64); err != nil {
utils.LogError(err, "error importing ens from events", 0)
return
}
}

readGroup := errgroup.Group{}
readGroup.SetLimit(int(concurrency))

writeGroup := errgroup.Group{}
writeGroup.SetLimit(int(concurrency*concurrency) + 1)

cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit
quit := make(chan struct{})

sink := make(chan *types.Eth1Block)
writeGroup.Go(func() error {
for {
select {
case block, ok := <-sink:
if !ok {
return nil
}
writeGroup.Go(func() error {
if err := bt.SaveBlock(block); err != nil {
return fmt.Errorf("error saving block %v: %w", block.Number, err)
}
err := bt.IndexBlocksWithTransformers([]*types.Eth1Block{block}, transformers, cache)
if err != nil {
return fmt.Errorf("error indexing from bigtable: %w", err)
}
logrus.Infof("%d indexed", block.Number)
return nil
})
case <-quit:
return nil
}
}
})

var errs []error
var mu sync.Mutex
for i := start; i <= end; i = i + batchSize {
height := int64(i)
readGroup.Go(func() error {
heightEnd := height + int64(batchSize) - 1
if heightEnd > int64(end) {
heightEnd = int64(end)
}
blocks, err := client.GetBlocks(height, heightEnd, "geth")
if err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("cannot read block range %d-%d: %w", height, heightEnd, err))
mu.Unlock()
logrus.WithFields(map[string]interface{}{
"message": err.Error(),
"start": height,
"end": heightEnd,
}).Error("cannot read block range")
return nil
}
for _, block := range blocks {
sink <- block
}
return nil
})
}
if err := readGroup.Wait(); err != nil {
panic(err)
}
for _, err := range errs {
logrus.Error(err.Error())
}
quit <- struct{}{}
close(sink)
if err := writeGroup.Wait(); err != nil {
panic(err)
}
}

transforms := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
func getTransformers(transformerFlag string, bt *db.Bigtable) ([]db.TransformFunc, bool, error) {
transforms := make([]db.TransformFunc, 0)

logrus.Infof("transformerFlag: %v", transformerFlag)
transformerList := strings.Split(transformerFlag, ",")
if transformerFlag == "all" {
transformerList = []string{"TransformBlock", "TransformTx", "TransformBlobTx", "TransformItx", "TransformERC20", "TransformERC721", "TransformERC1155", "TransformWithdrawals", "TransformUncle", "TransformEnsNameRegistered", "TransformContract"}
} else if len(transformerList) == 0 {
utils.LogError(nil, "no transformer functions provided", 0)
return
return nil, false, fmt.Errorf("no transformer functions provided")
}
logrus.Infof("transformers: %v", transformerList)

importENSChanges := false
/**
* Add additional transformers you want to sync to this switch case
**/
for _, t := range transformerList {
switch t {
case "TransformBlock":
Expand All @@ -1623,10 +1762,31 @@ func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, co
case "TransformContract":
transforms = append(transforms, bt.TransformContract)
default:
utils.LogError(nil, "Invalid transformer flag %v", 0)
return
return nil, false, fmt.Errorf("invalid transformer flag %v", t)
}
}
return transforms, importENSChanges, nil
}

func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, concurrency uint64, transformerFlag string, bt *db.Bigtable, client *rpc.ErigonClient) {
if endBlock > 0 && endBlock < startBlock {
utils.LogError(nil, fmt.Sprintf("endBlock [%v] < startBlock [%v]", endBlock, startBlock), 0)
return
}
if concurrency == 0 {
utils.LogError(nil, "concurrency must be greater than 0", 0)
return
}
if bt == nil {
utils.LogError(nil, "no bigtable provided", 0)
return
}

transforms, importENSChanges, err := getTransformers(transformerFlag, bt)
if err != nil {
utils.LogError(nil, err, 0)
return
}

cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit

Expand Down
38 changes: 38 additions & 0 deletions cmd/store/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"errors"
"flag"
"net/http"

"github.com/sirupsen/logrus"

"github.com/gobitfly/eth2-beaconchain-explorer/db2"
"github.com/gobitfly/eth2-beaconchain-explorer/db2/store"
"github.com/gobitfly/eth2-beaconchain-explorer/types"
"github.com/gobitfly/eth2-beaconchain-explorer/utils"
)

func main() {
configPath := flag.String("config", "config/default.config.yml", "Path to the config file")
flag.Parse()

cfg := &types.Config{}
err := utils.ReadConfig(cfg, *configPath)
if err != nil {
panic(err)
}

bt, err := store.NewBigTable(cfg.RawBigtable.Bigtable.Project, cfg.RawBigtable.Bigtable.Instance, nil)
if err != nil {
panic(err)
}
remote := store.NewRemoteStore(store.Wrap(bt, db2.BlocksRawTable, ""))
go func() {
logrus.Info("starting remote raw store on port 8087")
if err := http.ListenAndServe("0.0.0.0:8087", remote.Routes()); err != nil && !errors.Is(err, http.ErrServerClosed) {
panic(err)
}
}()
utils.WaitForCtrlC()
}
Loading