Skip to content

Commit

Permalink
*: electra versioned aggregations (#3482)
Browse files Browse the repository at this point in the history
* Add electra versioned aggregations

* Bump go-eth2-client version and fix TODOs

* Write own switch cases, instead of waiting on go-eth2-client util funcs

* Fix linter

* WIP

* Aggregations fixes and tests updates

* Fix submitAttestations parsing

* Bump go-eth2-client

* Fix teku integration test
  • Loading branch information
KaloyanTanev committed Jan 31, 2025
1 parent 04bb2ff commit 21907ff
Show file tree
Hide file tree
Showing 52 changed files with 1,273 additions and 406 deletions.
14 changes: 7 additions & 7 deletions app/eth2wrap/eth2wrap_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions app/eth2wrap/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions app/eth2wrap/success.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package eth2wrap
import (
eth2api "github.com/attestantio/go-eth2-client/api"
eth2v1 "github.com/attestantio/go-eth2-client/api/v1"
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
eth2spec "github.com/attestantio/go-eth2-client/spec"
)

// isSyncStateOk returns true if the sync state is not syncing.
Expand All @@ -14,6 +14,6 @@ func isSyncStateOk(resp *eth2api.Response[*eth2v1.SyncState]) bool {
}

// isAggregateAttestationOk returns true if the aggregate attestation is not nil (which can happen if the subscription wasn't successful).
func isAggregateAttestationOk(resp *eth2api.Response[*eth2p0.Attestation]) bool {
func isAggregateAttestationOk(resp *eth2api.Response[*eth2spec.VersionedAttestation]) bool {
return resp.Data != nil
}
11 changes: 5 additions & 6 deletions core/bcast/bcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
eth2api "github.com/attestantio/go-eth2-client/api"
eth2spec "github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/altair"
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/eth2wrap"
Expand Down Expand Up @@ -240,18 +239,18 @@ func setToSyncMessages(set core.SignedDataSet) ([]*altair.SyncCommitteeMessage,
}

// setToAggAndProof converts a set of signed data into a list of aggregate and proofs.
func setToAggAndProof(set core.SignedDataSet) ([]*eth2p0.SignedAggregateAndProof, error) {
var resp []*eth2p0.SignedAggregateAndProof
func setToAggAndProof(set core.SignedDataSet) (*eth2api.SubmitAggregateAttestationsOpts, error) {
var resp []*eth2spec.VersionedSignedAggregateAndProof
for _, aggAndProof := range set {
aggAndProof, ok := aggAndProof.(core.SignedAggregateAndProof)
aggAndProof, ok := aggAndProof.(core.VersionedSignedAggregateAndProof)
if !ok {
return nil, errors.New("invalid aggregate and proof")
}

resp = append(resp, &aggAndProof.SignedAggregateAndProof)
resp = append(resp, &aggAndProof.VersionedSignedAggregateAndProof)
}

return resp, nil
return &eth2api.SubmitAggregateAttestationsOpts{SignedAggregateAndProofs: resp}, nil
}

// setToRegistrations converts a set of signed data into a list of registrations.
Expand Down
10 changes: 6 additions & 4 deletions core/bcast/bcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,13 @@ func aggregateAttestationData(t *testing.T, mock *beaconmock.Mock) test {
t.Helper()

asserted := make(chan struct{})
aggAndProof := testutil.RandomSignedAggregateAndProof()
aggData := core.SignedAggregateAndProof{SignedAggregateAndProof: *aggAndProof}
aggAndProof := testutil.RandomDenebVersionedSignedAggregateAndProof()
aggData := core.VersionedSignedAggregateAndProof{
VersionedSignedAggregateAndProof: *aggAndProof,
}

mock.SubmitAggregateAttestationsFunc = func(ctx context.Context, aggregateAndProofs []*eth2p0.SignedAggregateAndProof) error {
require.Equal(t, aggAndProof, aggregateAndProofs[0])
mock.SubmitAggregateAttestationsFunc = func(ctx context.Context, aggregateAndProofs *eth2api.SubmitAggregateAttestationsOpts) error {
require.Equal(t, aggAndProof, aggregateAndProofs.SignedAggregateAndProofs[0])
close(asserted)

return nil
Expand Down
25 changes: 15 additions & 10 deletions core/dutydb/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

eth2api "github.com/attestantio/go-eth2-client/api"
eth2spec "github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/altair"
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"

Expand All @@ -22,7 +23,7 @@ func NewMemDB(deadliner core.Deadliner) *MemDB {
attPubKeys: make(map[pkKey]core.PubKey),
attKeysBySlot: make(map[uint64][]pkKey),
proDuties: make(map[uint64]*eth2api.VersionedProposal),
aggDuties: make(map[aggKey]core.AggregatedAttestation),
aggDuties: make(map[aggKey]core.VersionedAggregatedAttestation),
aggKeysBySlot: make(map[uint64][]aggKey),
contribDuties: make(map[contribKey]*altair.SyncCommitteeContribution),
contribKeysBySlot: make(map[uint64][]contribKey),
Expand All @@ -47,7 +48,7 @@ type MemDB struct {
proQueries []proQuery

// DutyAggregator
aggDuties map[aggKey]core.AggregatedAttestation
aggDuties map[aggKey]core.VersionedAggregatedAttestation
aggKeysBySlot map[uint64][]aggKey
aggQueries []aggQuery

Expand Down Expand Up @@ -195,10 +196,10 @@ func (db *MemDB) AwaitAttestation(ctx context.Context, slot uint64, commIdx uint
// AwaitAggAttestation blocks and returns the aggregated attestation for the slot
// and attestation when available.
func (db *MemDB) AwaitAggAttestation(ctx context.Context, slot uint64, attestationRoot eth2p0.Root,
) (*eth2p0.Attestation, error) {
) (*eth2spec.VersionedAttestation, error) {
cancel := make(chan struct{})
defer close(cancel)
response := make(chan core.AggregatedAttestation, 1) // Instance of one so resolving never blocks
response := make(chan core.VersionedAggregatedAttestation, 1) // Instance of one so resolving never blocks

db.mu.Lock()
db.aggQueries = append(db.aggQueries, aggQuery{
Expand All @@ -223,12 +224,12 @@ func (db *MemDB) AwaitAggAttestation(ctx context.Context, slot uint64, attestati
if err != nil {
return nil, err
}
aggAtt, ok := clone.(core.AggregatedAttestation)
aggAtt, ok := clone.(core.VersionedAggregatedAttestation)
if !ok {
return nil, errors.New("invalid aggregated attestation")
}

return &aggAtt.Attestation, nil
return &aggAtt.VersionedAttestation, nil
}
}

Expand Down Expand Up @@ -332,17 +333,21 @@ func (db *MemDB) storeAggAttestationUnsafe(unsignedData core.UnsignedData) error
return err
}

aggAtt, ok := cloned.(core.AggregatedAttestation)
aggAtt, ok := cloned.(core.VersionedAggregatedAttestation)
if !ok {
return errors.New("invalid unsigned aggregated attestation")
}

aggRoot, err := aggAtt.Attestation.Data.HashTreeRoot()
aggAttData, err := aggAtt.VersionedAttestation.Data()
if err != nil {
return err
}
aggRoot, err := aggAttData.HashTreeRoot()
if err != nil {
return errors.Wrap(err, "hash aggregated attestation root")
}

slot := uint64(aggAtt.Attestation.Data.Slot)
slot := uint64(aggAttData.Slot)

// Store key and value for PubKeyByAttestation
key := aggKey{
Expand Down Expand Up @@ -606,7 +611,7 @@ type proQuery struct {
// aggQuery is a waiting aggQuery with a response channel.
type aggQuery struct {
Key aggKey
Response chan<- core.AggregatedAttestation
Response chan<- core.VersionedAggregatedAttestation
Cancel <-chan struct{}
}

Expand Down
16 changes: 10 additions & 6 deletions core/dutydb/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,25 +220,29 @@ func TestMemDBAggregator(t *testing.T) {
const queries = 3

for range queries {
agg := testutil.RandomAttestation()
att := testutil.RandomDenebVersionedAttestation()
aggAttest, err := core.NewVersionedAggregatedAttestation(att)
require.NoError(t, err)
set := core.UnsignedDataSet{
testutil.RandomCorePubKey(t): core.NewAggregatedAttestation(agg),
testutil.RandomCorePubKey(t): aggAttest,
}
slot := uint64(agg.Data.Slot)
attData, err := att.Data()
require.NoError(t, err)
slot := uint64(attData.Slot)

errCh := make(chan error, 1)
go func() {
err := db.Store(ctx, core.NewAggregatorDuty(slot), set)
errCh <- err
}()

root, err := agg.Data.HashTreeRoot()
root, err := attData.HashTreeRoot()
require.NoError(t, err)
err = <-errCh
require.NoError(t, err)
resp, err := db.AwaitAggAttestation(ctx, slot, root)
require.NoError(t, err)
require.Equal(t, agg, resp)
require.Equal(t, att, resp)
}
}

Expand Down Expand Up @@ -333,7 +337,7 @@ func TestMemDBSyncContribution(t *testing.T) {
)

err := db.Store(ctx, duty, core.UnsignedDataSet{
testutil.RandomCorePubKey(t): core.NewAggregatedAttestation(testutil.RandomAttestation()),
testutil.RandomCorePubKey(t): testutil.RandomDenebCoreVersionedAggregateAttestation(),
})
require.Error(t, err)
require.ErrorContains(t, err, "invalid unsigned sync committee contribution")
Expand Down
13 changes: 9 additions & 4 deletions core/eth2signeddata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var (
_ Eth2SignedData = VersionedSignedValidatorRegistration{}
_ Eth2SignedData = SignedRandao{}
_ Eth2SignedData = BeaconCommitteeSelection{}
_ Eth2SignedData = SignedAggregateAndProof{}
_ Eth2SignedData = VersionedSignedAggregateAndProof{}
_ Eth2SignedData = SignedSyncMessage{}
_ Eth2SignedData = SignedSyncContributionAndProof{}
_ Eth2SignedData = SyncCommitteeSelection{}
Expand Down Expand Up @@ -115,12 +115,17 @@ func (s BeaconCommitteeSelection) Epoch(ctx context.Context, eth2Cl eth2wrap.Cli

// Implement Eth2SignedData for SignedAggregateAndProof.

func (SignedAggregateAndProof) DomainName() signing.DomainName {
func (VersionedSignedAggregateAndProof) DomainName() signing.DomainName {
return signing.DomainAggregateAndProof
}

func (s SignedAggregateAndProof) Epoch(ctx context.Context, eth2Cl eth2wrap.Client) (eth2p0.Epoch, error) {
return eth2util.EpochFromSlot(ctx, eth2Cl, s.Message.Aggregate.Data.Slot)
func (ap VersionedSignedAggregateAndProof) Epoch(ctx context.Context, eth2Cl eth2wrap.Client) (eth2p0.Epoch, error) {
slot, err := ap.Slot()
if err != nil {
return 0, err
}

return eth2util.EpochFromSlot(ctx, eth2Cl, slot)
}

// Implement Eth2SignedData for SignedSyncMessage.
Expand Down
10 changes: 7 additions & 3 deletions core/eth2signeddata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"testing"

eth2spec "github.com/attestantio/go-eth2-client/spec"
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -48,9 +49,12 @@ func TestVerifyEth2SignedData(t *testing.T) {
},
{
name: "verify attestation aggregate and proof",
data: core.SignedAggregateAndProof{
SignedAggregateAndProof: eth2p0.SignedAggregateAndProof{
Message: testutil.RandomAggregateAndProof(),
data: core.VersionedSignedAggregateAndProof{
VersionedSignedAggregateAndProof: eth2spec.VersionedSignedAggregateAndProof{
Version: eth2spec.DataVersionDeneb,
Deneb: &eth2p0.SignedAggregateAndProof{
Message: testutil.RandomAggregateAndProof(),
},
},
},
},
Expand Down
16 changes: 11 additions & 5 deletions core/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (f *Fetcher) fetchAttesterData(ctx context.Context, slot uint64, defSet cor
// fetchAggregatorData fetches the attestation aggregation data.
func (f *Fetcher) fetchAggregatorData(ctx context.Context, slot uint64, defSet core.DutyDefinitionSet) (core.UnsignedDataSet, error) {
// We may have multiple aggregators in the same committee, use the same aggregated attestation in that case.
aggAttByCommIdx := make(map[eth2p0.CommitteeIndex]*eth2p0.Attestation)
aggAttByCommIdx := make(map[eth2p0.CommitteeIndex]*eth2spec.VersionedAttestation)

resp := make(core.UnsignedDataSet)
for pubkey, dutyDef := range defSet {
Expand Down Expand Up @@ -188,8 +188,8 @@ func (f *Fetcher) fetchAggregatorData(ctx context.Context, slot uint64, defSet c

aggAtt, ok := aggAttByCommIdx[attDef.CommitteeIndex]
if ok {
resp[pubkey] = core.AggregatedAttestation{
Attestation: *aggAtt,
resp[pubkey] = core.VersionedAggregatedAttestation{
VersionedAttestation: *aggAtt,
}

// Skips querying aggregate attestation for aggregators of same committee.
Expand Down Expand Up @@ -226,8 +226,8 @@ func (f *Fetcher) fetchAggregatorData(ctx context.Context, slot uint64, defSet c

aggAttByCommIdx[attDef.CommitteeIndex] = aggAtt

resp[pubkey] = core.AggregatedAttestation{
Attestation: *aggAtt,
resp[pubkey] = core.VersionedAggregatedAttestation{
VersionedAttestation: *aggAtt,
}
}

Expand Down Expand Up @@ -374,6 +374,12 @@ func verifyFeeRecipient(ctx context.Context, proposal *eth2api.VersionedProposal
} else {
actualAddr = fmt.Sprintf("%#x", proposal.Deneb.Block.Body.ExecutionPayload.FeeRecipient)
}
case eth2spec.DataVersionElectra:
if proposal.Blinded {
actualAddr = fmt.Sprintf("%#x", proposal.ElectraBlinded.Body.ExecutionPayloadHeader.FeeRecipient)
} else {
actualAddr = fmt.Sprintf("%#x", proposal.Electra.Block.Body.ExecutionPayload.FeeRecipient)
}
default:
return
}
Expand Down
Loading

0 comments on commit 21907ff

Please sign in to comment.