Skip to content

Commit

Permalink
add(monitoring): use constants, add v2 exporter monitoring, monitor c…
Browse files Browse the repository at this point in the history
…aller
  • Loading branch information
invis-bitfly committed Jan 13, 2025
1 parent 2e7483f commit 9b3be63
Show file tree
Hide file tree
Showing 26 changed files with 191 additions and 67 deletions.
5 changes: 5 additions & 0 deletions backend/cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/version"
"github.com/gobitfly/beaconchain/pkg/exporter/modules"
"github.com/gobitfly/beaconchain/pkg/exporter/services"
"github.com/gobitfly/beaconchain/pkg/monitoring"
)

func Run() {
Expand Down Expand Up @@ -136,6 +137,10 @@ func Run() {

wg.Wait()

// enable light-weight db connection monitoring
monitoring.Init(false)
monitoring.Start()

if utils.Config.TieredCacheProvider != "redis" {
log.Fatal(fmt.Errorf("no cache provider set, please set TierdCacheProvider (example redis)"), "", 0)
}
Expand Down
3 changes: 1 addition & 2 deletions backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ require (
github.com/juliangruber/go-intersect v1.1.0
github.com/jung-kurt/gofpdf v1.16.2
github.com/k3a/html2text v1.2.1
github.com/kelseyhightower/envconfig v1.4.0
github.com/klauspost/compress v1.17.6
github.com/klauspost/pgzip v1.2.6
github.com/lib/pq v1.10.9
Expand All @@ -66,6 +65,7 @@ require (
github.com/prysmaticlabs/go-ssz v0.0.0-20210121151755-f6208871c388
github.com/rocket-pool/rocketpool-go v1.8.4-0.20241009143357-7b6894d57365
github.com/rocket-pool/smartnode v1.14.1
github.com/sethvargo/go-envconfig v1.1.0
github.com/shopspring/decimal v1.4.0
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -234,7 +234,6 @@ require (
github.com/sanity-io/litter v1.5.5 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/sethvargo/go-envconfig v1.1.0 // indirect
github.com/sethvargo/go-retry v0.2.4 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions backend/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,6 @@ github.com/k3a/html2text v1.2.1 h1:nvnKgBvBR/myqrwfLuiqecUtaK1lB9hGziIJKatNFVY=
github.com/k3a/html2text v1.2.1/go.mod h1:ieEXykM67iT8lTvEWBh6fhpH4B23kB9OMKPdIBmgUqA=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
Expand Down
21 changes: 4 additions & 17 deletions backend/pkg/api/data_access/healthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types.
response.Reports = make(map[string][]types.HealthzResult)
response.ReportingUUID = utils.GetUUID()
response.DeploymentType = utils.Config.DeploymentType
err := db.ClickHouseReader.SelectContext(ctx, &results, query, ch.Named("deployment_type", utils.Config.DeploymentType), ch.Named("clean_shutdown_event_id", constants.CleanShutdownEvent))
err := db.ClickHouseReader.SelectContext(ctx, &results, query, ch.Named("deployment_type", utils.Config.DeploymentType), ch.Named("clean_shutdown_event_id", constants.Event_MonitoringCleanShutdown))
if err != nil {
response.Reports["response_error"] = []types.HealthzResult{
{
Expand All @@ -125,25 +125,12 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types.
return response
}

mustExist := []string{
"ch_rolling_1h",
"ch_rolling_24h",
"ch_rolling_7d",
"ch_rolling_30d",
"ch_rolling_90d",
"ch_rolling_total",
"ch_dashboard_epoch",
"api_service_avg_efficiency",
"api_service_validator_mapping",
"api_service_slot_viz",
"monitoring_timeouts",
}
for _, result := range results {
response.Reports[result.EventId] = append(response.Reports[result.EventId], result)
}
for _, id := range mustExist {
if _, ok := response.Reports[id]; !ok {
response.Reports[id] = []types.HealthzResult{
for _, id := range constants.RequiredEvents {
if _, ok := response.Reports[string(id)]; !ok {
response.Reports[string(id)] = []types.HealthzResult{
{
Status: constants.Failure,
Result: []map[string]string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (s *Services) startEfficiencyDataService(wg *sync.WaitGroup) {
for {
startTime := time.Now()
delay := time.Duration(utils.Config.Chain.ClConfig.SlotsPerEpoch*utils.Config.Chain.ClConfig.SecondsPerSlot) * time.Second
r := services.NewStatusReport("api_service_avg_efficiency", constants.Default, delay)
r := services.NewStatusReport(constants.Event_ApiServiceAvgEfficiency, constants.Default, delay)
r(constants.Running, nil)
err := s.updateEfficiencyData() // TODO: only update data if something has changed (new head epoch)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/api/services/service_slot_viz.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (s *Services) startSlotVizDataService(wg *sync.WaitGroup) {
for {
startTime := time.Now()
delay := time.Duration(utils.Config.Chain.ClConfig.SecondsPerSlot) * time.Second
r := services.NewStatusReport("api_service_slot_viz", constants.Default, delay)
r := services.NewStatusReport(constants.Event_ApiServiceSlotViz, constants.Default, delay)
r(constants.Running, nil)
err := s.updateSlotVizData() // TODO: only update data if something has changed (new head slot or new head epoch)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/api/services/service_validator_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *Services) startIndexMappingService(wg *sync.WaitGroup) {
startTime := time.Now()
delay := time.Duration(utils.Config.Chain.ClConfig.SecondsPerSlot) * time.Second
err = nil // clear error
r := services.NewStatusReport("api_service_validator_mapping", constants.Default, delay)
r := services.NewStatusReport(constants.Event_ApiServiceValidatorMapping, constants.Default, delay)
r(constants.Running, nil)
latestEpoch := cache.LatestEpoch.Get()
if currentValidatorMapping.Load() == nil || latestEpoch != lastEpochUpdate {
Expand Down
11 changes: 9 additions & 2 deletions backend/pkg/exporter/modules/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/gobitfly/beaconchain/pkg/consapi"
"github.com/gobitfly/beaconchain/pkg/consapi/types"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
"github.com/gobitfly/beaconchain/pkg/monitoring/services"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

type ModuleInterface interface {
Init() error
GetName() string // Used for logging
GetMonitoringEventId() constants.Event

OnHead(*types.StandardEventHeadResponse) error // !Do not block in this functions for an extended period of time!

Expand Down Expand Up @@ -63,7 +66,6 @@ func StartAll(context ModuleContext, modules []ModuleInterface, justV2 bool) {
log.Error(err, "beacon-node seems to be unavailable", 0)
time.Sleep(time.Second * 10)
}

// start subscription modules
startSubscriptionModules(&context, modules)
}
Expand Down Expand Up @@ -146,15 +148,20 @@ func notifyAllModules(goPool *errgroup.Group, modules []ModuleInterface, f func(
for _, module := range modules {
module := module
goPool.Go(func() error {
start := time.Now()
r := services.NewStatusReport(module.GetMonitoringEventId(), constants.Default, constants.Default)
r(constants.Running, nil)
err := f(module)
if err != nil {
log.Error(err, fmt.Sprintf("error in module %s", module.GetName()), 0)
r(constants.Failure, map[string]string{"error": err.Error()})
return nil // return never gets caught anywhere? lets not risk a memory leak and instead return nil
}
r(constants.Success, map[string]string{"took_raw": fmt.Sprintf("%v", time.Since(start).Milliseconds())})
return nil
})
}
}

func GetModuleContext() (ModuleContext, error) {
cl := consapi.NewClient("http://" + utils.Config.Indexer.Node.Host + ":" + utils.Config.Indexer.Node.Port)

Expand Down
5 changes: 5 additions & 0 deletions backend/pkg/exporter/modules/dashboard_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/consapi/network"
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
edb "github.com/gobitfly/beaconchain/pkg/exporter/db"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -992,6 +993,10 @@ func (d *dashboardData) GetName() string {
return "Dashboard-Data"
}

// GetMonitoringEventId returns the monitoring event identifier of the
// dashboard-data module, constants.Event_ExporterModuleDashboardData.
func (d *dashboardData) GetMonitoringEventId() constants.Event {
	// The explicit type keeps the local aligned with the declared return type.
	var eventID constants.Event = constants.Event_ExporterModuleDashboardData
	return eventID
}

// OnHead is the head-event callback of the dashboard-data module. It does
// nothing with the event and always returns nil.
func (d *dashboardData) OnHead(event *constypes.StandardEventHeadResponse) error {
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions backend/pkg/exporter/modules/execution_deposits_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
)

// if we ever end up in a situation where we possibly have gaps in the data remember: the merkletree_index is unique.
Expand Down Expand Up @@ -155,6 +156,10 @@ func (d *executionDepositsExporter) GetName() string {
return "ExecutionDeposits-Exporter"
}

// GetMonitoringEventId returns the monitoring event identifier of the
// execution-deposits exporter, constants.Event_ExporterModuleELDepositsExporter.
func (d *executionDepositsExporter) GetMonitoringEventId() constants.Event {
	// The explicit type keeps the local aligned with the declared return type.
	var eventID constants.Event = constants.Event_ExporterModuleELDepositsExporter
	return eventID
}

// OnChainReorg is the chain-reorg callback of the execution-deposits exporter.
// The event is ignored: the method performs no work and always returns nil.
func (d *executionDepositsExporter) OnChainReorg(event *constypes.StandardEventChainReorg) (err error) {
return nil // nop
}
Expand Down
5 changes: 5 additions & 0 deletions backend/pkg/exporter/modules/execution_payloads_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/types"
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
"github.com/pkg/errors"
"github.com/shopspring/decimal"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -50,6 +51,10 @@ func (d *executionPayloadsExporter) GetName() string {
return "ExecutionPayloads-Exporter"
}

// GetMonitoringEventId returns the monitoring event identifier of the
// execution-payloads exporter, constants.Event_ExporterModuleELPayloadExporter.
func (d *executionPayloadsExporter) GetMonitoringEventId() constants.Event {
	// The explicit type keeps the local aligned with the declared return type.
	var eventID constants.Event = constants.Event_ExporterModuleELPayloadExporter
	return eventID
}

// OnChainReorg is the chain-reorg callback of the execution-payloads exporter.
// The event is ignored: the method performs no work and always returns nil.
func (d *executionPayloadsExporter) OnChainReorg(event *constypes.StandardEventChainReorg) (err error) {
return nil // nop
}
Expand Down
7 changes: 7 additions & 0 deletions backend/pkg/exporter/modules/execution_rewards_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/log"
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
"github.com/pkg/errors"
)

type executionRewardsFinalizer struct {
Expand All @@ -31,6 +33,10 @@ func (d *executionRewardsFinalizer) GetName() string {
return "ExecutionRewards-Finalizer"
}

// GetMonitoringEventId returns the monitoring event identifier of the
// execution-rewards finalizer, constants.Event_ExporterModuleELRewardsFinalizer.
func (d *executionRewardsFinalizer) GetMonitoringEventId() constants.Event {
	// The explicit type keeps the local aligned with the declared return type.
	var eventID constants.Event = constants.Event_ExporterModuleELRewardsFinalizer
	return eventID
}

// OnChainReorg is the chain-reorg callback of the execution-rewards finalizer.
// The event is ignored: the method performs no work and always returns nil.
func (d *executionRewardsFinalizer) OnChainReorg(event *constypes.StandardEventChainReorg) (err error) {
return nil // nop
}
Expand Down Expand Up @@ -89,6 +95,7 @@ func (d *executionRewardsFinalizer) maintainTable() (err error) {
return nil
}
log.Infof("finalized rewards = last exported slot: %v, latest finalized slot: %v", lastExportedSlot, latestFinalizedSlot)
return errors.New("testing mode")

start := time.Now()

Check failure on line 100 in backend/pkg/exporter/modules/execution_rewards_finalizer.go

View workflow job for this annotation

GitHub Actions / lint

unreachable: unreachable code (govet)
ds := goqu.Dialect("postgres").Insert("execution_rewards_finalized").FromQuery(
Expand Down
12 changes: 12 additions & 0 deletions backend/pkg/exporter/modules/network_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/rpc"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
"github.com/gobitfly/beaconchain/pkg/monitoring/services"
)

func networkLivenessUpdater(client rpc.Client) {
Expand All @@ -21,20 +23,26 @@ func networkLivenessUpdater(client rpc.Client) {
slotDuration := time.Second * time.Duration(utils.Config.Chain.ClConfig.SecondsPerSlot)

for {
r := services.NewStatusReport(constants.Event_ExporterLegacyNetworkLiveness, constants.Default, slotDuration)
r(constants.Running, nil)

head, err := client.GetChainHead()
if err != nil {
log.Error(err, "error getting chainhead when exporting networkliveness", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
time.Sleep(slotDuration)
continue
}

if prevHeadEpoch == head.HeadEpoch {
r(constants.Success, nil)
time.Sleep(slotDuration)
continue
}

// wait for node to be synced
if time.Now().Add(-epochDuration).After(utils.EpochToTime(head.HeadEpoch)) {
r(constants.Failure, map[string]string{"error": "node not synced"})
time.Sleep(slotDuration)
continue
}
Expand All @@ -45,6 +53,7 @@ func networkLivenessUpdater(client rpc.Client) {
head.HeadEpoch, head.FinalizedEpoch, head.JustifiedEpoch, head.PreviousJustifiedEpoch)
if err != nil {
log.Error(err, "error saving networkliveness", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
} else {
log.Infof("updated networkliveness for epoch %v", head.HeadEpoch)
prevHeadEpoch = head.HeadEpoch
Expand All @@ -53,12 +62,15 @@ func networkLivenessUpdater(client rpc.Client) {
err = cache.LatestNodeEpoch.Set(head.HeadEpoch)
if err != nil {
log.Error(err, "error setting latestNodeEpoch in cache", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
}

err = cache.LatestNodeFinalizedEpoch.Set(head.FinalizedEpoch)
if err != nil {
log.Error(err, "error setting latestNodeFinalizedEpoch in cache", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
}
r(constants.Success, nil)

time.Sleep(slotDuration)
}
Expand Down
9 changes: 8 additions & 1 deletion backend/pkg/exporter/modules/pubkey_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
monitoringServices "github.com/gobitfly/beaconchain/pkg/monitoring/services"
)

func UpdatePubkeyTag() {
log.Infof("Started Pubkey Tags Updater")
for {
start := time.Now()

r := monitoringServices.NewStatusReport(constants.Event_ExporterLegacyPubkeyTags, constants.Default, time.Second*12)
r(constants.Running, nil)
tx, err := db.WriterDb.Beginx()
if err != nil {
log.Error(err, "Error connecting to DB", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
// return err
}
_, err = tx.Exec(`INSERT INTO validator_tags (publickey, tag)
Expand All @@ -26,16 +30,19 @@ func UpdatePubkeyTag() {
ON CONFLICT (publickey, tag) DO NOTHING;`)
if err != nil {
log.Error(err, "error updating validator_tags", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
// return err
}

err = tx.Commit()
if err != nil {
log.Error(err, "error committing transaction", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
}
_ = tx.Rollback()

log.Infof("Updating Pubkey Tags took %v sec.", time.Since(start).Seconds())
r(constants.Success, map[string]string{"took": time.Since(start).String(), "took_raw": time.Since(start).String()})
metrics.TaskDuration.WithLabelValues("validator_pubkey_tag_updater").Observe(time.Since(start).Seconds())

time.Sleep(time.Minute * 10)
Expand Down
7 changes: 7 additions & 0 deletions backend/pkg/exporter/modules/rocketpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
"github.com/gobitfly/beaconchain/pkg/commons/services"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
monitoringServices "github.com/gobitfly/beaconchain/pkg/monitoring/services"
"github.com/pkg/errors"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -189,23 +191,28 @@ func (rp *RocketpoolExporter) Run() error {

for {
t0 := time.Now()
r := monitoringServices.NewStatusReport(constants.Event_ExporterLegacyRocketPool, constants.Default, time.Second*12)
r(constants.Running, nil)
var err error
err = rp.Update(count)
if err != nil {
log.Error(err, "error updating rocketpool-data", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
time.Sleep(errorInterval)
continue
}
err = rp.Save(count)
if err != nil {
log.Error(err, "error saving rocketpool-data", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
time.Sleep(errorInterval)
continue
}

services.ReportStatus("rocketpoolExporter", "Running", nil)

metrics.TaskDuration.WithLabelValues("exporter_rocketpoolExporter").Observe(time.Since(t0).Seconds())
r(constants.Success, map[string]string{"took": time.Since(t0).String(), "took_raw": fmt.Sprintf("%v", time.Since(t0).Milliseconds())})

log.InfoWithFields(log.Fields{"duration": time.Since(t0)}, "exported rocketpool-data")
count++
Expand Down
Loading

0 comments on commit 9b3be63

Please sign in to comment.