Skip to content

Commit

Permalink
Merge pull request #566 from gobitfly/BIDS-3208/dashboard_notificatio…
Browse files Browse the repository at this point in the history
…ns_backend

Bids 3208/dashboard notifications backend
  • Loading branch information
peterbitfly authored Sep 24, 2024
2 parents 4e76b22 + 1ef7ac8 commit 267a0f9
Show file tree
Hide file tree
Showing 11 changed files with 1,020 additions and 1,197 deletions.
87 changes: 65 additions & 22 deletions backend/cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/coocood/freecache"
"github.com/davecgh/go-spew/spew"
"github.com/ethereum/go-ethereum/common"
"github.com/go-redis/redis/v8"
"github.com/gobitfly/beaconchain/cmd/misc/commands"
Expand All @@ -32,6 +33,7 @@ import (
edb "github.com/gobitfly/beaconchain/pkg/exporter/db"
"github.com/gobitfly/beaconchain/pkg/exporter/modules"
"github.com/gobitfly/beaconchain/pkg/exporter/services"
"github.com/gobitfly/beaconchain/pkg/notification"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/pkg/errors"
utilMath "github.com/protolambda/zrnt/eth2/util/math"
Expand Down Expand Up @@ -75,7 +77,7 @@ func Run() {
}

configPath := fs.String("config", "config/default.config.yml", "Path to the config file")
fs.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases")
fs.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases, collect-notifications, collect-user-db-notifications")
fs.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
fs.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
fs.Uint64Var(&opts.User, "user", 0, "user id")
Expand Down Expand Up @@ -181,27 +183,27 @@ func Run() {
defer db.FrontendWriterDB.Close()

// clickhouse
db.ClickHouseWriter, db.ClickHouseReader = db.MustInitDB(&types.DatabaseConfig{
Username: cfg.ClickHouse.WriterDatabase.Username,
Password: cfg.ClickHouse.WriterDatabase.Password,
Name: cfg.ClickHouse.WriterDatabase.Name,
Host: cfg.ClickHouse.WriterDatabase.Host,
Port: cfg.ClickHouse.WriterDatabase.Port,
MaxOpenConns: cfg.ClickHouse.WriterDatabase.MaxOpenConns,
SSL: true,
MaxIdleConns: cfg.ClickHouse.WriterDatabase.MaxIdleConns,
}, &types.DatabaseConfig{
Username: cfg.ClickHouse.ReaderDatabase.Username,
Password: cfg.ClickHouse.ReaderDatabase.Password,
Name: cfg.ClickHouse.ReaderDatabase.Name,
Host: cfg.ClickHouse.ReaderDatabase.Host,
Port: cfg.ClickHouse.ReaderDatabase.Port,
MaxOpenConns: cfg.ClickHouse.ReaderDatabase.MaxOpenConns,
SSL: true,
MaxIdleConns: cfg.ClickHouse.ReaderDatabase.MaxIdleConns,
}, "clickhouse", "clickhouse")
defer db.ClickHouseReader.Close()
defer db.ClickHouseWriter.Close()
// db.ClickHouseWriter, db.ClickHouseReader = db.MustInitDB(&types.DatabaseConfig{
// Username: cfg.ClickHouse.WriterDatabase.Username,
// Password: cfg.ClickHouse.WriterDatabase.Password,
// Name: cfg.ClickHouse.WriterDatabase.Name,
// Host: cfg.ClickHouse.WriterDatabase.Host,
// Port: cfg.ClickHouse.WriterDatabase.Port,
// MaxOpenConns: cfg.ClickHouse.WriterDatabase.MaxOpenConns,
// SSL: true,
// MaxIdleConns: cfg.ClickHouse.WriterDatabase.MaxIdleConns,
// }, &types.DatabaseConfig{
// Username: cfg.ClickHouse.ReaderDatabase.Username,
// Password: cfg.ClickHouse.ReaderDatabase.Password,
// Name: cfg.ClickHouse.ReaderDatabase.Name,
// Host: cfg.ClickHouse.ReaderDatabase.Host,
// Port: cfg.ClickHouse.ReaderDatabase.Port,
// MaxOpenConns: cfg.ClickHouse.ReaderDatabase.MaxOpenConns,
// SSL: true,
// MaxIdleConns: cfg.ClickHouse.ReaderDatabase.MaxIdleConns,
// }, "clickhouse", "clickhouse")
// defer db.ClickHouseReader.Close()
// defer db.ClickHouseWriter.Close()

// Initialize the persistent redis client
rdc := redis.NewClient(&redis.Options{
Expand All @@ -216,6 +218,14 @@ func Run() {
db.PersistentRedisDbClient = rdc
defer db.PersistentRedisDbClient.Close()

if utils.Config.TieredCacheProvider != "redis" {
log.Fatal(nil, "no cache provider set, please set TierdCacheProvider (redis)", 0)
}
if utils.Config.TieredCacheProvider == "redis" || len(utils.Config.RedisCacheEndpoint) != 0 {
cache.MustInitTieredCache(utils.Config.RedisCacheEndpoint)
log.Infof("tiered Cache initialized, latest finalized epoch: %v", cache.LatestFinalizedEpoch.Get())
}

switch opts.Command {
case "nameValidatorsByRanges":
err := nameValidatorsByRanges(opts.ValidatorNameRanges)
Expand Down Expand Up @@ -456,6 +466,10 @@ func Run() {
err = fixEns(erigonClient)
case "fix-ens-addresses":
err = fixEnsAddresses(erigonClient)
case "collect-notifications":
err = collectNotifications(opts.StartEpoch)
case "collect-user-db-notifications":
err = collectUserDbNotifications(opts.StartEpoch)
default:
log.Fatal(nil, fmt.Sprintf("unknown command %s", opts.Command), 0)
}
Expand All @@ -467,6 +481,35 @@ func Run() {
}
}

func collectNotifications(startEpoch uint64) error {
epoch := startEpoch

log.Infof("collecting notifications for epoch %v", epoch)
notifications, err := notification.GetNotificationsForEpoch(utils.Config.Notifications.PubkeyCachePath, epoch)
if err != nil {
return err
}

log.Infof("found %v notifications for epoch %v with %v notifications for user 0", len(notifications), epoch, len(notifications[0]))
if len(notifications[0]) > 0 {
spew.Dump(notifications[0])
}
return nil
}

func collectUserDbNotifications(startEpoch uint64) error {
epoch := startEpoch

log.Infof("collecting notifications for epoch %v", epoch)
notifications, err := notification.GetUserNotificationsForEpoch(utils.Config.Notifications.PubkeyCachePath, epoch)
if err != nil {
return err
}

log.Infof("found %v notifications for epoch %v", len(notifications), epoch)
return nil
}

func fixEns(erigonClient *rpc.ErigonClient) error {
log.Infof("command: fix-ens")
addrs := []struct {
Expand Down
49 changes: 26 additions & 23 deletions backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
cloud.google.com/go/bigtable v1.21.0
cloud.google.com/go/secretmanager v1.11.5
firebase.google.com/go v3.13.0+incompatible
firebase.google.com/go/v4 v4.14.1
github.com/ClickHouse/clickhouse-go/v2 v2.17.1
github.com/Gurpartap/storekit-go v0.0.0-20201205024111-36b6cd5c6a21
github.com/alexedwards/scs/redisstore v0.0.0-20240316134038-7e11d57e8885
Expand All @@ -30,7 +31,7 @@ require (
github.com/gobitfly/eth.store v0.0.0-20240312111708-b43f13990280
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/golang-jwt/jwt/v4 v4.5.0
github.com/golang/protobuf v1.5.3
github.com/golang/protobuf v1.5.4
github.com/gomodule/redigo v1.9.2
github.com/google/uuid v1.6.0
github.com/gorilla/csrf v1.7.2
Expand Down Expand Up @@ -70,26 +71,27 @@ require (
github.com/wealdtech/go-eth2-types/v2 v2.8.2
github.com/wealdtech/go-eth2-util v1.8.0
github.com/xeipuuv/gojsonschema v1.2.0
golang.org/x/crypto v0.19.0
golang.org/x/crypto v0.21.0
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a
golang.org/x/sync v0.6.0
golang.org/x/text v0.14.0
golang.org/x/time v0.5.0
golang.org/x/tools v0.18.0
google.golang.org/api v0.164.0
google.golang.org/protobuf v1.32.0
google.golang.org/api v0.170.0
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v2 v2.4.0
)

require (
cloud.google.com/go v0.112.0 // indirect
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go v0.112.1 // indirect
cloud.google.com/go/compute v1.24.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/firestore v1.14.0 // indirect
cloud.google.com/go/iam v1.1.5 // indirect
cloud.google.com/go/longrunning v0.5.4 // indirect
cloud.google.com/go/storage v1.36.0 // indirect
cloud.google.com/go/firestore v1.15.0 // indirect
cloud.google.com/go/iam v1.1.7 // indirect
cloud.google.com/go/longrunning v0.5.5 // indirect
cloud.google.com/go/storage v1.40.0 // indirect
github.com/ClickHouse/ch-go v0.58.2 // indirect
github.com/MicahParks/keyfunc v1.9.0 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/TylerBrock/colorjson v0.0.0-20200706003622-8a50f05110d2 // indirect
github.com/ajg/form v1.5.1 // indirect
Expand Down Expand Up @@ -153,7 +155,7 @@ require (
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
github.com/gorilla/securecookie v1.1.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect
github.com/herumi/bls-eth-go-binary v1.31.0 // indirect
Expand Down Expand Up @@ -246,24 +248,25 @@ require (
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
go.opentelemetry.io/otel v1.23.0 // indirect
go.opentelemetry.io/otel/metric v1.23.0 // indirect
go.opentelemetry.io/otel/trace v1.23.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/oauth2 v0.17.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 // indirect
google.golang.org/grpc v1.62.0 // indirect
google.golang.org/appengine/v2 v2.0.2 // indirect
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240314234333-6e1732d8331c // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240311132316-a219d84964c2 // indirect
google.golang.org/grpc v1.62.1 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
Expand Down
Loading

0 comments on commit 267a0f9

Please sign in to comment.