Merge pull request #297 from gobitfly/NOBIDS/execution-rewards-wip
execution rewards
guybrush authored May 16, 2024
2 parents 3302625 + e18ac51 commit 15e810e
Showing 7 changed files with 490 additions and 56 deletions.
62 changes: 62 additions & 0 deletions backend/pkg/commons/db/bigtable_eth1.go
@@ -522,6 +522,68 @@ func (bigtable *Bigtable) GetFullBlocksDescending(stream chan<- *types.Eth1Block
	return nil
}

// StreamBlocksIndexedDescending works like GetFullBlocksDescending above, but streams indexed blocks
func (bigtable *Bigtable) StreamBlocksIndexedDescending(stream chan<- *types.Eth1BlockIndexed, high, low uint64) error {
	tmr := time.AfterFunc(REPORT_TIMEOUT, func() {
		log.WarnWithFields(log.Fields{
			"high":     high,
			"low":      low,
			"func":     utils.GetCurrentFuncName(),
			"duration": REPORT_TIMEOUT,
		}, "call took longer than expected")
	})
	defer tmr.Stop()

	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*3))
	defer cancel()

	if high < low {
		return fmt.Errorf("invalid block range provided (high: %v, low: %v)", high, low)
	}

	if high > 0 {
		limitedLow := low
		if limitedLow == 0 {
			// block 0 cannot be included in the range as it is padded incorrectly (will be fetched last, see below)
			limitedLow = 1
		}

		highKey := fmt.Sprintf("%s:B:%s", bigtable.chainId, reversedPaddedBlockNumber(high))
		lowKey := fmt.Sprintf("%s:B:%s\x00", bigtable.chainId, reversedPaddedBlockNumber(limitedLow)) // add \x00 to make the range inclusive

		limit := high - limitedLow + 1

		rowRange := gcp_bigtable.NewRange(highKey, lowKey)
		rowFilter := gcp_bigtable.RowFilter(gcp_bigtable.ColumnFilter("d"))
		rowHandler := func(row gcp_bigtable.Row) bool {
			block := types.Eth1BlockIndexed{}
			err := proto.Unmarshal(row["f"][0].Value, &block)
			if err != nil {
				log.Error(err, "could not unmarshal proto object", 0)
				return false
			}
			stream <- &block
			return true
		}

		err := bigtable.tableData.ReadRows(ctx, rowRange, rowHandler, rowFilter, gcp_bigtable.LimitRows(int64(limit)))
		if err != nil {
			return err
		}
	}

	if low == 0 {
		// special handling for block 0 which is padded incorrectly
		b, err := BigtableClient.GetBlocksDescending(0, 1)
		if err != nil {
			return fmt.Errorf("could not retrieve block 0: %v", err)
		}
		stream <- b[0]
	}

	return nil
}
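As a quick orientation for the new streaming API above, here is a hedged usage sketch, not part of the diff: bt stands for an already-initialized *Bigtable, the block range and channel buffer size are arbitrary, and the snippet assumes the package's existing imports (types, log, fmt).

// hedged sketch: fill the channel from a goroutine and drain it in the caller
stream := make(chan *types.Eth1BlockIndexed, 100) // buffer size chosen arbitrarily
go func() {
	defer close(stream)
	// bt is assumed to be an initialized *Bigtable; the range 1..1000 is illustrative
	if err := bt.StreamBlocksIndexedDescending(stream, 1000, 1); err != nil {
		log.Error(err, "streaming indexed blocks failed", 0)
	}
}()
for block := range stream {
	// blocks arrive highest-first; block 0, if it were requested, would arrive last
	fmt.Printf("%+v\n", block)
}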

func (bigtable *Bigtable) GetBlocksIndexedMultiple(blockNumbers []uint64, limit uint64) ([]*types.Eth1BlockIndexed, error) {
	tmr := time.AfterFunc(REPORT_TIMEOUT, func() {
		log.WarnWithFields(log.Fields{
48 changes: 48 additions & 0 deletions backend/pkg/commons/db/db.go
@@ -1,6 +1,7 @@
package db

import (
	"context"
	"database/sql"
	"embed"
	"encoding/hex"
@@ -25,7 +26,10 @@ import (
	"github.com/pressly/goose/v3"

	"github.com/go-redis/redis/v8"
	pgxdecimal "github.com/jackc/pgx-shopspring-decimal"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/jackc/pgx/v5/stdlib"
)

//go:embed migrations/*.sql
@@ -2345,6 +2349,7 @@ func CacheQuery(query string, viewName string, indexes ...[]string) error {
	if err != nil {
		return fmt.Errorf("error granting all on %s materialized view: %w", tmpViewName, err)
	}

	// swap views
	_, err = tx.Exec(fmt.Sprintf(`ALTER MATERIALIZED VIEW if exists %s RENAME TO %s;`, viewName, trashViewName))
	if err != nil {
@@ -2373,3 +2378,46 @@ func CacheQuery(query string, viewName string, indexes ...[]string) error {
	}
	return nil
}

// CopyToTable bulk-copies the given rows into tableName using the postgres COPY protocol (copied from the corresponding utils func)
func CopyToTable[T []any](tableName string, columns []string, data []T) error {
	conn, err := WriterDb.Conn(context.Background())
	if err != nil {
		return fmt.Errorf("error retrieving raw sql connection: %w", err)
	}
	defer conn.Close()
	err = conn.Raw(func(driverConn interface{}) error {
		conn := driverConn.(*stdlib.Conn).Conn()

		pgxdecimal.Register(conn.TypeMap())
		tx, err := conn.Begin(context.Background())
		if err != nil {
			return err
		}
		defer func() {
			err := tx.Rollback(context.Background())
			if err != nil && !errors.Is(err, pgx.ErrTxClosed) {
				log.Error(err, "error rolling back transaction", 0)
			}
		}()
		_, err = tx.CopyFrom(context.Background(), pgx.Identifier{tableName}, columns,
			pgx.CopyFromSlice(len(data), func(i int) ([]interface{}, error) {
				return data[i], nil
			}))
		if err != nil {
			return err
		}

		err = tx.Commit(context.Background())
		if err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("error copying data to %s: %w", tableName, err)
	}
	return nil
}
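A hedged usage sketch for CopyToTable, not part of the diff: the table and column names are taken from the new execution_payloads migration further down, but whether the exporter feeds that table through CopyToTable is an assumption, the hash and reward values are dummies, and the shopspring decimal import is assumed (its codec is registered via pgxdecimal above).

// hedged example: bulk-copy two made-up reward rows into execution_payloads
rows := [][]any{
	{[]byte{0xaa, 0xbb}, decimal.NewFromInt(1)}, // block_hash, fee_recipient_reward (dummy values)
	{[]byte{0xcc, 0xdd}, decimal.NewFromInt(2)},
}
if err := CopyToTable("execution_payloads", []string{"block_hash", "fee_recipient_reward"}, rows); err != nil {
	log.Error(err, "bulk copy failed", 0)
}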
@@ -0,0 +1,48 @@
-- +goose Up
-- +goose StatementBegin
SELECT('up SQL query - set all empty execution_block_hashes to NULL');
UPDATE blocks SET exec_block_hash = NULL WHERE exec_block_hash = '\x';
-- +goose StatementEnd
-- +goose StatementBegin
SELECT('up SQL query - add unique constraint on execution_block_hashes');
ALTER TABLE blocks ADD CONSTRAINT blocks_exec_block_hash_unique UNIQUE (exec_block_hash);
-- +goose StatementEnd
-- +goose StatementBegin
SELECT('up SQL query - create execution_payload table');
CREATE TABLE execution_payloads (
    block_hash bytea NOT NULL,
    fee_recipient_reward numeric(78, 18) NULL,
    CONSTRAINT execution_payloads_pk PRIMARY KEY (block_hash)
);
-- +goose StatementEnd
-- +goose StatementBegin
SELECT('up SQL query - create index on execution_payloads table');
CREATE UNIQUE INDEX execution_payloads_block_hash_idx ON execution_payloads USING btree (block_hash, fee_recipient_reward);
-- +goose StatementEnd
-- +goose StatementBegin
SELECT('up SQL query - prefilling execution_payloads table with empty values');
INSERT INTO execution_payloads (block_hash) SELECT exec_block_hash FROM blocks WHERE exec_block_hash IS NOT NULL;
-- +goose StatementEnd
-- +goose StatementBegin
SELECT('up SQL query - add foreign key constraint to blocks table');
ALTER TABLE blocks ADD CONSTRAINT blocks_execution_payloads_fk FOREIGN KEY (exec_block_hash) REFERENCES execution_payloads(block_hash);
-- +goose StatementEnd


-- +goose Down
-- +goose StatementBegin
SELECT('down SQL query - drop foreign key constraint to execution_payloads table');
ALTER TABLE blocks DROP CONSTRAINT blocks_execution_payloads_fk;
-- +goose StatementEnd
-- +goose StatementBegin
SELECT('down SQL query - drop execution_payloads table');
DROP TABLE execution_payloads;
-- +goose StatementEnd
-- +goose StatementBegin
SELECT('down SQL query - drop unique constraint on execution_block_hashes');
ALTER TABLE blocks DROP CONSTRAINT blocks_exec_block_hash_unique;
-- +goose StatementEnd
-- +goose StatementBegin
SELECT('down SQL query - set all NULL execution_block_hashes to empty (better safe than sorry)');
UPDATE blocks SET exec_block_hash = '\x' WHERE exec_block_hash IS NULL;
-- +goose StatementEnd
131 changes: 77 additions & 54 deletions backend/pkg/exporter/db/db.go
@@ -51,6 +51,15 @@ func saveBlocks(blocks map[uint64]map[string]*types.Block, tx *sqlx.Tx, forceSlo
		return err
	}

	stmtExecutionPayload, err := tx.Prepare(`
		INSERT INTO execution_payloads (block_hash)
		VALUES ($1)
		ON CONFLICT (block_hash) DO NOTHING`)
	if err != nil {
		return err
	}
	defer stmtExecutionPayload.Close()

stmtBlock, err := tx.Prepare(`
INSERT INTO blocks (epoch, slot, blockroot, parentroot, stateroot, signature, randaoreveal, graffiti, graffiti_text, eth1data_depositroot, eth1data_depositcount, eth1data_blockhash, syncaggregate_bits, syncaggregate_signature, proposerslashingscount, attesterslashingscount, attestationscount, depositscount, withdrawalcount, voluntaryexitscount, syncaggregate_participation, proposer, status, exec_parent_hash, exec_fee_recipient, exec_state_root, exec_receipts_root, exec_logs_bloom, exec_random, exec_block_number, exec_gas_limit, exec_gas_used, exec_timestamp, exec_extra_data, exec_base_fee_per_gas, exec_block_hash, exec_transactions_count, exec_blob_gas_used, exec_excess_blob_gas, exec_blob_transactions_count)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40)
@@ -185,43 +194,57 @@ func saveBlocks(blocks map[uint64]map[string]*types.Block, tx *sqlx.Tx, forceSlo
// blockLog = blockLog.WithField("syncParticipation", b.SyncAggregate.SyncAggregateParticipation)
}

parentHash := []byte{}
feeRecipient := []byte{}
stateRoot := []byte{}
receiptRoot := []byte{}
logsBloom := []byte{}
random := []byte{}
blockNumber := uint64(0)
gasLimit := uint64(0)
gasUsed := uint64(0)
timestamp := uint64(0)
extraData := []byte{}
baseFeePerGas := uint64(0)
blockHash := []byte{}
txCount := 0
withdrawalCount := 0
blobGasUsed := uint64(0)
excessBlobGas := uint64(0)
blobTxCount := 0
type exectionPayloadData struct {
ParentHash []byte
FeeRecipient []byte
StateRoot []byte
ReceiptRoot []byte
LogsBloom []byte
Random []byte
BlockNumber *uint64
GasLimit *uint64
GasUsed *uint64
Timestamp *uint64
ExtraData []byte
BaseFeePerGas *uint64
BlockHash []byte
TxCount *int64
WithdrawalCount *int64
BlobGasUsed *uint64
ExcessBlobGas *uint64
BlobTxCount *int64
}

execData := new(exectionPayloadData)

if b.ExecutionPayload != nil {
parentHash = b.ExecutionPayload.ParentHash
feeRecipient = b.ExecutionPayload.FeeRecipient
stateRoot = b.ExecutionPayload.StateRoot
receiptRoot = b.ExecutionPayload.ReceiptsRoot
logsBloom = b.ExecutionPayload.LogsBloom
random = b.ExecutionPayload.Random
blockNumber = b.ExecutionPayload.BlockNumber
gasLimit = b.ExecutionPayload.GasLimit
gasUsed = b.ExecutionPayload.GasUsed
timestamp = b.ExecutionPayload.Timestamp
extraData = b.ExecutionPayload.ExtraData
baseFeePerGas = b.ExecutionPayload.BaseFeePerGas
blockHash = b.ExecutionPayload.BlockHash
txCount = len(b.ExecutionPayload.Transactions)
withdrawalCount = len(b.ExecutionPayload.Withdrawals)
blobGasUsed = b.ExecutionPayload.BlobGasUsed
excessBlobGas = b.ExecutionPayload.ExcessBlobGas
blobTxCount = len(b.BlobKZGCommitments)
txCount := int64(len(b.ExecutionPayload.Transactions))
withdrawalCount := int64(len(b.ExecutionPayload.Withdrawals))
blobTxCount := int64(len(b.BlobKZGCommitments))
execData = &exectionPayloadData{
ParentHash: b.ExecutionPayload.ParentHash,
FeeRecipient: b.ExecutionPayload.FeeRecipient,
StateRoot: b.ExecutionPayload.StateRoot,
ReceiptRoot: b.ExecutionPayload.ReceiptsRoot,
LogsBloom: b.ExecutionPayload.LogsBloom,
Random: b.ExecutionPayload.Random,
BlockNumber: &b.ExecutionPayload.BlockNumber,
GasLimit: &b.ExecutionPayload.GasLimit,
GasUsed: &b.ExecutionPayload.GasUsed,
Timestamp: &b.ExecutionPayload.Timestamp,
ExtraData: b.ExecutionPayload.ExtraData,
BaseFeePerGas: &b.ExecutionPayload.BaseFeePerGas,
BlockHash: b.ExecutionPayload.BlockHash,
TxCount: &txCount,
WithdrawalCount: &withdrawalCount,
BlobGasUsed: &b.ExecutionPayload.BlobGasUsed,
ExcessBlobGas: &b.ExecutionPayload.ExcessBlobGas,
BlobTxCount: &blobTxCount,
}
_, err = stmtExecutionPayload.Exec(execData.BlockHash)
if err != nil {
return fmt.Errorf("error executing stmtExecutionPayload for block %v: %w", b.Slot, err)
}
}
_, err = stmtBlock.Exec(
b.Slot/utils.Config.Chain.ClConfig.SlotsPerEpoch,
@@ -242,28 +265,28 @@ func saveBlocks(blocks map[uint64]map[string]*types.Block, tx *sqlx.Tx, forceSlo
len(b.AttesterSlashings),
len(b.Attestations),
len(b.Deposits),
withdrawalCount,
execData.WithdrawalCount,
len(b.VoluntaryExits),
syncAggParticipation,
b.Proposer,
strconv.FormatUint(b.Status, 10),
parentHash,
feeRecipient,
stateRoot,
receiptRoot,
logsBloom,
random,
blockNumber,
gasLimit,
gasUsed,
timestamp,
extraData,
baseFeePerGas,
blockHash,
txCount,
blobGasUsed,
excessBlobGas,
blobTxCount,
execData.ParentHash,
execData.FeeRecipient,
execData.StateRoot,
execData.ReceiptRoot,
execData.LogsBloom,
execData.Random,
execData.BlockNumber,
execData.GasLimit,
execData.GasUsed,
execData.Timestamp,
execData.ExtraData,
execData.BaseFeePerGas,
execData.BlockHash,
execData.TxCount,
execData.BlobGasUsed,
execData.ExcessBlobGas,
execData.BlobTxCount,
)
if err != nil {
return fmt.Errorf("error executing stmtBlocks for block %v: %w", b.Slot, err)
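One effect of the refactor above: the old zero-value locals are replaced by pointer fields on exectionPayloadData, so blocks without an execution payload (pre-merge slots) now store NULL for the execution columns instead of 0 or empty bytes. A minimal hedged illustration of that behaviour, not part of the diff (tx, slot and b are assumed to exist, and the UPDATE statement is only for demonstration):

// hedged sketch: database/sql writes a nil pointer parameter as SQL NULL
var gasUsed *uint64 // stays nil when the block carries no execution payload
if b.ExecutionPayload != nil {
	v := b.ExecutionPayload.GasUsed
	gasUsed = &v
}
if _, err := tx.Exec(`UPDATE blocks SET exec_gas_used = $1 WHERE slot = $2`, gasUsed, slot); err != nil {
	log.Error(err, "demo update failed", 0)
}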
4 changes: 3 additions & 1 deletion backend/pkg/exporter/modules/base.go
@@ -73,7 +73,9 @@ func StartAll(context ModuleContext) {
	} else {
		modules = append(modules,
			NewSlotExporter(context),
			NewExecutionDepositsExporter(context))
			NewExecutionDepositsExporter(context),
			NewExecutionPayloadsExporter(context),
		)
	}

	startSubscriptionModules(&context, modules)
4 changes: 3 additions & 1 deletion backend/pkg/exporter/modules/execution_deposits_exporter.go
@@ -33,6 +33,8 @@ import (

// if we ever end up in a situation where we possibly have gaps in the data, remember: the merkletree_index is unique.
// if one is missing, simply take the blocks from the merkletree_index before and after and fetch the deposits again.
// though it is sadly stored as a little-endian encoded int, so you will have to convert it to a number first.
// fixing this would've required messing with v1; better to do it once v1 is gone.

type executionDepositsExporter struct {
ModuleContext
@@ -234,7 +236,7 @@ func (d *executionDepositsExporter) exportTillBlock(block uint64) (err error) {
return err
}

log.Debugf("updating cached view took %v", time.Since(start))
log.Debugf("updating cached deposits view took %v", time.Since(start))

if len(depositsToSave) > 0 {
err = d.aggregateDeposits()
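The comment added near the top of this file's diff notes that merkletree_index is sadly stored as a little-endian encoded int. A self-contained hedged sketch of the conversion it describes, assuming the index is stored as 8 raw little-endian bytes:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	raw := []byte{0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} // little-endian encoding of 42
	idx := binary.LittleEndian.Uint64(raw)
	fmt.Println(idx) // prints 42
}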
