Skip to content

Commit

Permalink
Merge pull request #846 from gobitfly/BEDS-306/clean_shutdown_event
Browse files Browse the repository at this point in the history
Beds 306/clean shutdown event
  • Loading branch information
invis-bitfly authored Sep 9, 2024
2 parents 0856fac + 43cfa46 commit 6294894
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 58 deletions.
3 changes: 1 addition & 2 deletions backend/cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func Run() {
// enable light-weight db connection monitoring
monitoring.Init(false)
monitoring.Start()
defer monitoring.Stop()
}

var dataAccessor dataaccess.DataAccessor
Expand Down Expand Up @@ -98,7 +97,7 @@ func Run() {
}()

utils.WaitForCtrlC()

monitoring.Stop() // this will emit a clean shutdown event
log.Info("shutting down server")
if srv != nil {
shutDownCtx, cancelShutDownCtx := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down
2 changes: 1 addition & 1 deletion backend/cmd/monitoring/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func Run() {

monitoring.Init(true)
monitoring.Start()
defer monitoring.Stop()

// gotta wait forever
utils.WaitForCtrlC()
monitoring.Stop()
}
18 changes: 15 additions & 3 deletions backend/pkg/api/data_access/healthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"slices"

ch "github.com/ClickHouse/clickhouse-go/v2"
"github.com/gobitfly/beaconchain/pkg/api/types"
"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/log"
Expand All @@ -19,7 +20,17 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types.
var results []types.HealthzResult
var response types.HealthzData
query := `
with active_reports as (
with clean_shutdown_events as (
SELECT
emitter,
toNullable(inserted_at) as inserted_at
FROM
status_reports
WHERE
deployment_type = {deployment_type:String}
AND inserted_at >= now() - interval 1 days
AND event_id = {clean_shutdown_event_id:String}
), active_reports as (
SELECT
event_id,
emitter,
Expand All @@ -31,7 +42,8 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types.
status,
metadata
FROM status_reports
WHERE expires_at > now() and deployment_type = ?
LEFT JOIN clean_shutdown_events cse ON status_reports.emitter = clean_shutdown_events.emitter
WHERE expires_at > now() and deployment_type = {deployment_type:String} and (status_reports.inserted_at < cse.inserted_at or cse.inserted_at is null)
ORDER BY
event_id ASC,
emitter ASC,
Expand Down Expand Up @@ -99,7 +111,7 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types.
response.Reports = make(map[string][]types.HealthzResult)
response.ReportingUUID = utils.GetUUID()
response.DeploymentType = utils.Config.DeploymentType
err := db.ClickHouseReader.SelectContext(ctx, &results, query, utils.Config.DeploymentType)
err := db.ClickHouseReader.SelectContext(ctx, &results, query, ch.Named("deployment_type", utils.Config.DeploymentType), ch.Named("clean_shutdown_event_id", constants.CleanShutdownEvent))
if err != nil {
response.Reports["response_error"] = []types.HealthzResult{
{
Expand Down
32 changes: 6 additions & 26 deletions backend/pkg/commons/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,7 @@ func Infof(format string, args ...interface{}) {
}

func InfoWithFields(additionalInfos Fields, msg string) {
logFields := logrus.NewEntry(logrus.New())
for name, info := range additionalInfos {
logFields = logFields.WithField(name, info)
}

logFields.Info(msg)
logrus.WithFields(additionalInfos).Info(msg)
}

func Warn(args ...interface{}) {
Expand All @@ -53,34 +48,19 @@ func Warnf(format string, args ...interface{}) {
}

func WarnWithFields(additionalInfos Fields, msg string) {
logFields := logrus.NewEntry(logrus.New())
for name, info := range additionalInfos {
logFields = logFields.WithField(name, info)
}

logFields.Warn(msg)
logrus.WithFields(additionalInfos).Warn(msg)
}

func Tracef(format string, args ...interface{}) {
logrus.Tracef(format, args...)
}

func TraceWithFields(additionalInfos Fields, msg string) {
logFields := logrus.NewEntry(logrus.New())
for name, info := range additionalInfos {
logFields = logFields.WithField(name, info)
}

logFields.Trace(msg)
logrus.WithFields(additionalInfos).Trace(msg)
}

func DebugWithFields(additionalInfos Fields, msg string) {
logFields := logrus.NewEntry(logrus.New())
for name, info := range additionalInfos {
logFields = logFields.WithField(name, info)
}

logFields.Debug(msg)
logrus.WithFields(additionalInfos).Debug(msg)
}

func Debugf(format string, args ...interface{}) {
Expand All @@ -96,7 +76,7 @@ func logErrorInfo(err error, callerSkip int, additionalInfos ...Fields) *logrus.
}
pc, fullFilePath, line, ok := runtime.Caller(callerSkip + 2)
if ok {
logFields = logFields.WithFields(logrus.Fields{
logFields = logFields.WithFields(Fields{
"_file": filepath.Base(fullFilePath),
"_function": runtime.FuncForPC(pc).Name(),
"_line": line,
Expand Down Expand Up @@ -152,4 +132,4 @@ func logErrorInfo(err error, callerSkip int, additionalInfos ...Fields) *logrus.
return logFields
}

type Fields map[string]interface{}
type Fields = logrus.Fields
2 changes: 2 additions & 0 deletions backend/pkg/monitoring/constants/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ const (
Failure StatusType = "failure"
Default time.Duration = -1 * time.Second
)

const CleanShutdownEvent = "clean_shutdown"
7 changes: 7 additions & 0 deletions backend/pkg/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package monitoring

import (
"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
"github.com/gobitfly/beaconchain/pkg/monitoring/services"
)

Expand Down Expand Up @@ -33,6 +35,7 @@ func Init(full bool) {
&services.ServiceClickhouseRollings{},
&services.ServiceClickhouseEpoch{},
&services.ServiceTimeoutDetector{},
&services.CleanShutdownSpamDetector{},
)
}

Expand All @@ -42,13 +45,17 @@ func Init(full bool) {
}

// Start logs a startup message and then calls Start on every entry of
// monitoredServices, in iteration order.
func Start() {
	log.Infof("starting monitoring services")
	for _, svc := range monitoredServices {
		svc.Start()
	}
}

// Stop logs a shutdown message, calls Stop on every entry of
// monitoredServices, and finally emits a clean-shutdown status report with
// status Success.
func Stop() {
	log.Infof("stopping monitoring services")
	for _, svc := range monitoredServices {
		svc.Stop()
	}
	// The clean-shutdown event prevents status reports that weren't shut down
	// cleanly from triggering alerts.
	// NOTE(review): the reporter returned by NewStatusReport sends the report
	// from a goroutine, so the event may not be written if the process exits
	// immediately after Stop returns — confirm.
	reportCleanShutdown := services.NewStatusReport(constants.CleanShutdownEvent, constants.Default, constants.Default)
	reportCleanShutdown(constants.Success, nil)
}
31 changes: 9 additions & 22 deletions backend/pkg/monitoring/services/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@ package services
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/gobitfly/beaconchain/pkg/commons/db"
Expand Down Expand Up @@ -45,31 +42,12 @@ func (s *ServiceBase) Stop() {
s.wg.Wait()
}

var isShuttingDown atomic.Bool
var once sync.Once

func autoGracefulStop() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-c
isShuttingDown.Store(true)
}

func NewStatusReport(id string, timeout time.Duration, check_interval time.Duration) func(status constants.StatusType, metadata map[string]string) {
runId := uuid.New().String()
// run if it hasnt started yet
once.Do(func() { go autoGracefulStop() })
return func(status constants.StatusType, metadata map[string]string) {
// acquire snowflake synchronously
flake := utils.GetSnowflake()

go func() {
// check if we are alive
if isShuttingDown.Load() {
log.Info("shutting down, not reporting status")
return
}

if metadata == nil {
metadata = make(map[string]string)
}
Expand All @@ -90,6 +68,15 @@ func NewStatusReport(id string, timeout time.Duration, check_interval time.Durat
if check_interval >= 5*time.Minute {
expires_at = timeouts_at.Add(check_interval)
}
log.TraceWithFields(log.Fields{
"emitter": id,
"event_id": utils.GetUUID(),
"deployment_type": utils.Config.DeploymentType,
"insert_id": flake,
"expires_at": expires_at,
"timeouts_at": timeouts_at,
"metadata": metadata,
}, "sending status report")
var err error
if db.ClickHouseNativeWriter != nil {
err = db.ClickHouseNativeWriter.AsyncInsert(
Expand Down
86 changes: 86 additions & 0 deletions backend/pkg/monitoring/services/clean_shutdown_spam.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package services

import (
"context"
"encoding/json"
"strconv"
"time"

"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
)

// CleanShutdownSpamDetector is a monitoring service that periodically counts
// recent clean-shutdown events in status_reports and reports a failure when
// the count exceeds a threshold. It embeds ServiceBase for its lifecycle
// state.
type CleanShutdownSpamDetector struct {
ServiceBase
}

// Start launches the background check loop. It is a no-op when the running
// flag is already set, so repeated calls start at most one loop.
func (s *CleanShutdownSpamDetector) Start() {
	if started := s.running.CompareAndSwap(false, true); !started {
		// the flag was already set by an earlier Start; nothing to do
		return
	}
	s.wg.Add(1)
	go s.internalProcess()
}

// internalProcess runs the checks once immediately and then again 30 seconds
// after each run finishes, until s.ctx is done. It marks the wait group as
// done on exit.
func (s *CleanShutdownSpamDetector) internalProcess() {
	defer s.wg.Done()
	const pause = 30 * time.Second
	for {
		s.runChecks()
		select {
		case <-s.ctx.Done():
			return
		case <-time.After(pause):
			// pause elapsed; loop around for the next check
		}
	}
}

// runChecks queries status_reports for clean-shutdown events of the current
// deployment type inserted during the last five minutes. It reports Running
// first, then Failure when the reader is missing, the query fails, or more
// than `threshold` rows come back (attaching the emitters as JSON), and
// Success otherwise. The count and threshold are included in the metadata of
// the final Success/Failure report.
func (s *CleanShutdownSpamDetector) runChecks() {
	const (
		reportID  = "monitoring_clean_shutdown_spam"
		threshold = 10
	)
	report := NewStatusReport(reportID, constants.Default, 30*time.Second)
	// failWith sends a Failure report carrying the given metadata.
	failWith := func(md map[string]string) { report(constants.Failure, md) }

	report(constants.Running, nil)
	if db.ClickHouseReader == nil {
		// without a reader there is nothing to query
		failWith(map[string]string{"error": "clickhouse reader is nil"})
		return
	}
	log.Tracef("checking clean shutdown spam")

	query := `
SELECT
emitter
FROM
status_reports
WHERE
deployment_type = ?
AND inserted_at >= now() - interval 5 minutes
AND event_id = ?
`
	queryCtx, cancel := context.WithTimeout(s.ctx, 15*time.Second)
	defer cancel()
	var emitters []string
	if err := db.ClickHouseReader.SelectContext(queryCtx, &emitters, query, utils.Config.DeploymentType, constants.CleanShutdownEvent); err != nil {
		failWith(map[string]string{"error": err.Error()})
		return
	}

	md := map[string]string{
		"count":     strconv.Itoa(len(emitters)),
		"threshold": strconv.Itoa(threshold),
	}
	if len(emitters) <= threshold {
		report(constants.Success, md)
		return
	}

	// too many clean shutdowns: attach the emitters so the report shows who sent them
	payload, err := json.Marshal(emitters)
	if err != nil {
		failWith(map[string]string{"error": err.Error()})
		return
	}
	md["emitters"] = string(payload)
	failWith(md)
}
4 changes: 2 additions & 2 deletions backend/pkg/monitoring/services/clickhouse_rollings.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *ServiceClickhouseRollings) runChecks() {
err := db.ClickHouseReader.GetContext(ctx, &tsEpochTable, `
SELECT
max(epoch_timestamp)
FROM holesky.validator_dashboard_data_epoch`,
FROM validator_dashboard_data_epoch`,
)
if err != nil {
r(constants.Failure, map[string]string{"error": err.Error()})
Expand All @@ -81,7 +81,7 @@ func (s *ServiceClickhouseRollings) runChecks() {
err = db.ClickHouseReader.GetContext(ctx, &epochRollingTable, fmt.Sprintf(`
SELECT
max(epoch_end)
FROM holesky.validator_dashboard_data_rolling_%s`,
FROM validator_dashboard_data_rolling_%s`,
rolling,
),
)
Expand Down
6 changes: 4 additions & 2 deletions backend/pkg/monitoring/services/timeout_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *ServiceTimeoutDetector) runChecks() {
status,
metadata
FROM status_reports
WHERE expires_at > now() and deployment_type = ?
WHERE expires_at > now() and deployment_type = ? and emitter not in (select distinct emitter from status_reports where event_id = ? and inserted_at > now() - interval 1 days)
ORDER BY
event_id ASC,
emitter ASC,
Expand All @@ -87,6 +87,7 @@ func (s *ServiceTimeoutDetector) runChecks() {
)
SELECT
event_id,
emitter,
status,
inserted_at,
expires_at,
Expand All @@ -101,13 +102,14 @@ func (s *ServiceTimeoutDetector) runChecks() {
defer cancel()
var victims []struct {
EventID string `db:"event_id"`
Emitter string `db:"emitter"`
Status string `db:"status"`
InsertedAt time.Time `db:"inserted_at"`
ExpiresAt time.Time `db:"expires_at"`
TimeoutsAt time.Time `db:"timeouts_at"`
Metadata map[string]string `db:"metadata"`
}
err := db.ClickHouseReader.SelectContext(ctx, &victims, query, utils.Config.DeploymentType)
err := db.ClickHouseReader.SelectContext(ctx, &victims, query, utils.Config.DeploymentType, constants.CleanShutdownEvent)
if err != nil {
r(constants.Failure, map[string]string{"error": err.Error()})
return
Expand Down

0 comments on commit 6294894

Please sign in to comment.