Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beds 1129/add status reports #1257

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions backend/cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/version"
"github.com/gobitfly/beaconchain/pkg/exporter/modules"
"github.com/gobitfly/beaconchain/pkg/exporter/services"
"github.com/gobitfly/beaconchain/pkg/monitoring"
)

func Run() {
Expand Down Expand Up @@ -136,6 +137,10 @@ func Run() {

wg.Wait()

// enable light-weight db connection monitoring
monitoring.Init(false)
monitoring.Start()

if utils.Config.TieredCacheProvider != "redis" {
log.Fatal(fmt.Errorf("no cache provider set, please set TierdCacheProvider (example redis)"), "", 0)
}
Expand Down
3 changes: 1 addition & 2 deletions backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ require (
github.com/juliangruber/go-intersect v1.1.0
github.com/jung-kurt/gofpdf v1.16.2
github.com/k3a/html2text v1.2.1
github.com/kelseyhightower/envconfig v1.4.0
github.com/klauspost/compress v1.17.6
github.com/klauspost/pgzip v1.2.6
github.com/lib/pq v1.10.9
Expand All @@ -66,6 +65,7 @@ require (
github.com/prysmaticlabs/go-ssz v0.0.0-20210121151755-f6208871c388
github.com/rocket-pool/rocketpool-go v1.8.4-0.20241009143357-7b6894d57365
github.com/rocket-pool/smartnode v1.14.1
github.com/sethvargo/go-envconfig v1.1.0
github.com/shopspring/decimal v1.4.0
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -234,7 +234,6 @@ require (
github.com/sanity-io/litter v1.5.5 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/sethvargo/go-envconfig v1.1.0 // indirect
github.com/sethvargo/go-retry v0.2.4 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions backend/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,6 @@ github.com/k3a/html2text v1.2.1 h1:nvnKgBvBR/myqrwfLuiqecUtaK1lB9hGziIJKatNFVY=
github.com/k3a/html2text v1.2.1/go.mod h1:ieEXykM67iT8lTvEWBh6fhpH4B23kB9OMKPdIBmgUqA=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
Expand Down
25 changes: 8 additions & 17 deletions backend/pkg/api/data_access/healthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types.
response.Reports = make(map[string][]types.HealthzResult)
response.ReportingUUID = utils.GetUUID()
response.DeploymentType = utils.Config.DeploymentType
err := db.ClickHouseReader.SelectContext(ctx, &results, query, ch.Named("deployment_type", utils.Config.DeploymentType), ch.Named("clean_shutdown_event_id", constants.CleanShutdownEvent))
err := db.ClickHouseReader.SelectContext(ctx, &results, query, ch.Named("deployment_type", utils.Config.DeploymentType), ch.Named("clean_shutdown_event_id", string(constants.Event_MonitoringCleanShutdown)))
if err != nil {
response.Reports["response_error"] = []types.HealthzResult{
{
Expand All @@ -125,25 +125,16 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types.
return response
}

mustExist := []string{
"ch_rolling_1h",
"ch_rolling_24h",
"ch_rolling_7d",
"ch_rolling_30d",
"ch_rolling_90d",
"ch_rolling_total",
"ch_dashboard_epoch",
"api_service_avg_efficiency",
"api_service_validator_mapping",
"api_service_slot_viz",
"monitoring_timeouts",
}
for _, result := range results {
response.Reports[result.EventId] = append(response.Reports[result.EventId], result)
}
for _, id := range mustExist {
if _, ok := response.Reports[id]; !ok {
response.Reports[id] = []types.HealthzResult{
requiredEvents := constants.RequiredEvents
if utils.Config.DeploymentType == "production" {
requiredEvents = append(requiredEvents, constants.ProductionRequiredEvents...)
}
for _, id := range requiredEvents {
if _, ok := response.Reports[string(id)]; !ok {
response.Reports[string(id)] = []types.HealthzResult{
{
Status: constants.Failure,
Result: []map[string]string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (s *Services) startEfficiencyDataService(wg *sync.WaitGroup) {
for {
startTime := time.Now()
delay := time.Duration(utils.Config.Chain.ClConfig.SlotsPerEpoch*utils.Config.Chain.ClConfig.SecondsPerSlot) * time.Second
r := services.NewStatusReport("api_service_avg_efficiency", constants.Default, delay)
r := services.NewStatusReport(constants.Event_ApiServiceAvgEfficiency, constants.Default, delay)
r(constants.Running, nil)
err := s.updateEfficiencyData() // TODO: only update data if something has changed (new head epoch)
if err != nil {
Expand All @@ -36,7 +36,7 @@ func (s *Services) startEfficiencyDataService(wg *sync.WaitGroup) {
delay = 10 * time.Second
} else {
log.Infof("=== average network efficiency data updated in %s", time.Since(startTime))
r(constants.Success, map[string]string{"took": time.Since(startTime).String()})
r(constants.Success, map[string]string{"took": time.Since(startTime).String(), "took_raw": fmt.Sprintf("%v", time.Since(startTime).Milliseconds())})
o.Do(func() {
wg.Done()
})
Expand Down
4 changes: 2 additions & 2 deletions backend/pkg/api/services/service_slot_viz.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ func (s *Services) startSlotVizDataService(wg *sync.WaitGroup) {
for {
startTime := time.Now()
delay := time.Duration(utils.Config.Chain.ClConfig.SecondsPerSlot) * time.Second
r := services.NewStatusReport("api_service_slot_viz", constants.Default, delay)
r := services.NewStatusReport(constants.Event_ApiServiceSlotViz, constants.Default, delay)
r(constants.Running, nil)
err := s.updateSlotVizData() // TODO: only update data if something has changed (new head slot or new head epoch)
if err != nil {
log.Error(err, "error updating slotviz data", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
}
log.Infof("=== slotviz data updated in %s", time.Since(startTime))
r(constants.Success, map[string]string{"took": time.Since(startTime).String()})
r(constants.Success, map[string]string{"took": time.Since(startTime).String(), "took_raw": fmt.Sprintf("%v", time.Since(startTime).Milliseconds())})
o.Do(func() {
wg.Done()
})
Expand Down
4 changes: 2 additions & 2 deletions backend/pkg/api/services/service_validator_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *Services) startIndexMappingService(wg *sync.WaitGroup) {
startTime := time.Now()
delay := time.Duration(utils.Config.Chain.ClConfig.SecondsPerSlot) * time.Second
err = nil // clear error
r := services.NewStatusReport("api_service_validator_mapping", constants.Default, delay)
r := services.NewStatusReport(constants.Event_ApiServiceValidatorMapping, constants.Default, delay)
r(constants.Running, nil)
latestEpoch := cache.LatestEpoch.Get()
if currentValidatorMapping.Load() == nil || latestEpoch != lastEpochUpdate {
Expand All @@ -53,7 +53,7 @@ func (s *Services) startIndexMappingService(wg *sync.WaitGroup) {
delay = 10 * time.Second
} else {
log.Infof("=== validator mapping updated in %s", time.Since(startTime))
r(constants.Success, map[string]string{"took": time.Since(startTime).String(), "latest_epoch": fmt.Sprintf("%d", lastEpochUpdate)})
r(constants.Success, map[string]string{"took": time.Since(startTime).String(), "took_raw": fmt.Sprintf("%v", time.Since(startTime).Milliseconds()), "latest_epoch": fmt.Sprintf("%d", lastEpochUpdate)})
lastEpochUpdate = latestEpoch
o.Do(func() {
wg.Done()
Expand Down
11 changes: 9 additions & 2 deletions backend/pkg/exporter/modules/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/gobitfly/beaconchain/pkg/consapi"
"github.com/gobitfly/beaconchain/pkg/consapi/types"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
"github.com/gobitfly/beaconchain/pkg/monitoring/services"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

type ModuleInterface interface {
Init() error
GetName() string // Used for logging
GetMonitoringEventId() constants.Event

OnHead(*types.StandardEventHeadResponse) error // !Do not block in this functions for an extended period of time!

Expand Down Expand Up @@ -63,7 +66,6 @@ func StartAll(context ModuleContext, modules []ModuleInterface, justV2 bool) {
log.Error(err, "beacon-node seems to be unavailable", 0)
time.Sleep(time.Second * 10)
}

// start subscription modules
startSubscriptionModules(&context, modules)
}
Expand Down Expand Up @@ -146,15 +148,20 @@ func notifyAllModules(goPool *errgroup.Group, modules []ModuleInterface, f func(
for _, module := range modules {
module := module
goPool.Go(func() error {
start := time.Now()
r := services.NewStatusReport(module.GetMonitoringEventId(), constants.Default, constants.Default)
r(constants.Running, nil)
err := f(module)
if err != nil {
log.Error(err, fmt.Sprintf("error in module %s", module.GetName()), 0)
r(constants.Failure, map[string]string{"error": err.Error()})
return nil // return never gets caught anywhere? lets not risk a memory leak and instead return nil
}
r(constants.Success, map[string]string{"took_raw": fmt.Sprintf("%v", time.Since(start).Milliseconds())})
return nil
})
}
}

func GetModuleContext() (ModuleContext, error) {
cl := consapi.NewClient("http://" + utils.Config.Indexer.Node.Host + ":" + utils.Config.Indexer.Node.Port)

Expand Down
5 changes: 5 additions & 0 deletions backend/pkg/exporter/modules/dashboard_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/consapi/network"
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
edb "github.com/gobitfly/beaconchain/pkg/exporter/db"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -992,6 +993,10 @@ func (d *dashboardData) GetName() string {
return "Dashboard-Data"
}

// GetMonitoringEventId returns the monitoring event constant associated with
// the dashboard data module.
func (d *dashboardData) GetMonitoringEventId() constants.Event {
	var event constants.Event = constants.Event_ExporterModuleDashboardData
	return event
}

func (d *dashboardData) OnHead(event *constypes.StandardEventHeadResponse) error {
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions backend/pkg/exporter/modules/execution_deposits_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
)

// if we ever end up in a situation where we possibly have gaps in the data remember: the merkletree_index is unique.
Expand Down Expand Up @@ -155,6 +156,10 @@ func (d *executionDepositsExporter) GetName() string {
return "ExecutionDeposits-Exporter"
}

// GetMonitoringEventId returns the monitoring event constant associated with
// the execution deposits exporter module.
func (d *executionDepositsExporter) GetMonitoringEventId() constants.Event {
	var event constants.Event = constants.Event_ExporterModuleELDepositsExporter
	return event
}

func (d *executionDepositsExporter) OnChainReorg(event *constypes.StandardEventChainReorg) (err error) {
return nil // nop
}
Expand Down
5 changes: 5 additions & 0 deletions backend/pkg/exporter/modules/execution_payloads_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/types"
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
"github.com/pkg/errors"
"github.com/shopspring/decimal"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -50,6 +51,10 @@ func (d *executionPayloadsExporter) GetName() string {
return "ExecutionPayloads-Exporter"
}

// GetMonitoringEventId returns the monitoring event constant associated with
// the execution payloads exporter module.
func (d *executionPayloadsExporter) GetMonitoringEventId() constants.Event {
	var event constants.Event = constants.Event_ExporterModuleELPayloadExporter
	return event
}

func (d *executionPayloadsExporter) OnChainReorg(event *constypes.StandardEventChainReorg) (err error) {
return nil // nop
}
Expand Down
5 changes: 5 additions & 0 deletions backend/pkg/exporter/modules/execution_rewards_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/log"
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
)

type executionRewardsFinalizer struct {
Expand All @@ -31,6 +32,10 @@ func (d *executionRewardsFinalizer) GetName() string {
return "ExecutionRewards-Finalizer"
}

// GetMonitoringEventId returns the monitoring event constant associated with
// the execution rewards finalizer module.
func (d *executionRewardsFinalizer) GetMonitoringEventId() constants.Event {
	var event constants.Event = constants.Event_ExporterModuleELRewardsFinalizer
	return event
}

func (d *executionRewardsFinalizer) OnChainReorg(event *constypes.StandardEventChainReorg) (err error) {
return nil // nop
}
Expand Down
12 changes: 12 additions & 0 deletions backend/pkg/exporter/modules/network_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/rpc"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
"github.com/gobitfly/beaconchain/pkg/monitoring/services"
)

func networkLivenessUpdater(client rpc.Client) {
Expand All @@ -21,20 +23,26 @@ func networkLivenessUpdater(client rpc.Client) {
slotDuration := time.Second * time.Duration(utils.Config.Chain.ClConfig.SecondsPerSlot)

for {
r := services.NewStatusReport(constants.Event_ExporterLegacyNetworkLiveness, constants.Default, slotDuration)
r(constants.Running, nil)

head, err := client.GetChainHead()
if err != nil {
log.Error(err, "error getting chainhead when exporting networkliveness", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
time.Sleep(slotDuration)
continue
}

if prevHeadEpoch == head.HeadEpoch {
r(constants.Success, nil)
time.Sleep(slotDuration)
continue
}

// wait for node to be synced
if time.Now().Add(-epochDuration).After(utils.EpochToTime(head.HeadEpoch)) {
r(constants.Failure, map[string]string{"error": "node not synced"})
time.Sleep(slotDuration)
continue
}
Expand All @@ -45,6 +53,7 @@ func networkLivenessUpdater(client rpc.Client) {
head.HeadEpoch, head.FinalizedEpoch, head.JustifiedEpoch, head.PreviousJustifiedEpoch)
if err != nil {
log.Error(err, "error saving networkliveness", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
} else {
log.Infof("updated networkliveness for epoch %v", head.HeadEpoch)
prevHeadEpoch = head.HeadEpoch
Expand All @@ -53,12 +62,15 @@ func networkLivenessUpdater(client rpc.Client) {
err = cache.LatestNodeEpoch.Set(head.HeadEpoch)
if err != nil {
log.Error(err, "error setting latestNodeEpoch in cache", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
}

err = cache.LatestNodeFinalizedEpoch.Set(head.FinalizedEpoch)
if err != nil {
log.Error(err, "error setting latestNodeFinalizedEpoch in cache", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
}
r(constants.Success, nil)

time.Sleep(slotDuration)
}
Expand Down
9 changes: 8 additions & 1 deletion backend/pkg/exporter/modules/pubkey_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
monitoringServices "github.com/gobitfly/beaconchain/pkg/monitoring/services"
)

func UpdatePubkeyTag() {
log.Infof("Started Pubkey Tags Updater")
for {
start := time.Now()

r := monitoringServices.NewStatusReport(constants.Event_ExporterLegacyPubkeyTags, constants.Default, time.Second*12)
r(constants.Running, nil)
tx, err := db.WriterDb.Beginx()
if err != nil {
log.Error(err, "Error connecting to DB", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
// return err
}
_, err = tx.Exec(`INSERT INTO validator_tags (publickey, tag)
Expand All @@ -26,16 +30,19 @@ func UpdatePubkeyTag() {
ON CONFLICT (publickey, tag) DO NOTHING;`)
if err != nil {
log.Error(err, "error updating validator_tags", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
// return err
}

err = tx.Commit()
if err != nil {
log.Error(err, "error committing transaction", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
}
_ = tx.Rollback()

log.Infof("Updating Pubkey Tags took %v sec.", time.Since(start).Seconds())
r(constants.Success, map[string]string{"took": time.Since(start).String(), "took_raw": time.Since(start).String()})
metrics.TaskDuration.WithLabelValues("validator_pubkey_tag_updater").Observe(time.Since(start).Seconds())

time.Sleep(time.Minute * 10)
Expand Down
Loading
Loading