diff --git a/backend/cmd/statistics/main.go b/backend/cmd/statistics/main.go index 9e8f7dfd1..99c9d1ec3 100644 --- a/backend/cmd/statistics/main.go +++ b/backend/cmd/statistics/main.go @@ -27,13 +27,15 @@ import ( ) type options struct { - configPath string - statisticsDayToExport int64 - statisticsDaysToExport string - statisticsValidatorToggle bool - statisticsChartToggle bool - statisticsGraffitiToggle bool - resetStatus bool + configPath string + statisticsDayToExport int64 + statisticsDaysToExport string + statisticsValidatorToggle bool + statisticsChartToggle bool + statisticsGraffitiToggle bool + statisticsDepositsToggle bool + statisticsDepositsInterval time.Duration + resetStatus bool } var opt = &options{} @@ -46,6 +48,8 @@ func Run() { fs.BoolVar(&opt.statisticsValidatorToggle, "validators.enabled", false, "Toggle exporting validator statistics") fs.BoolVar(&opt.statisticsChartToggle, "charts.enabled", false, "Toggle exporting chart series") fs.BoolVar(&opt.statisticsGraffitiToggle, "graffiti.enabled", false, "Toggle exporting graffiti statistics") + fs.BoolVar(&opt.statisticsDepositsToggle, "deposits.enabled", false, "Toggle aggregating deposits") + fs.DurationVar(&opt.statisticsDepositsInterval, "deposits.interval", time.Hour*24, "Duration to wait between deposit aggregation") fs.BoolVar(&opt.resetStatus, "validators.reset", false, "Export stats independent if they have already been exported previously") versionFlag := fs.Bool("version", false, "Show version and exit") @@ -214,6 +218,10 @@ func Run() { go statisticsLoop(rpcClient) + if opt.statisticsDepositsToggle { + go depositsLoop() + } + utils.WaitForCtrlC() log.Infof("exiting...") @@ -253,6 +261,7 @@ func statisticsLoop(client rpc.Client) { if lastExportedDayValidator != 0 { lastExportedDayValidator++ } + if lastExportedDayValidator <= previousDay || lastExportedDayValidator == 0 { for day := lastExportedDayValidator; day <= previousDay; day++ { err := db.WriteValidatorStatisticsForDay(day, client) @@ -324,6 +333,25 @@ func statisticsLoop(client rpc.Client) { } } +func depositsLoop() { + if opt.statisticsDepositsInterval < time.Minute { + log.Fatal(nil, "deposits.interval must be at least 1 minute", 0) + } + time.Sleep(time.Minute) // wait in case the process is in crashloop + for { + start := time.Now() + err := db.AggregateDeposits() + if err != nil { + log.Error(err, "error aggregating deposits", 0) + services.ReportStatus("deposits_aggregator", err.Error(), nil) + } else { + log.InfoWithFields(log.Fields{"duration": time.Since(start)}, "aggregated deposits") + services.ReportStatus("deposits_aggregator", "Running", nil) + } + time.Sleep(opt.statisticsDepositsInterval) + } +} + func clearStatsStatusTable(day uint64) { log.Infof("deleting validator_stats_status for day %v", day) _, err := db.WriterDb.Exec("DELETE FROM validator_stats_status WHERE day = $1", day) diff --git a/backend/pkg/commons/db/statistics.go b/backend/pkg/commons/db/statistics.go index af4642b84..1fc197ce7 100644 --- a/backend/pkg/commons/db/statistics.go +++ b/backend/pkg/commons/db/statistics.go @@ -1843,3 +1843,47 @@ func CheckIfDayIsFinalized(day uint64) error { return nil } + +func AggregateDeposits() error { + start := time.Now() + defer func() { + metrics.TaskDuration.WithLabelValues("statistics_aggregate_eth1_deposits").Observe(time.Since(start).Seconds()) + }() + _, err := WriterDb.Exec(` + INSERT INTO eth1_deposits_aggregated (from_address, amount, validcount, invalidcount, slashedcount, totalcount, activecount, pendingcount, voluntary_exit_count) + SELECT + eth1.from_address, + SUM(eth1.amount) as amount, + SUM(eth1.validcount) AS validcount, + SUM(eth1.invalidcount) AS invalidcount, + COUNT(CASE WHEN v.status = 'slashed' THEN 1 END) AS slashedcount, + COUNT(v.pubkey) AS totalcount, + COUNT(CASE WHEN v.status = 'active_online' OR v.status = 'active_offline' THEN 1 END) as activecount, + COUNT(CASE WHEN v.status = 'deposited' THEN 1 END) AS pendingcount, + COUNT(CASE WHEN v.status = 'exited' THEN 1 END) AS voluntary_exit_count + FROM ( + SELECT + from_address, + publickey, + SUM(amount) AS amount, + COUNT(CASE WHEN valid_signature = 't' THEN 1 END) AS validcount, + COUNT(CASE WHEN valid_signature = 'f' THEN 1 END) AS invalidcount + FROM eth1_deposits + GROUP BY from_address, publickey + ) eth1 + LEFT JOIN (SELECT pubkey, status FROM validators) v ON v.pubkey = eth1.publickey + GROUP BY eth1.from_address + ON CONFLICT (from_address) DO UPDATE SET + amount = excluded.amount, + validcount = excluded.validcount, + invalidcount = excluded.invalidcount, + slashedcount = excluded.slashedcount, + totalcount = excluded.totalcount, + activecount = excluded.activecount, + pendingcount = excluded.pendingcount, + voluntary_exit_count = excluded.voluntary_exit_count`) + if err != nil && err != sql.ErrNoRows { + return err + } + return nil +} diff --git a/backend/pkg/exporter/modules/execution_deposits_exporter.go b/backend/pkg/exporter/modules/execution_deposits_exporter.go index ef7d2129f..62e5aa668 100644 --- a/backend/pkg/exporter/modules/execution_deposits_exporter.go +++ b/backend/pkg/exporter/modules/execution_deposits_exporter.go @@ -26,7 +26,6 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/contracts/deposit_contract" "github.com/gobitfly/beaconchain/pkg/commons/db" "github.com/gobitfly/beaconchain/pkg/commons/log" - "github.com/gobitfly/beaconchain/pkg/commons/metrics" "github.com/gobitfly/beaconchain/pkg/commons/rpc" "github.com/gobitfly/beaconchain/pkg/commons/services" "github.com/gobitfly/beaconchain/pkg/commons/types" @@ -282,13 +281,6 @@ func (d *executionDepositsExporter) export() (err error) { log.Debugf("updating cached deposits view took %v", time.Since(start)) - if len(depositsToSave) > 0 { - err = d.aggregateDeposits() - if err != nil { - return err - } - } - return nil } @@ -676,51 +668,6 @@ func (d *executionDepositsExporter) getDepositTraces(txsToTrace []string) (filte return filteredTraces, nil } -func (d *executionDepositsExporter) aggregateDeposits() error { - /// this could be a materialized view - start := time.Now() - defer func() { - metrics.TaskDuration.WithLabelValues("exporter_aggregate_eth1_deposits").Observe(time.Since(start).Seconds()) - }() - _, err := db.WriterDb.Exec(` - INSERT INTO eth1_deposits_aggregated (from_address, amount, validcount, invalidcount, slashedcount, totalcount, activecount, pendingcount, voluntary_exit_count) - SELECT - eth1.from_address, - SUM(eth1.amount) as amount, - SUM(eth1.validcount) AS validcount, - SUM(eth1.invalidcount) AS invalidcount, - COUNT(CASE WHEN v.status = 'slashed' THEN 1 END) AS slashedcount, - COUNT(v.pubkey) AS totalcount, - COUNT(CASE WHEN v.status = 'active_online' OR v.status = 'active_offline' THEN 1 END) as activecount, - COUNT(CASE WHEN v.status = 'deposited' THEN 1 END) AS pendingcount, - COUNT(CASE WHEN v.status = 'exited' THEN 1 END) AS voluntary_exit_count - FROM ( - SELECT - from_address, - publickey, - SUM(amount) AS amount, - COUNT(CASE WHEN valid_signature = 't' THEN 1 END) AS validcount, - COUNT(CASE WHEN valid_signature = 'f' THEN 1 END) AS invalidcount - FROM eth1_deposits - GROUP BY from_address, publickey - ) eth1 - LEFT JOIN (SELECT pubkey, status FROM validators) v ON v.pubkey = eth1.publickey - GROUP BY eth1.from_address - ON CONFLICT (from_address) DO UPDATE SET - amount = excluded.amount, - validcount = excluded.validcount, - invalidcount = excluded.invalidcount, - slashedcount = excluded.slashedcount, - totalcount = excluded.totalcount, - activecount = excluded.activecount, - pendingcount = excluded.pendingcount, - voluntary_exit_count = excluded.voluntary_exit_count`) - if err != nil && err != sql.ErrNoRows { - return nil - } - return err -} - func (d *executionDepositsExporter) updateCachedView() error { err := db.CacheQuery(` SELECT