Skip to content

Commit

Permalink
Merge branch 'develop' into fix/prune-deposit-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
james-prysm authored Feb 6, 2025
2 parents 02fec8f + 49405c3 commit c2abfb6
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 16 deletions.
21 changes: 21 additions & 0 deletions beacon-chain/sync/pending_attestations_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/prysmaticlabs/prysm/v5/async"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
Expand Down Expand Up @@ -202,6 +204,7 @@ func (s *Service) processUnaggregated(ctx context.Context, att ethpb.Att) {
log.WithError(err).Debug("Could not retrieve active validator count")
return
}

// Broadcasting the signed attestation again once a node is able to process it.
var attToBroadcast ethpb.Att
if singleAtt != nil {
Expand All @@ -212,6 +215,24 @@ func (s *Service) processUnaggregated(ctx context.Context, att ethpb.Att) {
if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, attToBroadcast), attToBroadcast); err != nil {
log.WithError(err).Debug("Could not broadcast")
}

// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
// of a received unaggregated attestation.
if singleAtt != nil {
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.SingleAttReceived,
Data: &operation.SingleAttReceivedData{
Attestation: singleAtt,
},
})
} else {
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})
}
}
}

Expand Down
80 changes: 64 additions & 16 deletions beacon-chain/sync/pending_attestations_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sync

import (
"context"
"sync"
"testing"
"time"

Expand All @@ -10,6 +11,8 @@ import (
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/async/abool"
mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing"
dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
Expand Down Expand Up @@ -105,15 +108,22 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) {
Epoch: 0,
},
}

done := make(chan *feed.Event, 1)
defer close(done)
opn := mock.NewEventFeedWrapper()
sub := opn.Subscribe(done)
defer sub.Unsubscribe()
ctx, cancel := context.WithCancel(context.Background())
r := &Service{
ctx: ctx,
cfg: &config{
p2p: p1,
beaconDB: db,
chain: chain,
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
attPool: attestations.NewPool(),
p2p: p1,
beaconDB: db,
chain: chain,
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
attPool: attestations.NewPool(),
attestationNotifier: &mock.SimpleNotifier{Feed: opn},
},
blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof),
seenUnAggregatedAttestationCache: lruwrpr.New(10),
Expand All @@ -128,12 +138,28 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) {
r.blkRootToPendingAtts[root] = []ethpb.SignedAggregateAttAndProof{&ethpb.SignedAggregateAttestationAndProof{Message: aggregateAndProof}}
require.NoError(t, r.processPendingAtts(context.Background()))

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case received := <-done:
// make sure a single att was sent
require.Equal(t, operation.UnaggregatedAttReceived, int(received.Type))
return
case <-ctx.Done():
return
}
}
}()
atts, err := r.cfg.attPool.UnaggregatedAttestations()
require.NoError(t, err)
assert.Equal(t, 1, len(atts), "Did not save unaggregated att")
assert.DeepEqual(t, att, atts[0], "Incorrect saved att")
assert.Equal(t, 0, len(r.cfg.attPool.AggregatedAttestations()), "Did save aggregated att")
require.LogsContain(t, hook, "Verified and saved pending attestations to pool")
wg.Wait()
cancel()
}

Expand Down Expand Up @@ -179,15 +205,21 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAttElectra(t *testing.T) {
Epoch: 0,
},
}
done := make(chan *feed.Event, 1)
defer close(done)
opn := mock.NewEventFeedWrapper()
sub := opn.Subscribe(done)
defer sub.Unsubscribe()
ctx, cancel := context.WithCancel(context.Background())
r := &Service{
ctx: ctx,
cfg: &config{
p2p: p1,
beaconDB: db,
chain: chain,
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
attPool: attestations.NewPool(),
p2p: p1,
beaconDB: db,
chain: chain,
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
attPool: attestations.NewPool(),
attestationNotifier: &mock.SimpleNotifier{Feed: opn},
},
blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof),
seenUnAggregatedAttestationCache: lruwrpr.New(10),
Expand All @@ -201,13 +233,28 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAttElectra(t *testing.T) {

r.blkRootToPendingAtts[root] = []ethpb.SignedAggregateAttAndProof{&ethpb.SignedAggregateAttestationAndProofSingle{Message: aggregateAndProof}}
require.NoError(t, r.processPendingAtts(context.Background()))

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case received := <-done:
// make sure a single att was sent
require.Equal(t, operation.SingleAttReceived, int(received.Type))
return
case <-ctx.Done():
return
}
}
}()
atts, err := r.cfg.attPool.UnaggregatedAttestations()
require.NoError(t, err)
require.Equal(t, 1, len(atts), "Did not save unaggregated att")
assert.DeepEqual(t, att.ToAttestationElectra(committee), atts[0], "Incorrect saved att")
assert.Equal(t, 0, len(r.cfg.attPool.AggregatedAttestations()), "Did save aggregated att")
require.LogsContain(t, hook, "Verified and saved pending attestations to pool")
wg.Wait()
cancel()
}

Expand Down Expand Up @@ -304,11 +351,12 @@ func TestProcessPendingAtts_NoBroadcastWithBadSignature(t *testing.T) {
r = &Service{
ctx: ctx,
cfg: &config{
p2p: p1,
beaconDB: db,
chain: chain2,
clock: startup.NewClock(chain2.Genesis, chain2.ValidatorsRoot),
attPool: attestations.NewPool(),
p2p: p1,
beaconDB: db,
chain: chain2,
clock: startup.NewClock(chain2.Genesis, chain2.ValidatorsRoot),
attPool: attestations.NewPool(),
attestationNotifier: &mock.MockOperationNotifier{},
},
blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof),
seenUnAggregatedAttestationCache: lruwrpr.New(10),
Expand Down
3 changes: 3 additions & 0 deletions changelog/radek_pending-att-queue-notification.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Added

- Send pending att queue's attestations through the notification feed.

0 comments on commit c2abfb6

Please sign in to comment.