Skip to content

Commit

Permalink
Refactor dynamic subnet subscriptions.
Browse files Browse the repository at this point in the history
  • Loading branch information
nalepae committed Dec 1, 2024
1 parent ecc8aa5 commit 9acc2c6
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 149 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Modified `ListAttestationsV2`, `GetAttesterSlashingsV2` and `GetAggregateAttestationV2` endpoints to use slot to determine fork version.
- Improvements to HTTP response handling. [pr](https://github.com/prysmaticlabs/prysm/pull/14673)
- Updated `Blobs` endpoint to return additional metadata fields.
- Refactor static and dynamic subnets subscription.

### Deprecated

Expand Down
287 changes: 142 additions & 145 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.validateCommitteeIndexBeaconAttestation,
s.committeeIndexBeaconAttestationSubscriber,
digest,
params.BeaconConfig().MaxCommitteesPerSlot,
s.subscribeToAttestationsSubnetsDynamic,
)
}

Expand All @@ -98,11 +100,13 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
params.BeaconConfig().SyncCommitteeSubnetCount,
)
} else {
s.subscribeDynamicWithSyncSubnets(
s.subscribeDynamicWithSubnets(
p2p.SyncCommitteeSubnetTopicFormat,
s.validateSyncCommitteeMessage, /* validator */
s.syncCommitteeMessageSubscriber, /* message handler */
s.validateSyncCommitteeMessage,
s.syncCommitteeMessageSubscriber,
digest,
params.BeaconConfig().SyncCommitteeSubnetCount,
s.subscribeToSyncSubnetsDynamic,
)
}
}
Expand Down Expand Up @@ -333,26 +337,22 @@ func (s *Service) subscribeStaticWithSubnets(
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
}

// Subscribe to all subnets.
for i := range subnetCount {
fullTopic := s.addDigestAndIndexToTopic(topic, digest, i)
s.subscribeWithBase(fullTopic, validator, handle)
}

// Define a ticker ticking every slot.
genesisTime := s.cfg.clock.GenesisTime()
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)

minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet

// Subscribe to all subnets.
for i := range subnetCount {
fullTopic := s.addDigestAndIndexToTopic(topic, digest, i)
s.subscribeWithBase(fullTopic, validator, handle)
}

go func() {
for {
select {
case <-s.ctx.Done():
ticker.Done()
return

case <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue
Expand Down Expand Up @@ -389,126 +389,132 @@ func (s *Service) subscribeStaticWithSubnets(
}
}
}
case <-s.ctx.Done():
ticker.Done()
return
}
}
}()
}

// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible
// string for the topic name and the list of subnets for subscribed topics that should be
// maintained.
type specificSubscribeFunc func(
topic string,
digest [4]byte,
genesisValidatorsRoot [fieldparams.RootLength]byte,
genesisTime time.Time,
subscriptions map[uint64]*pubsub.Subscription,
currentSlot primitives.Slot,
validate wrappedVal,
handle subHandler,
) bool

// subscribeDynamicWithSubnets subscribes to a dynamically changing list of subnets.
func (s *Service) subscribeDynamicWithSubnets(
topicFormat string,
validate wrappedVal,
handle subHandler,
digest [4]byte,
subnetCount uint64,
specificSubscribe specificSubscribeFunc,
) {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
// Initialize the subscriptions map.
subscriptions := make(map[uint64]*pubsub.Subscription, subnetCount)

// Retrieve the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()

// Retrieve the epoch of the fork corresponding to the digest.
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
if err != nil {
// Impossible condition as it would mean digest does not exist.
panic(err)
}
base := p2p.GossipTopicMappings(topicFormat, e)

// Retrieve the base protobuf message.
base := p2p.GossipTopicMappings(topicFormat, epoch)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
}
subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot)
genesis := s.cfg.clock.GenesisTime()
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)

// Retrieve the genesis time.
genesisTime := s.cfg.clock.GenesisTime()

// Define a ticker ticking every slot.
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)

// Retrieve the current slot.
currentSlot := s.cfg.clock.CurrentSlot()

go func() {
// Subscribe to the sync subnets.
specificSubscribe(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)

for {
select {
case <-s.ctx.Done():
ticker.Done()
return
case currentSlot := <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue
}
valid, err := isDigestValid(digest, genesis, genRoot)
if err != nil {
log.Error(err)
continue
}
if !valid {
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
// Unsubscribes from all our current subnets.
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
isDigestValid := specificSubscribe(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)

// Stop the ticker if the digest is not valid. Likely to happen after a hard fork.
if !isDigestValid {
ticker.Done()
return
}
wantedSubs := s.retrievePersistentSubs(currentSlot)
s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)

for _, idx := range wantedSubs {
s.subscribeAggregatorSubnet(subscriptions, idx, digest, validate, handle)
}
// find desired subs for attesters
attesterSubs := s.attesterSubnetIndices(currentSlot)
for _, idx := range attesterSubs {
s.lookupAttesterSubnets(digest, idx)
}
case <-s.ctx.Done():
ticker.Done()
return
}
}
}()
}

// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are
// not in the list of wanted subnets.
// TODO: Rename this functions as it does not only revalidate subscriptions.
func (s *Service) reValidateSubscriptions(
subscriptions map[uint64]*pubsub.Subscription,
wantedSubs []uint64,
func (s *Service) subscribeToAttestationsSubnetsDynamic(
topicFormat string,
digest [4]byte,
) {
for k, v := range subscriptions {
var wanted bool
for _, idx := range wantedSubs {
if k == idx {
wanted = true
break
}
}

if !wanted && v != nil {
v.Cancel()
fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.cfg.p2p.Encoding().ProtocolSuffix()
s.unSubscribeFromTopic(fullTopic)
delete(subscriptions, k)
}
}
}

// subscribe missing subnets for our aggregators.
func (s *Service) subscribeAggregatorSubnet(
genesisValidatorsRoot [fieldparams.RootLength]byte,
genesisTime time.Time,
subscriptions map[uint64]*pubsub.Subscription,
idx uint64,
digest [4]byte,
currentSlot primitives.Slot,
validate wrappedVal,
handle subHandler,
) {
// do not subscribe if we have no peers in the same
// subnet
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]
subnetTopic := fmt.Sprintf(topic, digest, idx)
// check if subscription exists and if not subscribe the relevant subnet.
if _, exists := subscriptions[idx]; !exists {
subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle)
) bool {
// Do not subscribe if not synced.
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
return true
}
if !s.enoughPeersAreConnected(subnetTopic) {
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}

// Do not subscribe is the digest is not valid.
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
if err != nil {
log.Error(err)
return true
}

// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
if !valid {
const message = "Attestation subnets with this digest are no longer valid, unsubscribing from all of them."
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn(message)
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
return false
}

wantedSubnetsIndex := s.retrievePersistentSubs(currentSlot)
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)

for _, index := range wantedSubnetsIndex {
s.subscribeAggregatorSubnet(subscriptions, index, digest, validate, handle)
}

// Find desired subnets for attesters.
attesterSubnets := s.attesterSubnetIndices(currentSlot)
for _, index := range attesterSubnets {
s.lookupAttesterSubnets(digest, index)
}

return true
}

// subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed.
// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise.
func (s *Service) subscribeToSyncSubnets(
func (s *Service) subscribeToSyncSubnetsDynamic(
topicFormat string,
digest [4]byte,
genesisValidatorsRoot [fieldparams.RootLength]byte,
Expand All @@ -535,7 +541,8 @@ func (s *Service) subscribeToSyncSubnets(

// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
if !valid {
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn("Sync subnets with this digest are no longer valid, unsubscribing from all of them.")
const message = "Sync subnets with this digest are no longer valid, unsubscribing from all of them."
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn(message)
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
return false
}
Expand Down Expand Up @@ -582,65 +589,55 @@ func (s *Service) subscribeToSyncSubnets(
return true
}

// subscribeDynamicWithSyncSubnets subscribes to a dynamically changing list of subnets.
func (s *Service) subscribeDynamicWithSyncSubnets(
// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are
// not in the list of wanted subnets.
// TODO: Rename this functions as it does not only revalidate subscriptions.
func (s *Service) reValidateSubscriptions(
subscriptions map[uint64]*pubsub.Subscription,
wantedSubs []uint64,
topicFormat string,
validate wrappedVal,
handle subHandler,
digest [4]byte,
) {
// Retrieve the number of committee subnets we need to subscribe to.
syncCommiteeSubnetsCount := params.BeaconConfig().SyncCommitteeSubnetCount

// Initialize the subscriptions map.
subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount)

// Retrieve the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
for k, v := range subscriptions {
var wanted bool
for _, idx := range wantedSubs {
if k == idx {
wanted = true
break
}
}

// Retrieve the epoch of the fork corresponding to the digest.
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
if err != nil {
panic(err)
if !wanted && v != nil {
v.Cancel()
fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.cfg.p2p.Encoding().ProtocolSuffix()
s.unSubscribeFromTopic(fullTopic)
delete(subscriptions, k)
}
}
}

// Retrieve the base protobuf message.
base := p2p.GossipTopicMappings(topicFormat, epoch)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
// subscribe missing subnets for our aggregators.
func (s *Service) subscribeAggregatorSubnet(
subscriptions map[uint64]*pubsub.Subscription,
idx uint64,
digest [4]byte,
validate wrappedVal,
handle subHandler,
) {
// do not subscribe if we have no peers in the same
// subnet
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]
subnetTopic := fmt.Sprintf(topic, digest, idx)
// check if subscription exists and if not subscribe the relevant subnet.
if _, exists := subscriptions[idx]; !exists {
subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle)
}

// Retrieve the genesis time.
genesisTime := s.cfg.clock.GenesisTime()

// Define a ticker ticking every slot.
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)

// Retrieve the current slot.
currentSlot := s.cfg.clock.CurrentSlot()

go func() {
// Subscribe to the sync subnets.
s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)

for {
select {
case currentSlot := <-ticker.C():
isDigestValid := s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)

// Stop the ticker if the digest is not valid. Likely to happen after a hard fork.
if !isDigestValid {
ticker.Done()
return
}

case <-s.ctx.Done():
ticker.Done()
return
}
if !s.enoughPeersAreConnected(subnetTopic) {
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}()
}
}

// lookup peers for attester specific subnets.
Expand Down
Loading

0 comments on commit 9acc2c6

Please sign in to comment.