From ecc8aa5829615d0ab30378f3fe485bb91e0461c9 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Fri, 29 Nov 2024 19:49:57 +0100 Subject: [PATCH 1/2] Refactor static subnet subscriptions. Before this commit, we had 2 functions: - `subscribeStaticWithSubnets`, and - `subscribeStaticWithSyncSubnets`. These two functions were very similar. This commit merge these two functions into one. --- beacon-chain/sync/subscriber.go | 184 +++++++++------------------ beacon-chain/sync/subscriber_test.go | 6 +- 2 files changed, 66 insertions(+), 124 deletions(-) diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index baca434644cc..65535749f166 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -55,52 +55,30 @@ func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Messag // Register PubSub subscribers func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { - s.subscribe( - p2p.BlockSubnetTopicFormat, - s.validateBeaconBlockPubSub, - s.beaconBlockSubscriber, - digest, - ) - s.subscribe( - p2p.AggregateAndProofSubnetTopicFormat, - s.validateAggregateAndProof, - s.beaconAggregateProofSubscriber, - digest, - ) - s.subscribe( - p2p.ExitSubnetTopicFormat, - s.validateVoluntaryExit, - s.voluntaryExitSubscriber, - digest, - ) - s.subscribe( - p2p.ProposerSlashingSubnetTopicFormat, - s.validateProposerSlashing, - s.proposerSlashingSubscriber, - digest, - ) - s.subscribe( - p2p.AttesterSlashingSubnetTopicFormat, - s.validateAttesterSlashing, - s.attesterSlashingSubscriber, - digest, - ) + s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, digest) + s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, digest) + s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, digest) + s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, digest) + s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, digest) + if flags.Get().SubscribeToAllSubnets { s.subscribeStaticWithSubnets( p2p.AttestationSubnetTopicFormat, - s.validateCommitteeIndexBeaconAttestation, /* validator */ - s.committeeIndexBeaconAttestationSubscriber, /* message handler */ + s.validateCommitteeIndexBeaconAttestation, + s.committeeIndexBeaconAttestationSubscriber, digest, + "Attestation", params.BeaconConfig().AttestationSubnetCount, ) } else { s.subscribeDynamicWithSubnets( p2p.AttestationSubnetTopicFormat, - s.validateCommitteeIndexBeaconAttestation, /* validator */ - s.committeeIndexBeaconAttestationSubscriber, /* message handler */ + s.validateCommitteeIndexBeaconAttestation, + s.committeeIndexBeaconAttestationSubscriber, digest, ) } + // Altair Fork Version if epoch >= params.BeaconConfig().AltairForkEpoch { s.subscribe( @@ -109,12 +87,15 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { s.syncContributionAndProofSubscriber, digest, ) + if flags.Get().SubscribeToAllSubnets { - s.subscribeStaticWithSyncSubnets( + s.subscribeStaticWithSubnets( p2p.SyncCommitteeSubnetTopicFormat, - s.validateSyncCommitteeMessage, /* validator */ - s.syncCommitteeMessageSubscriber, /* message handler */ + s.validateSyncCommitteeMessage, + s.syncCommitteeMessageSubscriber, digest, + "Sync committee", + params.BeaconConfig().SyncCommitteeSubnetCount, ) } else { s.subscribeDynamicWithSyncSubnets( @@ -143,6 +124,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { s.validateBlob, /* validator */ s.blobSubscriber, /* message handler */ digest, + "Blob sidecar", params.BeaconConfig().BlobsidecarSubnetCount, ) } @@ -326,23 +308,43 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p // subscribe to a static subnet with the given topic and index. A given validator and subscription handler is // used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding. -func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte, subnetCount uint64) { - genRoot := s.cfg.clock.GenesisValidatorsRoot() - _, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:]) +func (s *Service) subscribeStaticWithSubnets( + topic string, + validator wrappedVal, + handle subHandler, + digest [4]byte, + humanDescription string, + subnetCount uint64, +) { + // Retrieve the genesis validators root. + genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot() + + // Retrieve the epoch of the fork corresponding to the digest. + _, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:]) if err != nil { // Impossible condition as it would mean digest does not exist. panic(err) } - base := p2p.GossipTopicMappings(topic, e) + + // Retrieve the base protobuf message. + base := p2p.GossipTopicMappings(topic, epoch) if base == nil { // Impossible condition as it would mean topic does not exist. panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) } - for i := uint64(0); i < subnetCount; i++ { - s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle) + + // Subscribe to all subnets. + for i := range subnetCount { + fullTopic := s.addDigestAndIndexToTopic(topic, digest, i) + s.subscribeWithBase(fullTopic, validator, handle) } - genesis := s.cfg.clock.GenesisTime() - ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot) + + // Define a ticker ticking every slot. + genesisTime := s.cfg.clock.GenesisTime() + secondsPerSlot := params.BeaconConfig().SecondsPerSlot + ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot) + + minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet go func() { for { @@ -350,35 +352,38 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, case <-s.ctx.Done(): ticker.Done() return + case <-ticker.C(): if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { continue } - valid, err := isDigestValid(digest, genesis, genRoot) + + valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot) if err != nil { log.Error(err) continue } + if !valid { - log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest) + message := fmt.Sprintf("%s subnets with this digest are no longer valid, unsubscribing from all of them.", humanDescription) + log.WithField("digest", fmt.Sprintf("%#x", digest)).Warning(message) + // Unsubscribes from all our current subnets. - for i := uint64(0); i < subnetCount; i++ { + for i := range subnetCount { fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix() s.unSubscribeFromTopic(fullTopic) } + ticker.Done() return } - // Check every slot that there are enough peers - for i := uint64(0); i < subnetCount; i++ { - if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) { - _, err := s.cfg.p2p.FindPeersWithSubnet( - s.ctx, - s.addDigestAndIndexToTopic(topic, digest, i), - i, - flags.Get().MinimumPeersPerSubnet, - ) - if err != nil { + + // Check that all subnets have enough peers. + for i := range subnetCount { + fullTopic := s.addDigestAndIndexToTopic(topic, digest, i) + + if !s.enoughPeersAreConnected(fullTopic) { + if _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, fullTopic, i, minimumPeersPerSubnet); err != nil { log.WithError(err).Debug("Could not search for peers") return } @@ -501,69 +506,6 @@ func (s *Service) subscribeAggregatorSubnet( } } -// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is -// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding. -func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte) { - genRoot := s.cfg.clock.GenesisValidatorsRoot() - _, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:]) - if err != nil { - panic(err) - } - base := p2p.GossipTopicMappings(topic, e) - if base == nil { - panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) - } - for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ { - s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle) - } - genesis := s.cfg.clock.GenesisTime() - ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot) - - go func() { - for { - select { - case <-s.ctx.Done(): - ticker.Done() - return - case <-ticker.C(): - if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { - continue - } - valid, err := isDigestValid(digest, genesis, genRoot) - if err != nil { - log.Error(err) - continue - } - if !valid { - log.Warnf("Sync subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest) - // Unsubscribes from all our current subnets. - for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ { - fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix() - s.unSubscribeFromTopic(fullTopic) - } - ticker.Done() - return - } - // Check every slot that there are enough peers - for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ { - if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) { - _, err := s.cfg.p2p.FindPeersWithSubnet( - s.ctx, - s.addDigestAndIndexToTopic(topic, digest, i), - i, - flags.Get().MinimumPeersPerSubnet, - ) - if err != nil { - log.WithError(err).Debug("Could not search for peers") - return - } - } - } - } - } - }() -} - // subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed. // Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise. func (s *Service) subscribeToSyncSubnets( diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index d0bae7fe4aa9..f8eeac803c5d 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -335,7 +335,7 @@ func TestStaticSubnets(t *testing.T) { r.subscribeStaticWithSubnets(defaultTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error { // no-op return nil - }, d, params.BeaconConfig().AttestationSubnetCount) + }, d, "Attestation", params.BeaconConfig().AttestationSubnetCount) topics := r.cfg.p2p.PubSub().GetTopics() if uint64(len(topics)) != params.BeaconConfig().AttestationSubnetCount { t.Errorf("Wanted the number of subnet topics registered to be %d but got %d", params.BeaconConfig().AttestationSubnetCount, len(topics)) @@ -565,7 +565,7 @@ func TestSubscribeWithSyncSubnets_StaticOK(t *testing.T) { defer cache.SyncSubnetIDs.EmptyAllCaches() digest, err := r.currentForkDigest() assert.NoError(t, err) - r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest) + r.subscribeStaticWithSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, "Sync committee", params.BeaconConfig().SyncCommitteeSubnetCount) assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics())) cancel() } @@ -645,7 +645,7 @@ func TestSubscribeWithSyncSubnets_StaticSwitchFork(t *testing.T) { genRoot := r.cfg.clock.GenesisValidatorsRoot() digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:]) assert.NoError(t, err) - r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest) + r.subscribeStaticWithSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, "Sync committee", params.BeaconConfig().SyncCommitteeSubnetCount) assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics())) // Expect that all old topics will be unsubscribed. From 9acc2c6601840a7e2f7c452dd52a25366a890982 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Sat, 30 Nov 2024 16:21:53 +0100 Subject: [PATCH 2/2] Refactor dynamic subnet subscriptions. --- CHANGELOG.md | 1 + beacon-chain/sync/subscriber.go | 287 +++++++++++++-------------- beacon-chain/sync/subscriber_test.go | 8 +- 3 files changed, 147 insertions(+), 149 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 882e42f8ae9f..70a437535c28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve - Modified `ListAttestationsV2`, `GetAttesterSlashingsV2` and `GetAggregateAttestationV2` endpoints to use slot to determine fork version. - Improvements to HTTP response handling. [pr](https://github.com/prysmaticlabs/prysm/pull/14673) - Updated `Blobs` endpoint to return additional metadata fields. +- Refactor static and dynamic subnets subscription. ### Deprecated diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 65535749f166..92b5817fb1d5 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -76,6 +76,8 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { s.validateCommitteeIndexBeaconAttestation, s.committeeIndexBeaconAttestationSubscriber, digest, + params.BeaconConfig().MaxCommitteesPerSlot, + s.subscribeToAttestationsSubnetsDynamic, ) } @@ -98,11 +100,13 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { params.BeaconConfig().SyncCommitteeSubnetCount, ) } else { - s.subscribeDynamicWithSyncSubnets( + s.subscribeDynamicWithSubnets( p2p.SyncCommitteeSubnetTopicFormat, - s.validateSyncCommitteeMessage, /* validator */ - s.syncCommitteeMessageSubscriber, /* message handler */ + s.validateSyncCommitteeMessage, + s.syncCommitteeMessageSubscriber, digest, + params.BeaconConfig().SyncCommitteeSubnetCount, + s.subscribeToSyncSubnetsDynamic, ) } } @@ -333,12 +337,6 @@ func (s *Service) subscribeStaticWithSubnets( panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) } - // Subscribe to all subnets. - for i := range subnetCount { - fullTopic := s.addDigestAndIndexToTopic(topic, digest, i) - s.subscribeWithBase(fullTopic, validator, handle) - } - // Define a ticker ticking every slot. genesisTime := s.cfg.clock.GenesisTime() secondsPerSlot := params.BeaconConfig().SecondsPerSlot @@ -346,13 +344,15 @@ func (s *Service) subscribeStaticWithSubnets( minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet + // Subscribe to all subnets. + for i := range subnetCount { + fullTopic := s.addDigestAndIndexToTopic(topic, digest, i) + s.subscribeWithBase(fullTopic, validator, handle) + } + go func() { for { select { - case <-s.ctx.Done(): - ticker.Done() - return - case <-ticker.C(): if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { continue @@ -389,126 +389,132 @@ func (s *Service) subscribeStaticWithSubnets( } } } + case <-s.ctx.Done(): + ticker.Done() + return } } }() } -// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible -// string for the topic name and the list of subnets for subscribed topics that should be -// maintained. +type specificSubscribeFunc func( + topic string, + digest [4]byte, + genesisValidatorsRoot [fieldparams.RootLength]byte, + genesisTime time.Time, + subscriptions map[uint64]*pubsub.Subscription, + currentSlot primitives.Slot, + validate wrappedVal, + handle subHandler, +) bool + +// subscribeDynamicWithSubnets subscribes to a dynamically changing list of subnets. func (s *Service) subscribeDynamicWithSubnets( topicFormat string, validate wrappedVal, handle subHandler, digest [4]byte, + subnetCount uint64, + specificSubscribe specificSubscribeFunc, ) { - genRoot := s.cfg.clock.GenesisValidatorsRoot() - _, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:]) + // Initialize the subscriptions map. + subscriptions := make(map[uint64]*pubsub.Subscription, subnetCount) + + // Retrieve the genesis validators root. + genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot() + + // Retrieve the epoch of the fork corresponding to the digest. + _, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:]) if err != nil { - // Impossible condition as it would mean digest does not exist. panic(err) } - base := p2p.GossipTopicMappings(topicFormat, e) + + // Retrieve the base protobuf message. + base := p2p.GossipTopicMappings(topicFormat, epoch) if base == nil { panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) } - subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot) - genesis := s.cfg.clock.GenesisTime() - ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot) + + // Retrieve the genesis time. + genesisTime := s.cfg.clock.GenesisTime() + + // Define a ticker ticking every slot. + secondsPerSlot := params.BeaconConfig().SecondsPerSlot + ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot) + + // Retrieve the current slot. + currentSlot := s.cfg.clock.CurrentSlot() go func() { + // Subscribe to the sync subnets. + specificSubscribe(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle) + for { select { - case <-s.ctx.Done(): - ticker.Done() - return case currentSlot := <-ticker.C(): - if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { - continue - } - valid, err := isDigestValid(digest, genesis, genRoot) - if err != nil { - log.Error(err) - continue - } - if !valid { - log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest) - // Unsubscribes from all our current subnets. - s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest) + isDigestValid := specificSubscribe(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle) + + // Stop the ticker if the digest is not valid. Likely to happen after a hard fork. + if !isDigestValid { ticker.Done() return } - wantedSubs := s.retrievePersistentSubs(currentSlot) - s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) - for _, idx := range wantedSubs { - s.subscribeAggregatorSubnet(subscriptions, idx, digest, validate, handle) - } - // find desired subs for attesters - attesterSubs := s.attesterSubnetIndices(currentSlot) - for _, idx := range attesterSubs { - s.lookupAttesterSubnets(digest, idx) - } + case <-s.ctx.Done(): + ticker.Done() + return } } }() } -// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are -// not in the list of wanted subnets. -// TODO: Rename this functions as it does not only revalidate subscriptions. -func (s *Service) reValidateSubscriptions( - subscriptions map[uint64]*pubsub.Subscription, - wantedSubs []uint64, +func (s *Service) subscribeToAttestationsSubnetsDynamic( topicFormat string, digest [4]byte, -) { - for k, v := range subscriptions { - var wanted bool - for _, idx := range wantedSubs { - if k == idx { - wanted = true - break - } - } - - if !wanted && v != nil { - v.Cancel() - fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.cfg.p2p.Encoding().ProtocolSuffix() - s.unSubscribeFromTopic(fullTopic) - delete(subscriptions, k) - } - } -} - -// subscribe missing subnets for our aggregators. -func (s *Service) subscribeAggregatorSubnet( + genesisValidatorsRoot [fieldparams.RootLength]byte, + genesisTime time.Time, subscriptions map[uint64]*pubsub.Subscription, - idx uint64, - digest [4]byte, + currentSlot primitives.Slot, validate wrappedVal, handle subHandler, -) { - // do not subscribe if we have no peers in the same - // subnet - topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})] - subnetTopic := fmt.Sprintf(topic, digest, idx) - // check if subscription exists and if not subscribe the relevant subnet. - if _, exists := subscriptions[idx]; !exists { - subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle) +) bool { + // Do not subscribe if not synced. + if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { + return true } - if !s.enoughPeersAreConnected(subnetTopic) { - _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) - if err != nil { - log.WithError(err).Debug("Could not search for peers") - } + + // Do not subscribe is the digest is not valid. + valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot) + if err != nil { + log.Error(err) + return true + } + + // Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork. + if !valid { + const message = "Attestation subnets with this digest are no longer valid, unsubscribing from all of them." + log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn(message) + s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest) + return false } + + wantedSubnetsIndex := s.retrievePersistentSubs(currentSlot) + s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest) + + for _, index := range wantedSubnetsIndex { + s.subscribeAggregatorSubnet(subscriptions, index, digest, validate, handle) + } + + // Find desired subnets for attesters. + attesterSubnets := s.attesterSubnetIndices(currentSlot) + for _, index := range attesterSubnets { + s.lookupAttesterSubnets(digest, index) + } + + return true } -// subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed. -// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise. -func (s *Service) subscribeToSyncSubnets( +func (s *Service) subscribeToSyncSubnetsDynamic( topicFormat string, digest [4]byte, genesisValidatorsRoot [fieldparams.RootLength]byte, @@ -535,7 +541,8 @@ func (s *Service) subscribeToSyncSubnets( // Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork. if !valid { - log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn("Sync subnets with this digest are no longer valid, unsubscribing from all of them.") + const message = "Sync subnets with this digest are no longer valid, unsubscribing from all of them." + log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn(message) s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest) return false } @@ -582,65 +589,55 @@ func (s *Service) subscribeToSyncSubnets( return true } -// subscribeDynamicWithSyncSubnets subscribes to a dynamically changing list of subnets. -func (s *Service) subscribeDynamicWithSyncSubnets( +// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are +// not in the list of wanted subnets. +// TODO: Rename this functions as it does not only revalidate subscriptions. +func (s *Service) reValidateSubscriptions( + subscriptions map[uint64]*pubsub.Subscription, + wantedSubs []uint64, topicFormat string, - validate wrappedVal, - handle subHandler, digest [4]byte, ) { - // Retrieve the number of committee subnets we need to subscribe to. - syncCommiteeSubnetsCount := params.BeaconConfig().SyncCommitteeSubnetCount - - // Initialize the subscriptions map. - subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount) - - // Retrieve the genesis validators root. - genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot() + for k, v := range subscriptions { + var wanted bool + for _, idx := range wantedSubs { + if k == idx { + wanted = true + break + } + } - // Retrieve the epoch of the fork corresponding to the digest. - _, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:]) - if err != nil { - panic(err) + if !wanted && v != nil { + v.Cancel() + fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.cfg.p2p.Encoding().ProtocolSuffix() + s.unSubscribeFromTopic(fullTopic) + delete(subscriptions, k) + } } +} - // Retrieve the base protobuf message. - base := p2p.GossipTopicMappings(topicFormat, epoch) - if base == nil { - panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) +// subscribe missing subnets for our aggregators. +func (s *Service) subscribeAggregatorSubnet( + subscriptions map[uint64]*pubsub.Subscription, + idx uint64, + digest [4]byte, + validate wrappedVal, + handle subHandler, +) { + // do not subscribe if we have no peers in the same + // subnet + topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})] + subnetTopic := fmt.Sprintf(topic, digest, idx) + // check if subscription exists and if not subscribe the relevant subnet. + if _, exists := subscriptions[idx]; !exists { + subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle) } - - // Retrieve the genesis time. - genesisTime := s.cfg.clock.GenesisTime() - - // Define a ticker ticking every slot. - secondsPerSlot := params.BeaconConfig().SecondsPerSlot - ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot) - - // Retrieve the current slot. - currentSlot := s.cfg.clock.CurrentSlot() - - go func() { - // Subscribe to the sync subnets. - s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle) - - for { - select { - case currentSlot := <-ticker.C(): - isDigestValid := s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle) - - // Stop the ticker if the digest is not valid. Likely to happen after a hard fork. - if !isDigestValid { - ticker.Done() - return - } - - case <-s.ctx.Done(): - ticker.Done() - return - } + if !s.enoughPeersAreConnected(subnetTopic) { + _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) + if err != nil { + log.WithError(err).Debug("Could not search for peers") } - }() + } } // lookup peers for attester specific subnets. diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index f8eeac803c5d..f4794c8867c2 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -496,6 +496,7 @@ func TestFilterSubnetPeers(t *testing.T) { chainStarted: abool.New(), subHandler: newSubTopicHandler(), } + // Empty cache at the end of the test. defer cache.SubnetIDs.EmptyAllCaches() digest, err := r.currentForkDigest() @@ -511,8 +512,7 @@ func TestFilterSubnetPeers(t *testing.T) { p2 := createPeer(t, subnet10, subnet20) p3 := createPeer(t) - // Connect to all - // peers. + // Connect to all peers. p.Connect(p1) p.Connect(p2) p.Connect(p3) @@ -600,7 +600,7 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) { cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second) digest, err := r.currentForkDigest() assert.NoError(t, err) - r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest) + r.subscribeDynamicWithSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, params.BeaconConfig().SyncCommitteeSubnetCount, r.subscribeToSyncSubnetsDynamic) time.Sleep(2 * time.Second) assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics())) topicMap := map[string]bool{} @@ -689,7 +689,7 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) { digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:]) assert.NoError(t, err) - r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest) + r.subscribeDynamicWithSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, params.BeaconConfig().SyncCommitteeSubnetCount, r.subscribeToSyncSubnetsDynamic) time.Sleep(2 * time.Second) assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics())) topicMap := map[string]bool{}