Skip to content

Commit

Permalink
write epoch based data wip
Browse files Browse the repository at this point in the history
  • Loading branch information
manuelsc committed Mar 6, 2024
1 parent 666718b commit c91aeb6
Show file tree
Hide file tree
Showing 27 changed files with 695 additions and 256 deletions.
73 changes: 55 additions & 18 deletions backend/cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,55 @@ func main() {
log.InfoWithFields(log.Fields{"config": *configPath, "version": version.Version, "commit": version.GitCommit, "chainName": utils.Config.Chain.ClConfig.ConfigName}, "starting")

wg := &sync.WaitGroup{}
if !cfg.JustV2 {
wg.Add(1)
go func() {
defer wg.Done()
db.MustInitDB(&types.DatabaseConfig{
Username: cfg.WriterDatabase.Username,
Password: cfg.WriterDatabase.Password,
Name: cfg.WriterDatabase.Name,
Host: cfg.WriterDatabase.Host,
Port: cfg.WriterDatabase.Port,
MaxOpenConns: cfg.WriterDatabase.MaxOpenConns,
MaxIdleConns: cfg.WriterDatabase.MaxIdleConns,
SSL: cfg.WriterDatabase.SSL,
}, &types.DatabaseConfig{
Username: cfg.ReaderDatabase.Username,
Password: cfg.ReaderDatabase.Password,
Name: cfg.ReaderDatabase.Name,
Host: cfg.ReaderDatabase.Host,
Port: cfg.ReaderDatabase.Port,
MaxOpenConns: cfg.ReaderDatabase.MaxOpenConns,
MaxIdleConns: cfg.ReaderDatabase.MaxIdleConns,
SSL: cfg.ReaderDatabase.SSL,
})
}()
} else {
log.Warnf("------- EXPORTER RUNNING IN V2 ONLY MODE ------")
}

wg.Add(1)
go func() {
defer wg.Done()
db.MustInitDB(&types.DatabaseConfig{
Username: cfg.WriterDatabase.Username,
Password: cfg.WriterDatabase.Password,
Name: cfg.WriterDatabase.Name,
Host: cfg.WriterDatabase.Host,
Port: cfg.WriterDatabase.Port,
MaxOpenConns: cfg.WriterDatabase.MaxOpenConns,
MaxIdleConns: cfg.WriterDatabase.MaxIdleConns,
db.MustInitAlloyDb(&types.DatabaseConfig{
Username: cfg.AlloyWriter.Username,
Password: cfg.AlloyWriter.Password,
Name: cfg.AlloyWriter.Name,
Host: cfg.AlloyWriter.Host,
Port: cfg.AlloyWriter.Port,
MaxOpenConns: cfg.AlloyWriter.MaxOpenConns,
MaxIdleConns: cfg.AlloyWriter.MaxIdleConns,
SSL: cfg.AlloyWriter.SSL,
}, &types.DatabaseConfig{
Username: cfg.ReaderDatabase.Username,
Password: cfg.ReaderDatabase.Password,
Name: cfg.ReaderDatabase.Name,
Host: cfg.ReaderDatabase.Host,
Port: cfg.ReaderDatabase.Port,
MaxOpenConns: cfg.ReaderDatabase.MaxOpenConns,
MaxIdleConns: cfg.ReaderDatabase.MaxIdleConns,
Username: cfg.AlloyReader.Username,
Password: cfg.AlloyReader.Password,
Name: cfg.AlloyReader.Name,
Host: cfg.AlloyReader.Host,
Port: cfg.AlloyReader.Port,
MaxOpenConns: cfg.AlloyReader.MaxOpenConns,
MaxIdleConns: cfg.AlloyReader.MaxIdleConns,
SSL: cfg.AlloyReader.SSL,
})
}()

Expand Down Expand Up @@ -133,16 +163,23 @@ func main() {
log.Fatal(fmt.Errorf("no cache provider set, please set TierdCacheProvider (example redis)"), "", 0)
}

defer db.ReaderDb.Close()
defer db.WriterDb.Close()
if !cfg.JustV2 {
defer db.ReaderDb.Close()
defer db.WriterDb.Close()
}
defer db.AlloyReader.Close()
defer db.AlloyWriter.Close()
defer db.BigtableClient.Close()

context, err := modules.GetModuleContext()
if err != nil {
log.Fatal(err, "error getting module context", 0)
}

go services.StartHistoricPriceService()
if !cfg.JustV2 {
go services.StartHistoricPriceService()
}

go modules.StartAll(context)

// Keep the program alive until Ctrl+C is pressed
Expand Down
9 changes: 2 additions & 7 deletions backend/cmd/misc/commands/validator_stats_partition.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package commands

import (
"database/sql"
"flag"
"fmt"
"math"
Expand All @@ -10,6 +9,7 @@ import (

"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/utils"

"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand Down Expand Up @@ -195,12 +195,7 @@ func tableRenaming(currentTableName, destinationTableName string, numberOfPartit
if err != nil {
return errors.Wrap(err, "error starting transaction")
}
defer func() {
err := tx.Rollback()
if err != nil && !errors.Is(err, sql.ErrTxDone) {
log.Error(err, "error rolling back transaction", 0)
}
}()
defer utils.Rollback(tx)

// Sanity check same day height
err = sanityCheckIsSameExportedDay(tx, destinationTableName)
Expand Down
33 changes: 6 additions & 27 deletions backend/cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func main() {
log.Fatal(err, "error initializing bigtable", 0)
}

cl := consapi.NewNodeDataRetriever("http://" + cfg.Indexer.Node.Host + ":" + cfg.Indexer.Node.Port)
cl := consapi.NewClient("http://" + cfg.Indexer.Node.Host + ":" + cfg.Indexer.Node.Port)
nodeImpl, ok := cl.ClientInt.(*consapi.NodeClient)
if !ok {
log.Fatal(nil, "lighthouse client can only be used with real node impl", 0)
Expand Down Expand Up @@ -667,12 +667,7 @@ func migrateAppPurchases(appStoreSecret string) error {
if err != nil {
return fmt.Errorf("error starting db transactions: %w", err)
}
defer func() {
err := tx.Rollback()
if err != nil && !errors.Is(err, sql.ErrTxDone) {
log.Error(err, "error rolling back transaction", 0)
}
}()
defer utils.Rollback(tx)

// Delete marked as duplicate, though the duplicate reject reason is not always set - mainly missing on historical data
_, err = tx.Exec("DELETE FROM users_app_subscriptions WHERE store = 'ios-appstore' AND reject_reason = 'duplicate';")
Expand Down Expand Up @@ -848,16 +843,11 @@ func fixExecTransactionsCount() error {

log.Infof("dbUpdates: %v", len(dbUpdates))

tx, err := db.WriterDb.Begin()
tx, err := db.WriterDb.Beginx()
if err != nil {
return fmt.Errorf("error starting db transactions: %w", err)
}
defer func() {
err := tx.Rollback()
if err != nil && !errors.Is(err, sql.ErrTxDone) {
log.Error(err, "error rolling back transaction", 0)
}
}()
defer utils.Rollback(tx)

for b := 0; b < len(dbUpdates); b += int(batchSize) {
start := b
Expand All @@ -871,7 +861,6 @@ func fixExecTransactionsCount() error {
valueStrings = append(valueStrings, fmt.Sprintf("(%v,%v)", v.BlockNumber, v.ExecTxsCount))
}

//nolint:gosec
stmt := fmt.Sprintf(`
update blocks as a set exec_transactions_count = b.exec_transactions_count
from (values %s) as b(exec_block_number, exec_transactions_count)
Expand Down Expand Up @@ -1284,12 +1273,7 @@ func updateAPIKey(user uint64) error {
if err != nil {
return err
}
defer func() {
err := tx.Rollback()
if err != nil && !errors.Is(err, sql.ErrTxDone) {
log.Error(err, "error rolling back transaction", 0)
}
}()
defer utils.Rollback(tx)

_, err = tx.Exec(`UPDATE api_statistics set apikey = $1 where apikey = $2`, apiKey, u.OldKey)
if err != nil {
Expand Down Expand Up @@ -1867,12 +1851,7 @@ func UpdateValidatorStatisticsSyncData(day uint64, dryRun bool) error {
if err != nil {
return fmt.Errorf("error retrieving raw sql connection: %w", err)
}
defer func() {
err := tx.Rollback()
if err != nil && !errors.Is(err, sql.ErrTxDone) {
log.Error(err, "error rolling back transaction", 0)
}
}()
defer utils.Rollback(tx)

log.Infof("updating statistics data into the validator_stats table %v | %v", len(onlySyncCommitteeValidatorData), len(validatorData))

Expand Down
2 changes: 1 addition & 1 deletion backend/cmd/statistics/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func main() {

chainID := new(big.Int).SetUint64(utils.Config.Chain.ClConfig.DepositChainID)
if utils.Config.Indexer.Node.Type == "lighthouse" {
cl := consapi.NewNodeDataRetriever("http://" + cfg.Indexer.Node.Host + ":" + cfg.Indexer.Node.Port)
cl := consapi.NewClient("http://" + cfg.Indexer.Node.Host + ":" + cfg.Indexer.Node.Port)
nodeImpl, ok := cl.ClientInt.(*consapi.NodeClient)
if !ok {
log.Fatal(nil, "lighthouse client can only be used with real node impl", 0)
Expand Down
42 changes: 42 additions & 0 deletions backend/db_migrations/20240304094628_dashboard_data.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS dashboard_data_epoch (
validatorindex int,
epoch int,
attestations_source_reward BIGINT,
attestations_target_reward BIGINT,
attestations_head_reward BIGINT,
attestations_inactivity_reward BIGINT,
attestations_inclusion_reward BIGINT,
attestations_reward BIGINT,
attestations_ideal_source_reward BIGINT,
attestations_ideal_target_reward BIGINT,
attestations_ideal_head_reward BIGINT,
attestations_ideal_inactivity_reward BIGINT,
attestations_ideal_inclusion_reward BIGINT,
attestations_ideal_reward BIGINT,
blocks_scheduled smallint,
blocks_proposed smallint,
blocks_cl_reward BIGINT, -- gwei
blocks_el_reward NUMERIC, -- wei
sync_scheduled smallint,
sync_executed smallint,
sync_rewards BIGINT,
slashed BOOLEAN,
balance_start BIGINT,
balance_end BIGINT,
deposits_count smallint,
deposits_amount BIGINT,
withdrawals_count smallint,
withdrawals_amount BIGINT
) PARTITION BY range (epoch);

create index on dashboard_data_epoch (validatorindex, epoch);

-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin

-- +goose StatementEnd
24 changes: 24 additions & 0 deletions backend/db_migrations/new.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/bin/bash

# Ask for name of migration
echo "Enter name of migration file (for example add_validators_indices): "
read -r name

# This script creates a new migration file with the current timestamp
# as the filename prefix.
filename=$(date +"%Y%m%d%H%M%S")_$name.sql
touch $filename

cat <<EOF > $filename
-- +goose Up
-- +goose StatementBegin
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
-- +goose StatementEnd
EOF


10 changes: 7 additions & 3 deletions backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
github.com/aws/aws-sdk-go-v2 v1.25.0
github.com/aws/aws-sdk-go-v2/credentials v1.13.43
github.com/aws/aws-sdk-go-v2/service/s3 v1.49.0
github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df
github.com/coocood/freecache v1.2.4
github.com/davecgh/go-spew v1.1.1
github.com/donovanhide/eventsource v0.0.0-20210830082556-c59027999da0
Expand All @@ -31,9 +30,12 @@ require (
github.com/hashicorp/golang-lru v1.0.2
github.com/invopop/jsonschema v0.12.0
github.com/jackc/pgtype v1.14.2
github.com/jackc/pgx v3.6.2+incompatible
github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e
github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c
github.com/jackc/pgx/v5 v5.5.3
github.com/jmoiron/sqlx v1.3.5
github.com/juliangruber/go-intersect v1.1.0
github.com/jung-kurt/gofpdf v1.16.2
github.com/kelseyhightower/envconfig v1.4.0
github.com/klauspost/compress v1.17.2
Expand All @@ -51,7 +53,6 @@ require (
github.com/rs/cors v1.8.2
github.com/shopspring/decimal v1.3.1
github.com/sirupsen/logrus v1.9.3
github.com/streamonkey/size v0.0.1
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a
github.com/wealdtech/go-ens/v3 v3.6.0
github.com/wealdtech/go-eth2-types/v2 v2.8.2
Expand Down Expand Up @@ -145,13 +146,16 @@ require (
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
github.com/ipld/go-ipld-prime v0.20.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.1.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/juliangruber/go-intersect v1.1.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
Loading

0 comments on commit c91aeb6

Please sign in to comment.