diff --git a/backend/cmd/exporter/main.go b/backend/cmd/exporter/main.go index 897ce320b..8be9c7fc4 100644 --- a/backend/cmd/exporter/main.go +++ b/backend/cmd/exporter/main.go @@ -19,6 +19,7 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/version" "github.com/gobitfly/beaconchain/pkg/exporter/modules" "github.com/gobitfly/beaconchain/pkg/exporter/services" + "github.com/gobitfly/beaconchain/pkg/monitoring" ) func Run() { @@ -136,6 +137,10 @@ func Run() { wg.Wait() + // enable light-weight db connection monitoring + monitoring.Init(false) + monitoring.Start() + if utils.Config.TieredCacheProvider != "redis" { log.Fatal(fmt.Errorf("no cache provider set, please set TierdCacheProvider (example redis)"), "", 0) } diff --git a/backend/go.mod b/backend/go.mod index f0b6c4f7a..fb536e301 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -52,7 +52,6 @@ require ( github.com/juliangruber/go-intersect v1.1.0 github.com/jung-kurt/gofpdf v1.16.2 github.com/k3a/html2text v1.2.1 - github.com/kelseyhightower/envconfig v1.4.0 github.com/klauspost/compress v1.17.6 github.com/klauspost/pgzip v1.2.6 github.com/lib/pq v1.10.9 @@ -66,6 +65,7 @@ require ( github.com/prysmaticlabs/go-ssz v0.0.0-20210121151755-f6208871c388 github.com/rocket-pool/rocketpool-go v1.8.4-0.20241009143357-7b6894d57365 github.com/rocket-pool/smartnode v1.14.1 + github.com/sethvargo/go-envconfig v1.1.0 github.com/shopspring/decimal v1.4.0 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.9.0 @@ -234,7 +234,6 @@ require ( github.com/sanity-io/litter v1.5.5 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/sergi/go-diff v1.2.0 // indirect - github.com/sethvargo/go-envconfig v1.1.0 // indirect github.com/sethvargo/go-retry v0.2.4 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect diff --git a/backend/go.sum b/backend/go.sum index 2cfda3b52..087b51058 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -601,8 +601,6 @@ github.com/k3a/html2text v1.2.1 h1:nvnKgBvBR/myqrwfLuiqecUtaK1lB9hGziIJKatNFVY= github.com/k3a/html2text v1.2.1/go.mod h1:ieEXykM67iT8lTvEWBh6fhpH4B23kB9OMKPdIBmgUqA= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= -github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= -github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= diff --git a/backend/pkg/api/data_access/healthz.go b/backend/pkg/api/data_access/healthz.go index 303184532..bdd3ae172 100644 --- a/backend/pkg/api/data_access/healthz.go +++ b/backend/pkg/api/data_access/healthz.go @@ -111,7 +111,7 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types. response.Reports = make(map[string][]types.HealthzResult) response.ReportingUUID = utils.GetUUID() response.DeploymentType = utils.Config.DeploymentType - err := db.ClickHouseReader.SelectContext(ctx, &results, query, ch.Named("deployment_type", utils.Config.DeploymentType), ch.Named("clean_shutdown_event_id", constants.CleanShutdownEvent)) + err := db.ClickHouseReader.SelectContext(ctx, &results, query, ch.Named("deployment_type", utils.Config.DeploymentType), ch.Named("clean_shutdown_event_id", string(constants.Event_MonitoringCleanShutdown))) if err != nil { response.Reports["response_error"] = []types.HealthzResult{ { @@ -125,25 +125,16 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types. return response } - mustExist := []string{ - "ch_rolling_1h", - "ch_rolling_24h", - "ch_rolling_7d", - "ch_rolling_30d", - "ch_rolling_90d", - "ch_rolling_total", - "ch_dashboard_epoch", - "api_service_avg_efficiency", - "api_service_validator_mapping", - "api_service_slot_viz", - "monitoring_timeouts", - } for _, result := range results { response.Reports[result.EventId] = append(response.Reports[result.EventId], result) } - for _, id := range mustExist { - if _, ok := response.Reports[id]; !ok { - response.Reports[id] = []types.HealthzResult{ + requiredEvents := constants.RequiredEvents + if utils.Config.DeploymentType == "production" { + requiredEvents = append(requiredEvents, constants.ProductionRequiredEvents...) + } + for _, id := range requiredEvents { + if _, ok := response.Reports[string(id)]; !ok { + response.Reports[string(id)] = []types.HealthzResult{ { Status: constants.Failure, Result: []map[string]string{ diff --git a/backend/pkg/api/services/service_average_network_efficiency.go b/backend/pkg/api/services/service_average_network_efficiency.go index d25d21a5d..64305faf4 100644 --- a/backend/pkg/api/services/service_average_network_efficiency.go +++ b/backend/pkg/api/services/service_average_network_efficiency.go @@ -27,7 +27,7 @@ func (s *Services) startEfficiencyDataService(wg *sync.WaitGroup) { for { startTime := time.Now() delay := time.Duration(utils.Config.Chain.ClConfig.SlotsPerEpoch*utils.Config.Chain.ClConfig.SecondsPerSlot) * time.Second - r := services.NewStatusReport("api_service_avg_efficiency", constants.Default, delay) + r := services.NewStatusReport(constants.Event_ApiServiceAvgEfficiency, constants.Default, delay) r(constants.Running, nil) err := s.updateEfficiencyData() // TODO: only update data if something has changed (new head epoch) if err != nil { diff --git a/backend/pkg/api/services/service_slot_viz.go b/backend/pkg/api/services/service_slot_viz.go index 615b1d0b1..95fad1b69 100644 --- a/backend/pkg/api/services/service_slot_viz.go +++ b/backend/pkg/api/services/service_slot_viz.go @@ -32,7 +32,7 @@ func (s *Services) startSlotVizDataService(wg *sync.WaitGroup) { for { startTime := time.Now() delay := time.Duration(utils.Config.Chain.ClConfig.SecondsPerSlot) * time.Second - r := services.NewStatusReport("api_service_slot_viz", constants.Default, delay) + r := services.NewStatusReport(constants.Event_ApiServiceSlotViz, constants.Default, delay) r(constants.Running, nil) err := s.updateSlotVizData() // TODO: only update data if something has changed (new head slot or new head epoch) if err != nil { diff --git a/backend/pkg/api/services/service_validator_mapping.go b/backend/pkg/api/services/service_validator_mapping.go index c77f3647a..9af94088b 100644 --- a/backend/pkg/api/services/service_validator_mapping.go +++ b/backend/pkg/api/services/service_validator_mapping.go @@ -41,7 +41,7 @@ func (s *Services) startIndexMappingService(wg *sync.WaitGroup) { startTime := time.Now() delay := time.Duration(utils.Config.Chain.ClConfig.SecondsPerSlot) * time.Second err = nil // clear error - r := services.NewStatusReport("api_service_validator_mapping", constants.Default, delay) + r := services.NewStatusReport(constants.Event_ApiServiceValidatorMapping, constants.Default, delay) r(constants.Running, nil) latestEpoch := cache.LatestEpoch.Get() if currentValidatorMapping.Load() == nil || latestEpoch != lastEpochUpdate { diff --git a/backend/pkg/exporter/modules/base.go b/backend/pkg/exporter/modules/base.go index 1dba9ed01..c23c0ca60 100644 --- a/backend/pkg/exporter/modules/base.go +++ b/backend/pkg/exporter/modules/base.go @@ -11,6 +11,8 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/utils" "github.com/gobitfly/beaconchain/pkg/consapi" "github.com/gobitfly/beaconchain/pkg/consapi/types" + "github.com/gobitfly/beaconchain/pkg/monitoring/constants" + "github.com/gobitfly/beaconchain/pkg/monitoring/services" "github.com/pkg/errors" "golang.org/x/sync/errgroup" ) @@ -18,6 +20,7 @@ import ( type ModuleInterface interface { Init() error GetName() string // Used for logging + GetMonitoringEventId() constants.Event OnHead(*types.StandardEventHeadResponse) error // !Do not block in this functions for an extended period of time! @@ -63,7 +66,6 @@ func StartAll(context ModuleContext, modules []ModuleInterface, justV2 bool) { log.Error(err, "beacon-node seems to be unavailable", 0) time.Sleep(time.Second * 10) } - // start subscription modules startSubscriptionModules(&context, modules) } @@ -146,15 +148,20 @@ func notifyAllModules(goPool *errgroup.Group, modules []ModuleInterface, f func( for _, module := range modules { module := module goPool.Go(func() error { + start := time.Now() + r := services.NewStatusReport(module.GetMonitoringEventId(), constants.Default, constants.Default) + r(constants.Running, nil) err := f(module) if err != nil { log.Error(err, fmt.Sprintf("error in module %s", module.GetName()), 0) + r(constants.Failure, map[string]string{"error": err.Error()}) + return nil // return never gets caught anywhere? lets not risk a memory leak and instead return nil } + r(constants.Success, map[string]string{"took_raw": fmt.Sprintf("%v", time.Since(start).Milliseconds())}) return nil }) } } - func GetModuleContext() (ModuleContext, error) { cl := consapi.NewClient("http://" + utils.Config.Indexer.Node.Host + ":" + utils.Config.Indexer.Node.Port) diff --git a/backend/pkg/exporter/modules/dashboard_data.go b/backend/pkg/exporter/modules/dashboard_data.go index 55ca2ebd8..f7bb7d3f8 100644 --- a/backend/pkg/exporter/modules/dashboard_data.go +++ b/backend/pkg/exporter/modules/dashboard_data.go @@ -19,6 +19,7 @@ import ( "github.com/gobitfly/beaconchain/pkg/consapi/network" constypes "github.com/gobitfly/beaconchain/pkg/consapi/types" edb "github.com/gobitfly/beaconchain/pkg/exporter/db" + "github.com/gobitfly/beaconchain/pkg/monitoring/constants" "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" "golang.org/x/sync/errgroup" @@ -992,6 +993,10 @@ func (d *dashboardData) GetName() string { return "Dashboard-Data" } +func (d *dashboardData) GetMonitoringEventId() constants.Event { + return constants.Event_ExporterModuleDashboardData +} + func (d *dashboardData) OnHead(event *constypes.StandardEventHeadResponse) error { return nil } diff --git a/backend/pkg/exporter/modules/execution_deposits_exporter.go b/backend/pkg/exporter/modules/execution_deposits_exporter.go index 62e5aa668..76854cd4c 100644 --- a/backend/pkg/exporter/modules/execution_deposits_exporter.go +++ b/backend/pkg/exporter/modules/execution_deposits_exporter.go @@ -31,6 +31,7 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/types" "github.com/gobitfly/beaconchain/pkg/commons/utils" constypes "github.com/gobitfly/beaconchain/pkg/consapi/types" + "github.com/gobitfly/beaconchain/pkg/monitoring/constants" ) // if we ever end up in a situation where we possibly have gaps in the data remember: the merkletree_index is unique. @@ -155,6 +156,10 @@ func (d *executionDepositsExporter) GetName() string { return "ExecutionDeposits-Exporter" } +func (d *executionDepositsExporter) GetMonitoringEventId() constants.Event { + return constants.Event_ExporterModuleELDepositsExporter +} + func (d *executionDepositsExporter) OnChainReorg(event *constypes.StandardEventChainReorg) (err error) { return nil // nop } diff --git a/backend/pkg/exporter/modules/execution_payloads_exporter.go b/backend/pkg/exporter/modules/execution_payloads_exporter.go index 7266821cc..9fbbb975c 100644 --- a/backend/pkg/exporter/modules/execution_payloads_exporter.go +++ b/backend/pkg/exporter/modules/execution_payloads_exporter.go @@ -11,6 +11,7 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/log" "github.com/gobitfly/beaconchain/pkg/commons/types" constypes "github.com/gobitfly/beaconchain/pkg/consapi/types" + "github.com/gobitfly/beaconchain/pkg/monitoring/constants" "github.com/pkg/errors" "github.com/shopspring/decimal" "golang.org/x/sync/errgroup" @@ -50,6 +51,10 @@ func (d *executionPayloadsExporter) GetName() string { return "ExecutionPayloads-Exporter" } +func (d *executionPayloadsExporter) GetMonitoringEventId() constants.Event { + return constants.Event_ExporterModuleELPayloadExporter +} + func (d *executionPayloadsExporter) OnChainReorg(event *constypes.StandardEventChainReorg) (err error) { return nil // nop } diff --git a/backend/pkg/exporter/modules/execution_rewards_finalizer.go b/backend/pkg/exporter/modules/execution_rewards_finalizer.go index 13a6529af..4b67ea8bf 100644 --- a/backend/pkg/exporter/modules/execution_rewards_finalizer.go +++ b/backend/pkg/exporter/modules/execution_rewards_finalizer.go @@ -9,6 +9,7 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/db" "github.com/gobitfly/beaconchain/pkg/commons/log" constypes "github.com/gobitfly/beaconchain/pkg/consapi/types" + "github.com/gobitfly/beaconchain/pkg/monitoring/constants" ) type executionRewardsFinalizer struct { @@ -31,6 +32,10 @@ func (d *executionRewardsFinalizer) GetName() string { return "ExecutionRewards-Finalizer" } +func (d *executionRewardsFinalizer) GetMonitoringEventId() constants.Event { + return constants.Event_ExporterModuleELRewardsFinalizer +} + func (d *executionRewardsFinalizer) OnChainReorg(event *constypes.StandardEventChainReorg) (err error) { return nil // nop } diff --git a/backend/pkg/exporter/modules/network_liveness.go b/backend/pkg/exporter/modules/network_liveness.go index 8c93341d7..e2c7f33c6 100644 --- a/backend/pkg/exporter/modules/network_liveness.go +++ b/backend/pkg/exporter/modules/network_liveness.go @@ -8,6 +8,8 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/log" "github.com/gobitfly/beaconchain/pkg/commons/rpc" "github.com/gobitfly/beaconchain/pkg/commons/utils" + "github.com/gobitfly/beaconchain/pkg/monitoring/constants" + "github.com/gobitfly/beaconchain/pkg/monitoring/services" ) func networkLivenessUpdater(client rpc.Client) { @@ -21,20 +23,26 @@ func networkLivenessUpdater(client rpc.Client) { slotDuration := time.Second * time.Duration(utils.Config.Chain.ClConfig.SecondsPerSlot) for { + r := services.NewStatusReport(constants.Event_ExporterLegacyNetworkLiveness, constants.Default, slotDuration) + r(constants.Running, nil) + head, err := client.GetChainHead() if err != nil { log.Error(err, "error getting chainhead when exporting networkliveness", 0) + r(constants.Failure, map[string]string{"error": err.Error()}) time.Sleep(slotDuration) continue } if prevHeadEpoch == head.HeadEpoch { + r(constants.Success, nil) time.Sleep(slotDuration) continue } // wait for node to be synced if time.Now().Add(-epochDuration).After(utils.EpochToTime(head.HeadEpoch)) { + r(constants.Failure, map[string]string{"error": "node not synced"}) time.Sleep(slotDuration) continue } @@ -45,6 +53,7 @@ func networkLivenessUpdater(client rpc.Client) { head.HeadEpoch, head.FinalizedEpoch, head.JustifiedEpoch, head.PreviousJustifiedEpoch) if err != nil { log.Error(err, "error saving networkliveness", 0) + r(constants.Failure, map[string]string{"error": err.Error()}) } else { log.Infof("updated networkliveness for epoch %v", head.HeadEpoch) prevHeadEpoch = head.HeadEpoch @@ -53,12 +62,15 @@ func networkLivenessUpdater(client rpc.Client) { err = cache.LatestNodeEpoch.Set(head.HeadEpoch) if err != nil { log.Error(err, "error setting latestNodeEpoch in cache", 0) + r(constants.Failure, map[string]string{"error": err.Error()}) } err = cache.LatestNodeFinalizedEpoch.Set(head.FinalizedEpoch) if err != nil { log.Error(err, "error setting latestNodeFinalizedEpoch in cache", 0) + r(constants.Failure, map[string]string{"error": err.Error()}) } + r(constants.Success, nil) time.Sleep(slotDuration) } diff --git a/backend/pkg/exporter/modules/pubkey_tags.go b/backend/pkg/exporter/modules/pubkey_tags.go index 4f1090380..97bdc9c16 100644 --- a/backend/pkg/exporter/modules/pubkey_tags.go +++ b/backend/pkg/exporter/modules/pubkey_tags.go @@ -6,16 +6,20 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/db" "github.com/gobitfly/beaconchain/pkg/commons/log" "github.com/gobitfly/beaconchain/pkg/commons/metrics" + "github.com/gobitfly/beaconchain/pkg/monitoring/constants" + monitoringServices "github.com/gobitfly/beaconchain/pkg/monitoring/services" ) func UpdatePubkeyTag() { log.Infof("Started Pubkey Tags Updater") for { start := time.Now() - + r := monitoringServices.NewStatusReport(constants.Event_ExporterLegacyPubkeyTags, constants.Default, time.Second*12) + r(constants.Running, nil) tx, err := db.WriterDb.Beginx() if err != nil { log.Error(err, "Error connecting to DB", 0) + r(constants.Failure, map[string]string{"error": err.Error()}) // return err } _, err = tx.Exec(`INSERT INTO validator_tags (publickey, tag) @@ -26,16 +30,19 @@ func UpdatePubkeyTag() { ON CONFLICT (publickey, tag) DO NOTHING;`) if err != nil { log.Error(err, "error updating validator_tags", 0) + r(constants.Failure, map[string]string{"error": err.Error()}) // return err } err = tx.Commit() if err != nil { log.Error(err, "error committing transaction", 0) + r(constants.Failure, map[string]string{"error": err.Error()}) } _ = tx.Rollback() log.Infof("Updating Pubkey Tags took %v sec.", time.Since(start).Seconds()) + r(constants.Success, map[string]string{"took": time.Since(start).String(), "took_raw": time.Since(start).String()}) metrics.TaskDuration.WithLabelValues("validator_pubkey_tag_updater").Observe(time.Since(start).Seconds()) time.Sleep(time.Minute * 10) diff --git a/backend/pkg/exporter/modules/rocketpool.go b/backend/pkg/exporter/modules/rocketpool.go index 648c184c4..fd73e09a1 100644 --- a/backend/pkg/exporter/modules/rocketpool.go +++ b/backend/pkg/exporter/modules/rocketpool.go @@ -17,6 +17,8 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/metrics" "github.com/gobitfly/beaconchain/pkg/commons/services" "github.com/gobitfly/beaconchain/pkg/commons/utils" + "github.com/gobitfly/beaconchain/pkg/monitoring/constants" + monitoringServices "github.com/gobitfly/beaconchain/pkg/monitoring/services" "github.com/pkg/errors" "github.com/ethereum/go-ethereum/common" @@ -189,16 +191,20 @@ func (rp *RocketpoolExporter) Run() error { for { t0 := time.Now() + r := monitoringServices.NewStatusReport(constants.Event_ExporterLegacyRocketPool, constants.Default, time.Second*12) + r(constants.Running, nil) var err error err = rp.Update(count) if err != nil { log.Error(err, "error updating rocketpool-data", 0) + r(constants.Failure, map[string]string{"error": err.Error()}) time.Sleep(errorInterval) continue } err = rp.Save(count) if err != nil { log.Error(err, "error saving rocketpool-data", 0) + r(constants.Failure, map[string]string{"error": err.Error()}) time.Sleep(errorInterval) continue } @@ -206,6 +212,7 @@ func (rp *RocketpoolExporter) Run() error { services.ReportStatus("rocketpoolExporter", "Running", nil) metrics.TaskDuration.WithLabelValues("exporter_rocketpoolExporter").Observe(time.Since(t0).Seconds()) + r(constants.Success, map[string]string{"took": time.Since(t0).String(), "took_raw": fmt.Sprintf("%v", time.Since(t0).Milliseconds())}) log.InfoWithFields(log.Fields{"duration": time.Since(t0)}, "exported rocketpool-data") count++ diff --git a/backend/pkg/exporter/modules/slot_exporter.go b/backend/pkg/exporter/modules/slot_exporter.go index a56be4771..5b8ed2b22 100644 --- a/backend/pkg/exporter/modules/slot_exporter.go +++ b/backend/pkg/exporter/modules/slot_exporter.go @@ -16,6 +16,7 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/rpc" "github.com/gobitfly/beaconchain/pkg/commons/services" constypes "github.com/gobitfly/beaconchain/pkg/consapi/types" + "github.com/gobitfly/beaconchain/pkg/monitoring/constants" "github.com/klauspost/pgzip" "fmt" @@ -675,6 +676,10 @@ func (d *slotExporterData) GetName() string { return "Slot-Exporter" } +func (d *slotExporterData) GetMonitoringEventId() constants.Event { + return constants.Event_ExporterModuleSlotExporter +} + func (d *slotExporterData) OnChainReorg(event *constypes.StandardEventChainReorg) (err error) { return nil // nop } diff --git a/backend/pkg/exporter/modules/sync_committees.go b/backend/pkg/exporter/modules/sync_committees.go index 30e021f25..2dfc76cec 100644 --- a/backend/pkg/exporter/modules/sync_committees.go +++ b/backend/pkg/exporter/modules/sync_committees.go @@ -12,6 +12,8 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/log" "github.com/gobitfly/beaconchain/pkg/commons/rpc" "github.com/gobitfly/beaconchain/pkg/commons/utils" + "github.com/gobitfly/beaconchain/pkg/monitoring/constants" + "github.com/gobitfly/beaconchain/pkg/monitoring/services" "github.com/jmoiron/sqlx" ) @@ -19,10 +21,14 @@ import ( func syncCommitteesExporter(rpcClient rpc.Client) { for { t0 := time.Now() + r := services.NewStatusReport(constants.Event_ExporterLegacySyncCommittees, constants.Default, time.Second*12) + r(constants.Running, nil) err := exportSyncCommittees(rpcClient) if err != nil { log.Error(err, "error exporting sync_committees", 0, map[string]interface{}{"duration": time.Since(t0)}) + r(constants.Failure, map[string]string{"error": err.Error()}) } + r(constants.Success, map[string]string{"took": time.Since(t0).String(), "took_raw": fmt.Sprintf("%v", time.Since(t0).Milliseconds())}) time.Sleep(time.Second * 12) } } diff --git a/backend/pkg/exporter/modules/sync_committees_count.go b/backend/pkg/exporter/modules/sync_committees_count.go index da97308e7..0d845cc4f 100644 --- a/backend/pkg/exporter/modules/sync_committees_count.go +++ b/backend/pkg/exporter/modules/sync_committees_count.go @@ -7,14 +7,21 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/db" "github.com/gobitfly/beaconchain/pkg/commons/log" "github.com/gobitfly/beaconchain/pkg/commons/utils" + "github.com/gobitfly/beaconchain/pkg/monitoring/constants" + "github.com/gobitfly/beaconchain/pkg/monitoring/services" ) func syncCommitteesCountExporter() { for { + t0 := time.Now() + r := services.NewStatusReport(constants.Event_ExporterLegacySyncCommitteesCount, constants.Default, time.Second*12) + r(constants.Running, nil) err := exportSyncCommitteesCount() if err != nil { log.Error(err, "error exporting sync_committees_count_per_validator", 0) + r(constants.Failure, map[string]string{"error": err.Error()}) } + r(constants.Success, map[string]string{"took": time.Since(t0).String(), "took_raw": fmt.Sprintf("%v", time.Since(t0).Milliseconds())}) time.Sleep(time.Second * 12) } } diff --git a/backend/pkg/monitoring/constants/main.go b/backend/pkg/monitoring/constants/main.go index 9e20f037b..7131c1c3f 100644 --- a/backend/pkg/monitoring/constants/main.go +++ b/backend/pkg/monitoring/constants/main.go @@ -4,12 +4,75 @@ import "time" // status enum type StatusType string +type Event string const ( - Running StatusType = "running" - Success StatusType = "success" - Failure StatusType = "failure" - Default time.Duration = -1 * time.Second + Running StatusType = "running" + Success StatusType = "success" + Failure StatusType = "failure" + Default time.Duration = -1 * time.Second + Event_ApiServiceAvgEfficiency Event = "api_service_avg_efficiency" + Event_ApiServiceSlotViz Event = "api_service_slot_viz" + Event_ApiServiceValidatorMapping Event = "api_service_validator_mapping" + Event_ExporterLegacyNetworkLiveness Event = "exporter_legacy_network_liveness" + Event_ExporterLegacySyncCommittees Event = "exporter_legacy_sync_committees" + Event_ExporterLegacySyncCommitteesCount Event = "exporter_legacy_sync_committees_count" + Event_ExporterLegacyRocketPool Event = "exporter_legacy_rocket_pool" + Event_ExporterLegacyPubkeyTags Event = "exporter_legacy_pubkey_tags" + Event_ExporterModuleELRewardsFinalizer Event = "exporter_module_el_rewards_finalizer" + Event_ExporterModuleELPayloadExporter Event = "exporter_module_el_payload_exporter" + Event_ExporterModuleELDepositsExporter Event = "exporter_module_el_deposits_exporter" + Event_ExporterModuleSlotExporter Event = "exporter_module_slot_exporter" + Event_ExporterModuleDashboardData Event = "exporter_module_dashboard_data" + Event_MonitoringCleanShutdown Event = "clean_shutdown" + Event_MonitoringCleanShutdownSpam Event = "monitoring_clean_shutdown_spam" + Event_MonitoringTimeouts Event = "monitoring_timeouts" + Event_ClickhouseDashboardEpoch Event = "ch_dashboard_epoch" + Event_ClickhouseRolling_1h Event = "ch_rolling_1h" + Event_ClickhouseRolling_24h Event = "ch_rolling_24h" + Event_ClickhouseRolling_7d Event = "ch_rolling_7d" + Event_ClickhouseRolling_30d Event = "ch_rolling_30d" + Event_ClickhouseRolling_90d Event = "ch_rolling_90d" + Event_ClickhouseRolling_total Event = "ch_rolling_total" + Event_DBConnReaderDB Event = "db_conn_reader_db" + Event_DBConnWriterDB Event = "db_conn_writer_db" + Event_DBConnUserReader Event = "db_conn_user_reader" + Event_DBConnUserWriter Event = "db_conn_user_writer" + Event_DBConnAlloyReader Event = "db_conn_alloy_reader" + Event_DBConnAlloyWriter Event = "db_conn_alloy_writer" + Event_DBConnFrontendReaderDB Event = "db_conn_frontend_reader_db" + Event_DBConnFrontendWriterDB Event = "db_conn_frontend_writer_db" + Event_DBConnClickhouseReader Event = "db_conn_clickhouse_reader" + Event_DBConnClickhouseWriter Event = "db_conn_clickhouse_writer" + Event_DBConnClickhouseNativeWriter Event = "db_conn_clickhouse_native_writer" + Event_DBConnPersistentRedisDbClient Event = "db_conn_persistent_redis_db_client" + Event_DBConnTieredCache Event = "db_conn_tiered_cache" ) -const CleanShutdownEvent = "clean_shutdown" +// events that if not present in the monitoring system, should cause an alert +var RequiredEvents = []Event{ + Event_ApiServiceAvgEfficiency, + Event_ApiServiceSlotViz, + Event_ApiServiceValidatorMapping, + Event_MonitoringTimeouts, + Event_ClickhouseDashboardEpoch, + Event_ClickhouseRolling_1h, + Event_ClickhouseRolling_24h, + Event_ClickhouseRolling_7d, + Event_ClickhouseRolling_30d, + Event_ClickhouseRolling_90d, + Event_ClickhouseRolling_total, +} + +// events that only need to be present in production +var ProductionRequiredEvents = []Event{ + Event_ExporterLegacyNetworkLiveness, + Event_ExporterLegacySyncCommittees, + Event_ExporterLegacySyncCommitteesCount, + Event_ExporterLegacyRocketPool, + Event_ExporterLegacyPubkeyTags, + Event_ExporterModuleELRewardsFinalizer, + Event_ExporterModuleELPayloadExporter, + Event_ExporterModuleELDepositsExporter, + Event_ExporterModuleSlotExporter, +} diff --git a/backend/pkg/monitoring/monitoring.go b/backend/pkg/monitoring/monitoring.go index 00d9af0c4..511e07464 100644 --- a/backend/pkg/monitoring/monitoring.go +++ b/backend/pkg/monitoring/monitoring.go @@ -56,7 +56,7 @@ func Stop() { service.Stop() } // this prevents status reports that werent shut down cleanly from triggering alerts - services.NewStatusReport(constants.CleanShutdownEvent, constants.Default, constants.Default)(constants.Success, nil) + services.NewStatusReport(constants.Event_MonitoringCleanShutdown, constants.Default, constants.Default)(constants.Success, nil) if startedClickhouse.Load() { db.ClickHouseNativeWriter.Close() } diff --git a/backend/pkg/monitoring/services/base.go b/backend/pkg/monitoring/services/base.go index 04cbea094..9858d63e4 100644 --- a/backend/pkg/monitoring/services/base.go +++ b/backend/pkg/monitoring/services/base.go @@ -3,6 +3,8 @@ package services import ( "context" "fmt" + "path/filepath" + "runtime" "sync" "sync/atomic" "time" @@ -42,11 +44,12 @@ func (s *ServiceBase) Stop() { s.wg.Wait() } -func NewStatusReport(id string, timeout time.Duration, check_interval time.Duration) func(status constants.StatusType, metadata map[string]string) { +func NewStatusReport(id constants.Event, timeout time.Duration, check_interval time.Duration) func(status constants.StatusType, metadata map[string]string) { runId := uuid.New().String() return func(status constants.StatusType, metadata map[string]string) { // acquire snowflake synchronously flake := utils.GetSnowflake() + callerProgramCounter, callerFullFilePath, callerLine, callerOK := runtime.Caller(1) now := time.Now() go func() { if metadata == nil { @@ -56,6 +59,11 @@ func NewStatusReport(id string, timeout time.Duration, check_interval time.Durat metadata["run_id"] = runId metadata["status"] = string(status) metadata["executable_version"] = fmt.Sprintf("%s (%s)", version.Version, version.GoVersion) + if callerOK { + callerFunction := runtime.FuncForPC(callerProgramCounter).Name() + callerFile := filepath.Base(callerFullFilePath) + metadata["caller"] = fmt.Sprintf("%s %s:%d", callerFunction, callerFile, callerLine) + } // report status to monitoring ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) diff --git a/backend/pkg/monitoring/services/clean_shutdown_spam.go b/backend/pkg/monitoring/services/clean_shutdown_spam.go index 0dcf01391..20dc3fe3c 100644 --- a/backend/pkg/monitoring/services/clean_shutdown_spam.go +++ b/backend/pkg/monitoring/services/clean_shutdown_spam.go @@ -39,8 +39,7 @@ func (s *CleanShutdownSpamDetector) internalProcess() { } func (s *CleanShutdownSpamDetector) runChecks() { - id := "monitoring_clean_shutdown_spam" - r := NewStatusReport(id, constants.Default, 30*time.Second) + r := NewStatusReport(constants.Event_MonitoringCleanShutdownSpam, constants.Default, 30*time.Second) r(constants.Running, nil) if db.ClickHouseReader == nil { r(constants.Failure, map[string]string{"error": "clickhouse reader is nil"}) @@ -62,7 +61,7 @@ func (s *CleanShutdownSpamDetector) runChecks() { ctx, cancel := context.WithTimeout(s.ctx, 15*time.Second) defer cancel() var emitters []string - err := db.ClickHouseReader.SelectContext(ctx, &emitters, query, utils.Config.DeploymentType, constants.CleanShutdownEvent) + err := db.ClickHouseReader.SelectContext(ctx, &emitters, query, utils.Config.DeploymentType, constants.Event_MonitoringCleanShutdown) if err != nil { r(constants.Failure, map[string]string{"error": err.Error()}) return diff --git a/backend/pkg/monitoring/services/clickhouse_epoch.go b/backend/pkg/monitoring/services/clickhouse_epoch.go index 96a3f30e8..0d23ef247 100644 --- a/backend/pkg/monitoring/services/clickhouse_epoch.go +++ b/backend/pkg/monitoring/services/clickhouse_epoch.go @@ -36,8 +36,7 @@ func (s *ServiceClickhouseEpoch) internalProcess() { } func (s *ServiceClickhouseEpoch) runChecks() { - id := "ch_dashboard_epoch" - r := NewStatusReport(id, constants.Default, 30*time.Second) + r := NewStatusReport(constants.Event_ClickhouseDashboardEpoch, constants.Default, 30*time.Second) r(constants.Running, nil) if db.ClickHouseReader == nil { r(constants.Failure, map[string]string{"error": "clickhouse reader is nil"}) diff --git a/backend/pkg/monitoring/services/clickhouse_rollings.go b/backend/pkg/monitoring/services/clickhouse_rollings.go index f75fb6a44..8142b9c33 100644 --- a/backend/pkg/monitoring/services/clickhouse_rollings.go +++ b/backend/pkg/monitoring/services/clickhouse_rollings.go @@ -3,6 +3,7 @@ package services import ( "context" "fmt" + "maps" "sync" "time" @@ -41,22 +42,21 @@ func (s *ServiceClickhouseRollings) internalProcess() { } func (s *ServiceClickhouseRollings) runChecks() { - rollings := []string{ - "1h", - "24h", - "7d", - "30d", - "90d", - "total", + rollings := map[string]constants.Event{ + "1h": constants.Event_ClickhouseRolling_1h, + "24h": constants.Event_ClickhouseRolling_24h, + "7d": constants.Event_ClickhouseRolling_7d, + "30d": constants.Event_ClickhouseRolling_30d, + "90d": constants.Event_ClickhouseRolling_90d, + "total": constants.Event_ClickhouseRolling_total, } wg := sync.WaitGroup{} - for _, rolling := range rollings { + for rolling := range maps.Keys(rollings) { rolling := rolling wg.Add(1) go func() { defer wg.Done() - id := fmt.Sprintf("ch_rolling_%s", rolling) - r := NewStatusReport(id, constants.Default, 30*time.Second) + r := NewStatusReport(rollings[rolling], constants.Default, 30*time.Second) r(constants.Running, nil) if db.ClickHouseReader == nil { r(constants.Failure, map[string]string{"error": "clickhouse reader is nil"}) diff --git a/backend/pkg/monitoring/services/db_connections.go b/backend/pkg/monitoring/services/db_connections.go index e0fe180b8..3d8326f3f 100644 --- a/backend/pkg/monitoring/services/db_connections.go +++ b/backend/pkg/monitoring/services/db_connections.go @@ -47,11 +47,11 @@ func (s *ServerDbConnections) Start() { } type Entry struct { - ID string + ID constants.Event DB any } -func n[T interface{}](id string, db T) *Entry { +func n[T interface{}](id constants.Event, db T) *Entry { // use reflect to check if db is nil. use reflect. do not simply compare to nil if v := reflect.ValueOf(db); !v.IsValid() || v.IsNil() { return nil @@ -62,19 +62,19 @@ func n[T interface{}](id string, db T) *Entry { func (s *ServerDbConnections) checkDBConnections() { entries := []*Entry{ - n("db_conn_reader_db", db.ReaderDb), - n("db_conn_writer_db", db.WriterDb), - n("db_conn_user_reader", db.UserReader), - n("db_conn_user_writer", db.UserWriter), - n("db_conn_alloy_reader", db.AlloyReader), - n("db_conn_alloy_writer", db.AlloyWriter), - n("db_conn_frontend_reader_db", db.FrontendReaderDB), - n("db_conn_frontend_writer_db", db.FrontendWriterDB), - n("db_conn_clickhouse_reader", db.ClickHouseReader), - n("db_conn_clickhouse_writer", db.ClickHouseWriter), - n("db_conn_clickhouse_native_writer", db.ClickHouseNativeWriter), - n("db_conn_persistent_redis_db_client", db.PersistentRedisDbClient), - n("db_conn_tiered_cache", cache.TieredCache), + n(constants.Event_DBConnReaderDB, db.ReaderDb), + n(constants.Event_DBConnWriterDB, db.WriterDb), + n(constants.Event_DBConnUserReader, db.UserReader), + n(constants.Event_DBConnUserWriter, db.UserWriter), + n(constants.Event_DBConnAlloyReader, db.AlloyReader), + n(constants.Event_DBConnAlloyWriter, db.AlloyWriter), + n(constants.Event_DBConnFrontendReaderDB, db.FrontendReaderDB), + n(constants.Event_DBConnFrontendWriterDB, db.FrontendWriterDB), + n(constants.Event_DBConnClickhouseReader, db.ClickHouseReader), + n(constants.Event_DBConnClickhouseWriter, db.ClickHouseWriter), + n(constants.Event_DBConnClickhouseNativeWriter, db.ClickHouseNativeWriter), + n(constants.Event_DBConnPersistentRedisDbClient, db.PersistentRedisDbClient), + n(constants.Event_DBConnTieredCache, cache.TieredCache), } wg := sync.WaitGroup{} for _, entry := range entries { diff --git a/backend/pkg/monitoring/services/timeout_detector.go b/backend/pkg/monitoring/services/timeout_detector.go index 83f85ccd3..dbc149f14 100644 --- a/backend/pkg/monitoring/services/timeout_detector.go +++ b/backend/pkg/monitoring/services/timeout_detector.go @@ -38,8 +38,7 @@ func (s *ServiceTimeoutDetector) internalProcess() { } func (s *ServiceTimeoutDetector) runChecks() { - id := "monitoring_timeouts" - r := NewStatusReport(id, constants.Default, 30*time.Second) + r := NewStatusReport(constants.Event_MonitoringTimeouts, constants.Default, 30*time.Second) r(constants.Running, nil) if db.ClickHouseReader == nil { r(constants.Failure, map[string]string{"error": "clickhouse reader is nil"}) @@ -109,7 +108,7 @@ func (s *ServiceTimeoutDetector) runChecks() { TimeoutsAt time.Time `db:"timeouts_at"` Metadata map[string]string `db:"metadata"` } - err := db.ClickHouseReader.SelectContext(ctx, &victims, query, utils.Config.DeploymentType, constants.CleanShutdownEvent) + err := db.ClickHouseReader.SelectContext(ctx, &victims, query, utils.Config.DeploymentType, constants.Event_MonitoringCleanShutdown) if err != nil { r(constants.Failure, map[string]string{"error": err.Error()}) return