diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index bb6ba50a6493..33b07dc79f78 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -645,12 +645,12 @@ func uint64MapToSortedSlice(input map[uint64]bool) []uint64 { return output } -func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error { - if signed.Version() < version.Fulu { +func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, signedBlock interfaces.ReadOnlySignedBeaconBlock) error { + if signedBlock.Version() < version.Fulu { return nil } - block := signed.Block() + block := signedBlock.Block() if block == nil { return errors.New("invalid nil beacon block") } @@ -681,24 +681,19 @@ func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, si // All columns to sample need to be available for the block to be considered available. // https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/das-core.md#custody-sampling nodeID := s.cfg.P2P.NodeID() - custodyGroupSamplingSize := peerdas.CustodyGroupSamplingSize() - custodyGroups, err := peerdas.CustodyGroups(nodeID, custodyGroupSamplingSize) + // Get the custody group sampling size for the node. + custodyGroupSamplingSize := peerdas.CustodyGroupSamplingSize() + peerInfo, _, err := peerdas.Info(nodeID, custodyGroupSamplingSize) if err != nil { - return errors.Wrap(err, "custody groups") + return errors.Wrap(err, "peer info") } // Exit early if the node is not expected to custody any data columns. - if len(custodyGroups) == 0 { + if len(peerInfo.CustodyColumns) == 0 { return nil } - // Get the custody columns from the groups. - columnsMap, err := peerdas.CustodyColumns(custodyGroups) - if err != nil { - return errors.Wrap(err, "custody columns") - } - // Subscribe to new data columns stored in the database. rootIndexChan := make(chan filesystem.RootIndexPair) subscription := s.blobStorage.DataColumnFeed.Subscribe(rootIndexChan) @@ -722,7 +717,7 @@ func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, si } // Get a map of data column indices that are not currently available. - missingMap, err := missingDataColumns(s.blobStorage, root, columnsMap) + missingMap, err := missingDataColumns(s.blobStorage, root, peerInfo.CustodyColumns) if err != nil { return err } @@ -734,7 +729,7 @@ func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, si } // Log for DA checks that cross over into the next slot; helpful for debugging. - nextSlot := slots.BeginsAt(signed.Block().Slot()+1, s.genesisTime) + nextSlot := slots.BeginsAt(signedBlock.Block().Slot()+1, s.genesisTime) // Avoid logging if DA check is called after next slot start.
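The hunk above replaces the old two-step custody lookup (`CustodyGroups` followed by `CustodyColumns`) with a single `peerdas.Info` call. A minimal sketch of the before/after shape, with `nodeID` standing in for `s.cfg.P2P.NodeID()` and error handling elided (illustrative, not the literal patch):

```go
// Before: two derivations ran on every availability check.
custodyGroups, _ := peerdas.CustodyGroups(nodeID, peerdas.CustodyGroupSamplingSize())
columnsMap, _ := peerdas.CustodyColumns(custodyGroups)

// After: one memoized lookup bundles groups, columns, and subnets.
peerInfo, cached, _ := peerdas.Info(nodeID, peerdas.CustodyGroupSamplingSize())
_ = cached                           // true when served from the package LRU
columnsMap = peerInfo.CustodyColumns // the same set as before, now cached
```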
if nextSlot.After(time.Now()) { nst := time.AfterFunc(time.Until(nextSlot), func() { @@ -750,10 +745,10 @@ func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, si ) numberOfColumns := params.BeaconConfig().NumberOfColumns - colMapCount := uint64(len(columnsMap)) + colMapCount := uint64(len(peerInfo.CustodyColumns)) if colMapCount < numberOfColumns { - expected = uint64MapToSortedSlice(columnsMap) + expected = uint64MapToSortedSlice(peerInfo.CustodyColumns) } if missingMapCount < numberOfColumns { @@ -761,7 +756,7 @@ func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, si } log.WithFields(logrus.Fields{ - "slot": signed.Block().Slot(), + "slot": signedBlock.Block().Slot(), "root": fmt.Sprintf("%#x", root), "columnsExpected": expected, "columnsWaiting": missing, diff --git a/beacon-chain/cache/BUILD.bazel b/beacon-chain/cache/BUILD.bazel index a30b7a5be22e..c548e83482ab 100644 --- a/beacon-chain/cache/BUILD.bazel +++ b/beacon-chain/cache/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "attestation_data.go", "balance_cache_key.go", "checkpoint_state.go", - "column_subnet_ids.go", "committee.go", "committee_disabled.go", # keep "committees.go", diff --git a/beacon-chain/cache/column_subnet_ids.go b/beacon-chain/cache/column_subnet_ids.go deleted file mode 100644 index 79de06f092a6..000000000000 --- a/beacon-chain/cache/column_subnet_ids.go +++ /dev/null @@ -1,70 +0,0 @@ -package cache - -import ( - "sync" - "time" - - "github.com/patrickmn/go-cache" - "github.com/prysmaticlabs/prysm/v5/config/params" -) - -type columnSubnetIDs struct { - colSubCache *cache.Cache - colSubLock sync.RWMutex -} - -// ColumnSubnetIDs for column subnet participants -var ColumnSubnetIDs = newColumnSubnetIDs() - -const columnKey = "columns" - -func newColumnSubnetIDs() *columnSubnetIDs { - secondsPerSlot := params.BeaconConfig().SecondsPerSlot - slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch - epochDuration := time.Duration(slotsPerEpoch.Mul(secondsPerSlot)) - - // Set the default duration of a column subnet subscription as the column expiry period. - minEpochsForDataColumnSidecarsRequest := time.Duration(params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest) - subLength := epochDuration * minEpochsForDataColumnSidecarsRequest - - persistentCache := cache.New(subLength*time.Second, epochDuration*time.Second) - return &columnSubnetIDs{colSubCache: persistentCache} -} - -// GetColumnSubnets retrieves the data column subnets. -func (s *columnSubnetIDs) GetColumnSubnets() ([]uint64, bool, time.Time) { - s.colSubLock.RLock() - defer s.colSubLock.RUnlock() - - id, duration, ok := s.colSubCache.GetWithExpiration(columnKey) - if !ok { - return nil, false, time.Time{} - } - // Retrieve indices from the cache. - idxs, ok := id.([]uint64) - if !ok { - return nil, false, time.Time{} - } - - return idxs, ok, duration -} - -// AddColumnSubnets adds the relevant data column subnets. -func (s *columnSubnetIDs) AddColumnSubnets(colIdx []uint64) { - s.colSubLock.Lock() - defer s.colSubLock.Unlock() - - s.colSubCache.Set(columnKey, colIdx, 0) -} - -// EmptyAllCaches empties out all the related caches and flushes any stored -// entries on them. This should only ever be used for testing, in normal -// production, handling of the relevant subnets for each role is done -// separately. -func (s *columnSubnetIDs) EmptyAllCaches() { - // Clear the cache. 
- s.colSubLock.Lock() - defer s.colSubLock.Unlock() - - s.colSubCache.Flush() -} diff --git a/beacon-chain/core/peerdas/BUILD.bazel b/beacon-chain/core/peerdas/BUILD.bazel index 48c20bc1e726..0e46fdc1f12a 100644 --- a/beacon-chain/core/peerdas/BUILD.bazel +++ b/beacon-chain/core/peerdas/BUILD.bazel @@ -3,9 +3,12 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ - "helpers.go", - "log.go", + "das_core.go", + "info.go", "metrics.go", + "p2p_interface.go", + "peer_sampling.go", + "reconstruction.go", ], importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas", visibility = ["//visibility:public"], @@ -21,18 +24,25 @@ go_library( "//proto/prysm/v1alpha1:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", + "@com_github_hashicorp_golang_lru//:go_default_library", "@com_github_holiman_uint256//:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library", "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", - "@com_github_sirupsen_logrus//:go_default_library", "@org_golang_x_sync//errgroup:go_default_library", ], ) go_test( name = "go_default_test", - srcs = ["helpers_test.go"], + srcs = [ + "das_core_test.go", + "info_test.go", + "p2p_interface_test.go", + "peer_sampling_test.go", + "reconstruction_test.go", + "utils_test.go", + ], deps = [ ":go_default_library", "//beacon-chain/blockchain/kzg:go_default_library", @@ -45,7 +55,9 @@ go_test( "//testing/util:go_default_library", "@com_github_consensys_gnark_crypto//ecc/bls12-381/fr:go_default_library", "@com_github_crate_crypto_go_kzg_4844//:go_default_library", + "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", + "@com_github_pkg_errors//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", ], ) diff --git a/beacon-chain/core/peerdas/helpers.go b/beacon-chain/core/peerdas/das_core.go similarity index 53% rename from beacon-chain/core/peerdas/helpers.go rename to beacon-chain/core/peerdas/das_core.go index 03d324b466ff..72d9c142cef9 100644 --- a/beacon-chain/core/peerdas/helpers.go +++ b/beacon-chain/core/peerdas/das_core.go @@ -3,49 +3,28 @@ package peerdas import ( "context" "encoding/binary" - "fmt" "math" - "math/big" "slices" "time" - fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" - "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" "github.com/holiman/uint256" - errors "github.com/pkg/errors" - + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg" - - "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" + fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v5/crypto/hash" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "golang.org/x/sync/errgroup" ) -const ( - CustodyGroupCountEnrKey = "cgc" -) - -// https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/p2p-interface.md#the-discovery-domain-discv5 
-type Cgc uint64 - -func (Cgc) ENRKey() string { return CustodyGroupCountEnrKey } - var ( // Custom errors errCustodyGroupCountTooLarge = errors.New("custody group count too large") errWrongComputedCustodyGroupCount = errors.New("wrong computed custody group count, should never happen") - errIndexTooLarge = errors.New("column index is larger than the specified columns count") - errMismatchLength = errors.New("mismatch in the length of the commitments and proofs") - errRecordNil = errors.New("record is nil") - errCannotLoadCustodyGroupCount = errors.New("cannot load the custody group count from peer") // maxUint256 is the maximum value of a uint256. maxUint256 = &uint256.Int{math.MaxUint64, math.MaxUint64, math.MaxUint64, math.MaxUint64} @@ -117,67 +96,6 @@ func ComputeColumnsForCustodyGroup(custodyGroup uint64) ([]uint64, error) { return columns, nil } -// ComputeCustodyGroupForColumn computes the custody group for a given column. -// It is the reciprocal function of ComputeColumnsForCustodyGroup. -func ComputeCustodyGroupForColumn(columnIndex uint64) (uint64, error) { - beaconConfig := params.BeaconConfig() - numberOfColumns := beaconConfig.NumberOfColumns - - if columnIndex >= numberOfColumns { - return 0, errIndexTooLarge - } - - numberOfCustodyGroups := beaconConfig.NumberOfCustodyGroups - columnsPerGroup := numberOfColumns / numberOfCustodyGroups - - return columnIndex / columnsPerGroup, nil -} - -// ComputeSubnetForDataColumnSidecar computes the subnet for a data column sidecar. -// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#compute_subnet_for_data_column_sidecar -func ComputeSubnetForDataColumnSidecar(columnIndex uint64) uint64 { - dataColumnSidecarSubnetCount := params.BeaconConfig().DataColumnSidecarSubnetCount - return columnIndex % dataColumnSidecarSubnetCount -} - -// CustodyColumns computes the columns the node should custody. -func CustodyColumns(custodyGroups map[uint64]bool) (map[uint64]bool, error) { - numberOfCustodyGroups := params.BeaconConfig().NumberOfCustodyGroups - - custodyGroupCount := len(custodyGroups) - - // Compute the columns for each custody group. - columns := make(map[uint64]bool, custodyGroupCount) - for group := range custodyGroups { - if group >= numberOfCustodyGroups { - return nil, errCustodyGroupCountTooLarge - } - - groupColumns, err := ComputeColumnsForCustodyGroup(group) - if err != nil { - return nil, errors.Wrap(err, "compute columns for custody group") - } - - for _, column := range groupColumns { - columns[column] = true - } - } - - return columns, nil -} - -// DataColumnSubnets computes the subnets for the data columns. -func DataColumnSubnets(dataColumns map[uint64]bool) map[uint64]bool { - subnets := make(map[uint64]bool, len(dataColumns)) - - for column := range dataColumns { - subnet := ComputeSubnetForDataColumnSidecar(column) - subnets[subnet] = true - } - - return subnets -} - // DataColumnSidecars computes the data column sidecars from the signed block and blobs. // https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/das-core.md#get_data_column_sidecars func DataColumnSidecars(signedBlock interfaces.ReadOnlySignedBeaconBlock, blobs []kzg.Blob) ([]*ethpb.DataColumnSidecar, error) { @@ -273,34 +191,39 @@ func DataColumnSidecars(signedBlock interfaces.ReadOnlySignedBeaconBlock, blobs return sidecars, nil } -// populateAndFilterIndices returns a sorted slices of indices, setting all indices if none are provided, -// and filtering out indices higher than the blob count. 
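Several helpers removed here (`ComputeCustodyGroupForColumn`, `ComputeSubnetForDataColumnSidecar`, `DataColumnSubnets`) reappear verbatim in the new `p2p_interface.go` later in this diff. The column-to-subnet mapping they implement is a plain modulo followed by set-based deduplication; a self-contained sketch, with an assumed illustrative subnet count:

```go
// dataColumnSidecarSubnetCount stands in for
// params.BeaconConfig().DataColumnSidecarSubnetCount (assumed value).
const dataColumnSidecarSubnetCount = 128

// subnetsFor mirrors DataColumnSubnets: distinct columns may map to the same
// subnet, so results are deduplicated through a set.
func subnetsFor(columns map[uint64]bool) map[uint64]bool {
	subnets := make(map[uint64]bool, len(columns))
	for column := range columns {
		subnets[column%dataColumnSidecarSubnetCount] = true // ComputeSubnetForDataColumnSidecar
	}
	return subnets
}
```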
-func populateAndFilterIndices(indices map[uint64]bool, blobCount uint64) []uint64 { - // If no indices are provided, provide all blobs. - if len(indices) == 0 { - for i := range blobCount { - indices[i] = true +// CustodyGroupSamplingSize returns the number of custody groups the node should sample from. +// https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/das-core.md#custody-sampling +func CustodyGroupSamplingSize() uint64 { + samplesPerSlot := params.BeaconConfig().SamplesPerSlot + custodyGroupCount := CustodyGroupCount() + + return max(samplesPerSlot, custodyGroupCount) +} + +// CustodyColumns computes the custody columns from the custody groups. +func CustodyColumns(custodyGroups map[uint64]bool) (map[uint64]bool, error) { + numberOfCustodyGroups := params.BeaconConfig().NumberOfCustodyGroups + + custodyGroupCount := len(custodyGroups) + + // Compute the columns for each custody group. + columns := make(map[uint64]bool, custodyGroupCount) + for group := range custodyGroups { + if group >= numberOfCustodyGroups { + return nil, errCustodyGroupCountTooLarge } - } - // Filter blobs index higher than the blob count. - filteredIndices := make(map[uint64]bool, len(indices)) - for i := range indices { - if i < blobCount { - filteredIndices[i] = true + groupColumns, err := ComputeColumnsForCustodyGroup(group) + if err != nil { + return nil, errors.Wrap(err, "compute columns for custody group") } - } - // Transform set to slice. - indicesSlice := make([]uint64, 0, len(filteredIndices)) - for i := range filteredIndices { - indicesSlice = append(indicesSlice, i) + for _, column := range groupColumns { + columns[column] = true + } } - // Sort the indices. - slices.Sort[[]uint64](indicesSlice) - - return indicesSlice + return columns, nil } // Blobs extract blobs from `dataColumnsSidecar`. @@ -410,268 +333,32 @@ func Blobs(indices map[uint64]bool, dataColumnsSidecar []*ethpb.DataColumnSideca return verifiedROBlobs, nil } -// DataColumnSidecarsForReconstruct is a TEMPORARY function until there is an official specification for it. -// It is scheduled for deletion. -func DataColumnSidecarsForReconstruct( - blobKzgCommitments [][]byte, - signedBlockHeader *ethpb.SignedBeaconBlockHeader, - kzgCommitmentsInclusionProof [][]byte, - cellsAndProofs []kzg.CellsAndProofs, -) ([]*ethpb.DataColumnSidecar, error) { - // Each CellsAndProofs corresponds to a Blob - // So we can get the BlobCount by checking the length of CellsAndProofs - blobsCount := len(cellsAndProofs) - if blobsCount == 0 { - return nil, nil - } - - // Get the column sidecars. 
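`CustodyGroupSamplingSize`, added above, enforces the spec rule that a node samples at least `SAMPLES_PER_SLOT` custody groups even when its custody requirement is lower. A worked sketch with assumed illustrative config values (`SamplesPerSlot = 8`, `CustodyRequirement = 4`, `NumberOfCustodyGroups = 128`):

```go
// custodyGroupSamplingSize mirrors the max() rule above; the constants are
// assumptions for illustration, not the real configuration values.
func custodyGroupSamplingSize(subscribeToAllSubnets bool) uint64 {
	const samplesPerSlot, custodyRequirement, numberOfCustodyGroups = 8, 4, 128

	custodyGroupCount := uint64(custodyRequirement) // CustodyGroupCount() default
	if subscribeToAllSubnets {
		custodyGroupCount = numberOfCustodyGroups // --subscribe-all-subnets
	}
	return max(samplesPerSlot, custodyGroupCount) // 8 by default, 128 with the flag
}
```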
- sidecars := make([]*ethpb.DataColumnSidecar, 0, fieldparams.NumberOfColumns) - for columnIndex := uint64(0); columnIndex < fieldparams.NumberOfColumns; columnIndex++ { - column := make([]kzg.Cell, 0, blobsCount) - kzgProofOfColumn := make([]kzg.Proof, 0, blobsCount) - - for rowIndex := 0; rowIndex < blobsCount; rowIndex++ { - cellsForRow := cellsAndProofs[rowIndex].Cells - proofsForRow := cellsAndProofs[rowIndex].Proofs - - cell := cellsForRow[columnIndex] - column = append(column, cell) - - kzgProof := proofsForRow[columnIndex] - kzgProofOfColumn = append(kzgProofOfColumn, kzgProof) - } - - columnBytes := make([][]byte, 0, blobsCount) - for i := range column { - columnBytes = append(columnBytes, column[i][:]) - } - - kzgProofOfColumnBytes := make([][]byte, 0, blobsCount) - for _, kzgProof := range kzgProofOfColumn { - copiedProof := kzgProof - kzgProofOfColumnBytes = append(kzgProofOfColumnBytes, copiedProof[:]) - } - - sidecar := &ethpb.DataColumnSidecar{ - ColumnIndex: columnIndex, - DataColumn: columnBytes, - KzgCommitments: blobKzgCommitments, - KzgProof: kzgProofOfColumnBytes, - SignedBlockHeader: signedBlockHeader, - KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof, - } - - sidecars = append(sidecars, sidecar) - } - - return sidecars, nil -} - -// VerifyDataColumnsSidecarKZGProofs verifies the provided KZG Proofs of data columns. -func VerifyDataColumnsSidecarKZGProofs(sidecars []blocks.RODataColumn) (bool, error) { - // Retrieve the number of columns. - numberOfColumns := params.BeaconConfig().NumberOfColumns - - // Compute the total count. - count := 0 - for _, sidecar := range sidecars { - count += len(sidecar.DataColumn) - } - - commitments := make([]kzg.Bytes48, 0, count) - indices := make([]uint64, 0, count) - cells := make([]kzg.Cell, 0, count) - proofs := make([]kzg.Bytes48, 0, count) - - for _, sidecar := range sidecars { - // Check if the columns index is not too large - if sidecar.ColumnIndex >= numberOfColumns { - return false, errIndexTooLarge - } - - // Check if the KZG commitments size and data column size match. - if len(sidecar.DataColumn) != len(sidecar.KzgCommitments) { - return false, errMismatchLength - } - - // Check if the KZG proofs size and data column size match. - if len(sidecar.DataColumn) != len(sidecar.KzgProof) { - return false, errMismatchLength - } - - for i := range sidecar.DataColumn { - commitments = append(commitments, kzg.Bytes48(sidecar.KzgCommitments[i])) - indices = append(indices, sidecar.ColumnIndex) - cells = append(cells, kzg.Cell(sidecar.DataColumn[i])) - proofs = append(proofs, kzg.Bytes48(sidecar.KzgProof[i])) - } - } - - // Verify all the batch at once. - verified, err := kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs) - if err != nil { - return false, errors.Wrap(err, "verify cell KZG proof batch") - } - - return verified, nil -} - -// CustodyGroupCount returns the number of groups the node should participate in for custody. -func CustodyGroupCount() uint64 { - if flags.Get().SubscribeToAllSubnets { - return params.BeaconConfig().NumberOfCustodyGroups - } - - return params.BeaconConfig().CustodyRequirement -} - -// CustodyGroupSamplingSize returns the number of custody groups the node should sample from.
-// https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/das-core.md#custody-sampling -func CustodyGroupSamplingSize() uint64 { - samplesPerSlot := params.BeaconConfig().SamplesPerSlot - custodyGroupCount := CustodyGroupCount() - - return max(samplesPerSlot, custodyGroupCount) -} - -// HypergeomCDF computes the hypergeometric cumulative distribution function. -// https://en.wikipedia.org/wiki/Hypergeometric_distribution -func HypergeomCDF(k, M, n, N uint64) float64 { - denominatorInt := new(big.Int).Binomial(int64(M), int64(N)) // lint:ignore uintcast - denominator := new(big.Float).SetInt(denominatorInt) - - rBig := big.NewFloat(0) - - for i := uint64(0); i < k+1; i++ { - a := new(big.Int).Binomial(int64(n), int64(i)) // lint:ignore uintcast - b := new(big.Int).Binomial(int64(M-n), int64(N-i)) - numeratorInt := new(big.Int).Mul(a, b) - numerator := new(big.Float).SetInt(numeratorInt) - item := new(big.Float).Quo(numerator, denominator) - rBig.Add(rBig, item) - } - - r, _ := rBig.Float64() - - return r -} - -// ExtendedSampleCount computes, for a given number of samples per slot and allowed failures the -// number of samples we should actually query from peers. -// TODO: Add link to the specification once it is available. -func ExtendedSampleCount(samplesPerSlot, allowedFailures uint64) uint64 { - // Retrieve the columns count - columnsCount := params.BeaconConfig().NumberOfColumns - - // If half of the columns are missing, we are able to reconstruct the data. - // If half of the columns + 1 are missing, we are not able to reconstruct the data. - // This is the smallest worst case. - worstCaseMissing := columnsCount/2 + 1 - - // Compute the false positive threshold. - falsePositiveThreshold := HypergeomCDF(0, columnsCount, worstCaseMissing, samplesPerSlot) - - var sampleCount uint64 - - // Finally, compute the extended sample count. - for sampleCount = samplesPerSlot; sampleCount < columnsCount+1; sampleCount++ { - if HypergeomCDF(allowedFailures, columnsCount, worstCaseMissing, sampleCount) <= falsePositiveThreshold { - break +// populateAndFilterIndices returns a sorted slice of indices, setting all indices if none are provided, +// and filtering out indices higher than the blob count. +func populateAndFilterIndices(indices map[uint64]bool, blobCount uint64) []uint64 { + // If no indices are provided, provide all blobs. + if len(indices) == 0 { + for i := range blobCount { + indices[i] = true } } - return sampleCount -} - -// CustodyGroupCountFromRecord extracts the custody group count from an ENR record. -func CustodyGroupCountFromRecord(record *enr.Record) (uint64, error) { - if record == nil { - return 0, errRecordNil - } - - // Load the `cgc` - var cgc Cgc - if cgc := record.Load(&cgc); cgc != nil { - return 0, errCannotLoadCustodyGroupCount - } - - return uint64(cgc), nil -} - -func CanSelfReconstruct(custodyGroupCount uint64) bool { - total := params.BeaconConfig().NumberOfCustodyGroups - // If total is odd, then we need total / 2 + 1 columns to reconstruct. - // If total is even, then we need total / 2 columns to reconstruct. - custodyGroupsNeeded := total/2 + total%2 - return custodyGroupCount >= custodyGroupsNeeded -} - -// RecoverCellsAndProofs recovers the cells and proofs from the data column sidecars.
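`HypergeomCDF` and `ExtendedSampleCount`, deleted here, move to the new `peer_sampling.go` below. Their search is easy to reproduce standalone; a runnable sketch with mainnet-style values (128 columns, 16 samples per slot), whose expected outputs come from the `TestExtendedSampleCount` table later in this diff:

```go
package main

import (
	"fmt"
	"math/big"
)

// hypergeomCDF mirrors peerdas.HypergeomCDF: the probability of drawing at
// most k missing columns in N samples from M columns of which n are missing.
func hypergeomCDF(k, M, n, N uint64) float64 {
	denominator := new(big.Float).SetInt(new(big.Int).Binomial(int64(M), int64(N)))
	r := big.NewFloat(0)
	for i := uint64(0); i <= k; i++ {
		a := new(big.Int).Binomial(int64(n), int64(i))
		b := new(big.Int).Binomial(int64(M-n), int64(N-i))
		r.Add(r, new(big.Float).Quo(new(big.Float).SetInt(new(big.Int).Mul(a, b)), denominator))
	}
	f, _ := r.Float64()
	return f
}

func main() {
	const columns, samplesPerSlot = uint64(128), uint64(16)
	worstCaseMissing := columns/2 + 1 // 65: one more column missing than is recoverable
	baseline := hypergeomCDF(0, columns, worstCaseMissing, samplesPerSlot)

	for _, allowedFailures := range []uint64{0, 1, 2} {
		n := samplesPerSlot
		for ; n <= columns; n++ {
			if hypergeomCDF(allowedFailures, columns, worstCaseMissing, n) <= baseline {
				break
			}
		}
		fmt.Printf("allowedFailures=%d -> sample count %d\n", allowedFailures, n) // 16, 20, 24
	}
}
```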
-func RecoverCellsAndProofs( - dataColumnSideCars []*ethpb.DataColumnSidecar, - blockRoot [fieldparams.RootLength]byte, -) ([]kzg.CellsAndProofs, error) { - var wg errgroup.Group - - dataColumnSideCarsCount := len(dataColumnSideCars) - - if dataColumnSideCarsCount == 0 { - return nil, errors.New("no data column sidecars") - } - - // Check if all columns have the same length. - blobCount := len(dataColumnSideCars[0].DataColumn) - for _, sidecar := range dataColumnSideCars { - length := len(sidecar.DataColumn) - - if length != blobCount { - return nil, errors.New("columns do not have the same length") + // Filter out blob indices higher than the blob count. + filteredIndices := make(map[uint64]bool, len(indices)) + for i := range indices { + if i < blobCount { + filteredIndices[i] = true } } - // Recover cells and compute proofs in parallel. - recoveredCellsAndProofs := make([]kzg.CellsAndProofs, blobCount) - - for blobIndex := 0; blobIndex < blobCount; blobIndex++ { - bIndex := blobIndex - wg.Go(func() error { - start := time.Now() - - cellsIndices := make([]uint64, 0, dataColumnSideCarsCount) - cells := make([]kzg.Cell, 0, dataColumnSideCarsCount) - - for _, sidecar := range dataColumnSideCars { - // Build the cell indices. - cellsIndices = append(cellsIndices, sidecar.ColumnIndex) - - // Get the cell. - column := sidecar.DataColumn - cell := column[bIndex] - - cells = append(cells, kzg.Cell(cell)) - } - - // Recover the cells and proofs for the corresponding blob - cellsAndProofs, err := kzg.RecoverCellsAndKZGProofs(cellsIndices, cells) - - if err != nil { - return errors.Wrapf(err, "recover cells and KZG proofs for blob %d", bIndex) - } - - recoveredCellsAndProofs[bIndex] = cellsAndProofs - log.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - "index": bIndex, - "root": fmt.Sprintf("%x", blockRoot), - }).Debug("Recovered cells and proofs") - return nil - }) + // Transform set to slice. + indicesSlice := make([]uint64, 0, len(filteredIndices)) + for i := range filteredIndices { + indicesSlice = append(indicesSlice, i) } - if err := wg.Wait(); err != nil { - return nil, err - } + // Sort the indices.
+ slices.Sort[[]uint64](indicesSlice) - return recoveredCellsAndProofs + return indicesSlice } diff --git a/beacon-chain/core/peerdas/das_core_test.go b/beacon-chain/core/peerdas/das_core_test.go new file mode 100644 index 000000000000..e306cc838107 --- /dev/null +++ b/beacon-chain/core/peerdas/das_core_test.go @@ -0,0 +1,149 @@ +package peerdas_test + +import ( + "testing" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" + fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" + "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" + ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v5/testing/require" + "github.com/prysmaticlabs/prysm/v5/testing/util" +) + +func TestDataColumnSidecars(t *testing.T) { + var expected []*ethpb.DataColumnSidecar = nil + actual, err := peerdas.DataColumnSidecars(nil, []kzg.Blob{}) + require.NoError(t, err) + + require.DeepSSZEqual(t, expected, actual) +} + +func TestBlobs(t *testing.T) { + blobsIndice := map[uint64]bool{} + + almostAllColumns := make([]*ethpb.DataColumnSidecar, 0, fieldparams.NumberOfColumns/2) + for i := 2; i < fieldparams.NumberOfColumns/2+2; i++ { + almostAllColumns = append(almostAllColumns, &ethpb.DataColumnSidecar{ + ColumnIndex: uint64(i), + }) + } + + testCases := []struct { + name string + input []*ethpb.DataColumnSidecar + expected []*blocks.VerifiedROBlob + err error + }{ + { + name: "empty input", + input: []*ethpb.DataColumnSidecar{}, + expected: nil, + err: errors.New("some columns are missing: [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63]"), + }, + { + name: "missing columns", + input: almostAllColumns, + expected: nil, + err: errors.New("some columns are missing: [0 1]"), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actual, err := peerdas.Blobs(blobsIndice, tc.input) + if tc.err != nil { + require.Equal(t, tc.err.Error(), err.Error()) + } else { + require.NoError(t, err) + } + require.DeepSSZEqual(t, tc.expected, actual) + }) + } +} + +func TestDataColumnsSidecarsBlobsRoundtrip(t *testing.T) { + const blobCount = 5 + blobsIndex := map[uint64]bool{} + + // Start the trusted setup. + err := kzg.Start() + require.NoError(t, err) + + // Create a protobuf signed beacon block. + signedBeaconBlockPb := util.NewBeaconBlockDeneb() + + // Generate random blobs and their corresponding commitments and proofs. + blobs := make([]kzg.Blob, 0, blobCount) + blobKzgCommitments := make([]*kzg.Commitment, 0, blobCount) + blobKzgProofs := make([]*kzg.Proof, 0, blobCount) + + for blobIndex := range blobCount { + // Create a random blob. + blob := getRandBlob(int64(blobIndex)) + blobs = append(blobs, blob) + + // Generate a blobKZGCommitment for the blob. + blobKZGCommitment, proof, err := generateCommitmentAndProof(&blob) + require.NoError(t, err) + + blobKzgCommitments = append(blobKzgCommitments, blobKZGCommitment) + blobKzgProofs = append(blobKzgProofs, proof) + } + + // Set the commitments into the block.
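The `populateAndFilterIndices` helper finalized at the top of this hunk is easiest to read by example (it is unexported, so these calls are package-internal; results shown as comments):

```go
// An empty set selects every blob index, sorted ascending.
populateAndFilterIndices(map[uint64]bool{}, 3) // -> [0 1 2]

// Indices at or beyond the blob count are dropped before sorting.
populateAndFilterIndices(map[uint64]bool{0: true, 7: true}, 5) // -> [0]
```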
+ blobKzgCommitmentsBytes := make([][]byte, 0, blobCount) + for _, blobKZGCommitment := range blobKzgCommitments { + blobKzgCommitmentsBytes = append(blobKzgCommitmentsBytes, blobKZGCommitment[:]) + } + + signedBeaconBlockPb.Block.Body.BlobKzgCommitments = blobKzgCommitmentsBytes + + // Generate verified RO blobs. + verifiedROBlobs := make([]*blocks.VerifiedROBlob, 0, blobCount) + + // Create a signed beacon block from the protobuf. + signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb) + require.NoError(t, err) + + commitmentInclusionProof, err := blocks.MerkleProofKZGCommitments(signedBeaconBlock.Block().Body()) + require.NoError(t, err) + + for blobIndex := range blobCount { + blob := blobs[blobIndex] + blobKZGCommitment := blobKzgCommitments[blobIndex] + blobKzgProof := blobKzgProofs[blobIndex] + + // Get the signed beacon block header. + signedBeaconBlockHeader, err := signedBeaconBlock.Header() + require.NoError(t, err) + + blobSidecar := &ethpb.BlobSidecar{ + Index: uint64(blobIndex), + Blob: blob[:], + KzgCommitment: blobKZGCommitment[:], + KzgProof: blobKzgProof[:], + SignedBlockHeader: signedBeaconBlockHeader, + CommitmentInclusionProof: commitmentInclusionProof, + } + + roBlob, err := blocks.NewROBlob(blobSidecar) + require.NoError(t, err) + + verifiedROBlob := blocks.NewVerifiedROBlob(roBlob) + verifiedROBlobs = append(verifiedROBlobs, &verifiedROBlob) + } + + // Compute data columns sidecars from the signed beacon block and from the blobs. + dataColumnsSidecar, err := peerdas.DataColumnSidecars(signedBeaconBlock, blobs) + require.NoError(t, err) + + // Compute the blobs from the data columns sidecar. + roundtripBlobs, err := peerdas.Blobs(blobsIndex, dataColumnsSidecar) + require.NoError(t, err) + + // Check that the blobs are the same.
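Why byte-for-byte equality is the right expectation here: `DataColumnSidecars` erasure-extends the five blobs into full columns, and `Blobs` reassembles each blob from the original columns (which is why the `TestBlobs` cases above demand columns `[0 1 ... 63]`). The property in miniature, error handling elided:

```go
sidecars, _ := peerdas.DataColumnSidecars(signedBeaconBlock, blobs) // blobs -> column sidecars
got, _ := peerdas.Blobs(map[uint64]bool{}, sidecars)                // columns -> all blobs back
// got must match verifiedROBlobs element-wise, as asserted next.
```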
+ require.DeepSSZEqual(t, verifiedROBlobs, roundtripBlobs) +} diff --git a/beacon-chain/core/peerdas/helpers_test.go b/beacon-chain/core/peerdas/helpers_test.go deleted file mode 100644 index 78578fa6a68d..000000000000 --- a/beacon-chain/core/peerdas/helpers_test.go +++ /dev/null @@ -1,531 +0,0 @@ -package peerdas_test - -import ( - "bytes" - "crypto/sha256" - "encoding/binary" - "errors" - "fmt" - "testing" - - "github.com/consensys/gnark-crypto/ecc/bls12-381/fr" - GoKZG "github.com/crate-crypto/go-kzg-4844" - "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" - "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" - fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" - "github.com/prysmaticlabs/prysm/v5/config/params" - "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" - ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" - "github.com/prysmaticlabs/prysm/v5/testing/require" - "github.com/prysmaticlabs/prysm/v5/testing/util" - "github.com/sirupsen/logrus" -) - -func deterministicRandomness(seed int64) [32]byte { - // Converts an int64 to a byte slice - buf := new(bytes.Buffer) - err := binary.Write(buf, binary.BigEndian, seed) - if err != nil { - logrus.WithError(err).Error("Failed to write int64 to bytes buffer") - return [32]byte{} - } - bytes := buf.Bytes() - - return sha256.Sum256(bytes) -} - -// Returns a serialized random field element in big-endian -func GetRandFieldElement(seed int64) [32]byte { - bytes := deterministicRandomness(seed) - var r fr.Element - r.SetBytes(bytes[:]) - - return GoKZG.SerializeScalar(r) -} - -// Returns a random blob using the passed seed as entropy -func GetRandBlob(seed int64) kzg.Blob { - var blob kzg.Blob - bytesPerBlob := GoKZG.ScalarsPerBlob * GoKZG.SerializedScalarSize - for i := 0; i < bytesPerBlob; i += GoKZG.SerializedScalarSize { - fieldElementBytes := GetRandFieldElement(seed + int64(i)) - copy(blob[i:i+GoKZG.SerializedScalarSize], fieldElementBytes[:]) - } - return blob -} - -func GenerateCommitmentAndProof(blob *kzg.Blob) (*kzg.Commitment, *kzg.Proof, error) { - commitment, err := kzg.BlobToKZGCommitment(blob) - if err != nil { - return nil, nil, err - } - proof, err := kzg.ComputeBlobKZGProof(blob, commitment) - if err != nil { - return nil, nil, err - } - return &commitment, &proof, err -} - -func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) { - dbBlock := util.NewBeaconBlockDeneb() - require.NoError(t, kzg.Start()) - - var ( - comms [][]byte - blobs []kzg.Blob - ) - for i := int64(0); i < 6; i++ { - blob := GetRandBlob(i) - commitment, _, err := GenerateCommitmentAndProof(&blob) - require.NoError(t, err) - comms = append(comms, commitment[:]) - blobs = append(blobs, blob) - } - - dbBlock.Block.Body.BlobKzgCommitments = comms - sBlock, err := blocks.NewSignedBeaconBlock(dbBlock) - require.NoError(t, err) - sCars, err := peerdas.DataColumnSidecars(sBlock, blobs) - require.NoError(t, err) - - for i, sidecar := range sCars { - roCol, err := blocks.NewRODataColumn(sidecar) - require.NoError(t, err) - verified, err := peerdas.VerifyDataColumnsSidecarKZGProofs([]blocks.RODataColumn{roCol}) - require.NoError(t, err) - require.Equal(t, true, verified, fmt.Sprintf("sidecar %d failed", i)) - } -} - -func TestDataColumnSidecars(t *testing.T) { - var expected []*ethpb.DataColumnSidecar = nil - actual, err := peerdas.DataColumnSidecars(nil, []kzg.Blob{}) - require.NoError(t, err) - - 
require.DeepSSZEqual(t, expected, actual) -} - -func TestBlobs(t *testing.T) { - blobsIndice := map[uint64]bool{} - - almostAllColumns := make([]*ethpb.DataColumnSidecar, 0, fieldparams.NumberOfColumns/2) - for i := 2; i < fieldparams.NumberOfColumns/2+2; i++ { - almostAllColumns = append(almostAllColumns, &ethpb.DataColumnSidecar{ - ColumnIndex: uint64(i), - }) - } - - testCases := []struct { - name string - input []*ethpb.DataColumnSidecar - expected []*blocks.VerifiedROBlob - err error - }{ - { - name: "empty input", - input: []*ethpb.DataColumnSidecar{}, - expected: nil, - err: errors.New("some columns are missing: [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63]"), - }, - { - name: "missing columns", - input: almostAllColumns, - expected: nil, - err: errors.New("some columns are missing: [0 1]"), - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - actual, err := peerdas.Blobs(blobsIndice, tc.input) - if tc.err != nil { - require.Equal(t, tc.err.Error(), err.Error()) - } else { - require.NoError(t, err) - } - require.DeepSSZEqual(t, tc.expected, actual) - }) - } -} - -func TestDataColumnsSidecarsBlobsRoundtrip(t *testing.T) { - const blobCount = 5 - blobsIndex := map[uint64]bool{} - - // Start the trusted setup. - err := kzg.Start() - require.NoError(t, err) - - // Create a protobuf signed beacon block. - signedBeaconBlockPb := util.NewBeaconBlockDeneb() - - // Generate random blobs and their corresponding commitments and proofs. - blobs := make([]kzg.Blob, 0, blobCount) - blobKzgCommitments := make([]*kzg.Commitment, 0, blobCount) - blobKzgProofs := make([]*kzg.Proof, 0, blobCount) - - for blobIndex := range blobCount { - // Create a random blob. - blob := GetRandBlob(int64(blobIndex)) - blobs = append(blobs, blob) - - // Generate a blobKZGCommitment for the blob. - blobKZGCommitment, proof, err := GenerateCommitmentAndProof(&blob) - require.NoError(t, err) - - blobKzgCommitments = append(blobKzgCommitments, blobKZGCommitment) - blobKzgProofs = append(blobKzgProofs, proof) - } - - // Set the commitments into the block. - blobZkgCommitmentsBytes := make([][]byte, 0, blobCount) - for _, blobKZGCommitment := range blobKzgCommitments { - blobZkgCommitmentsBytes = append(blobZkgCommitmentsBytes, blobKZGCommitment[:]) - } - - signedBeaconBlockPb.Block.Body.BlobKzgCommitments = blobZkgCommitmentsBytes - - // Generate verified RO blobs. - verifiedROBlobs := make([]*blocks.VerifiedROBlob, 0, blobCount) - - // Create a signed beacon block from the protobuf. - signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb) - require.NoError(t, err) - - commitmentInclusionProof, err := blocks.MerkleProofKZGCommitments(signedBeaconBlock.Block().Body()) - require.NoError(t, err) - - for blobIndex := range blobCount { - blob := blobs[blobIndex] - blobKZGCommitment := blobKzgCommitments[blobIndex] - blobKzgProof := blobKzgProofs[blobIndex] - - // Get the signed beacon block header. - signedBeaconBlockHeader, err := signedBeaconBlock.Header() - require.NoError(t, err) - - blobSidecar := &ethpb.BlobSidecar{ - Index: uint64(blobIndex), - Blob: blob[:], - KzgCommitment: blobKZGCommitment[:], - KzgProof: blobKzgProof[:], - SignedBlockHeader: signedBeaconBlockHeader, - CommitmentInclusionProof: commitmentInclusionProof, - } - - roBlob, err := blocks.NewROBlob(blobSidecar) - require.NoError(t, err) - - verifiedROBlob := blocks.NewVerifiedROBlob(roBlob) - verifiedROBlobs = append(verifiedROBlobs, &verifiedROBlob) - } - - // Compute data columns sidecars from the signed beacon block and from the blobs. - dataColumnsSidecar, err := peerdas.DataColumnSidecars(signedBeaconBlock, blobs) - require.NoError(t, err) - - // Compute the blobs from the data columns sidecar. - roundtripBlobs, err := peerdas.Blobs(blobsIndex, dataColumnsSidecar) - require.NoError(t, err) - - // Check that the blobs are the same. - require.DeepSSZEqual(t, verifiedROBlobs, roundtripBlobs) -} - -func TestCustodyGroupCount(t *testing.T) { - testCases := []struct { - name string - subscribeToAllSubnets bool - expected uint64 - }{ - { - name: "subscribeToAllSubnets=false", - subscribeToAllSubnets: false, - expected: params.BeaconConfig().CustodyRequirement, - }, - { - name: "subscribeToAllSubnets=true", - subscribeToAllSubnets: true, - expected: params.BeaconConfig().DataColumnSidecarSubnetCount, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // Set flags. - resetFlags := flags.Get() - defer func() { - flags.Init(resetFlags) - }() - - params.SetupTestConfigCleanup(t) - gFlags := new(flags.GlobalFlags) - gFlags.SubscribeToAllSubnets = tc.subscribeToAllSubnets - flags.Init(gFlags) - - // Get the custody subnet count. - actual := peerdas.CustodyGroupCount() - require.Equal(t, tc.expected, actual) - }) - } -} - -func TestHypergeomCDF(t *testing.T) { - // Test case from https://en.wikipedia.org/wiki/Hypergeometric_distribution - // Population size: 1000, number of successes in population: 500, sample size: 10, number of successes in sample: 5 - // Expected result: 0.072 - const ( - expected = 0.0796665913283742 - margin = 0.000001 - ) - - actual := peerdas.HypergeomCDF(5, 128, 65, 16) - require.Equal(t, true, expected-margin <= actual && actual <= expected+margin) -} - -func TestExtendedSampleCount(t *testing.T) { - const samplesPerSlot = 16 - - testCases := []struct { - name string - allowedMissings uint64 - extendedSampleCount uint64 - }{ - {name: "allowedMissings=0", allowedMissings: 0, extendedSampleCount: 16}, - {name: "allowedMissings=1", allowedMissings: 1, extendedSampleCount: 20}, - {name: "allowedMissings=2", allowedMissings: 2, extendedSampleCount: 24}, - {name: "allowedMissings=3", allowedMissings: 3, extendedSampleCount: 27}, - {name: "allowedMissings=4", allowedMissings: 4, extendedSampleCount: 29}, - {name: "allowedMissings=5", allowedMissings: 5, extendedSampleCount: 32}, - {name: "allowedMissings=6", allowedMissings: 6, extendedSampleCount: 35}, - {name: "allowedMissings=7", allowedMissings: 7, extendedSampleCount: 37}, - {name: "allowedMissings=8", allowedMissings: 8, extendedSampleCount: 40}, - {name: "allowedMissings=9", allowedMissings: 9, extendedSampleCount: 42}, - {name: "allowedMissings=10", allowedMissings: 10, extendedSampleCount: 44}, - {name: "allowedMissings=11", allowedMissings: 11, extendedSampleCount: 47}, - {name: "allowedMissings=12", allowedMissings: 12, extendedSampleCount: 49}, - {name: "allowedMissings=13", allowedMissings: 13,
extendedSampleCount: 51}, - {name: "allowedMissings=14", allowedMissings: 14, extendedSampleCount: 53}, - {name: "allowedMissings=15", allowedMissings: 15, extendedSampleCount: 55}, - {name: "allowedMissings=16", allowedMissings: 16, extendedSampleCount: 57}, - {name: "allowedMissings=17", allowedMissings: 17, extendedSampleCount: 59}, - {name: "allowedMissings=18", allowedMissings: 18, extendedSampleCount: 61}, - {name: "allowedMissings=19", allowedMissings: 19, extendedSampleCount: 63}, - {name: "allowedMissings=20", allowedMissings: 20, extendedSampleCount: 65}, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - result := peerdas.ExtendedSampleCount(samplesPerSlot, tc.allowedMissings) - require.Equal(t, tc.extendedSampleCount, result) - }) - } -} - -func TestCustodyGroupCountFromRecord(t *testing.T) { - const expected uint64 = 7 - - // Create an Ethereum record. - record := &enr.Record{} - record.Set(peerdas.Cgc(expected)) - - actual, err := peerdas.CustodyGroupCountFromRecord(record) - require.NoError(t, err) - require.Equal(t, expected, actual) -} - -func TestCanSelfReconstruct(t *testing.T) { - testCases := []struct { - name string - totalNumberOfCustodyGroups uint64 - custodyNumberOfGroups uint64 - expected bool - }{ - { - name: "totalNumberOfCustodyGroups=64, custodyNumberOfGroups=31", - totalNumberOfCustodyGroups: 64, - custodyNumberOfGroups: 31, - expected: false, - }, - { - name: "totalNumberOfCustodyGroups=64, custodyNumberOfGroups=32", - totalNumberOfCustodyGroups: 64, - custodyNumberOfGroups: 32, - expected: true, - }, - { - name: "totalNumberOfCustodyGroups=65, custodyNumberOfGroups=32", - totalNumberOfCustodyGroups: 65, - custodyNumberOfGroups: 32, - expected: false, - }, - { - name: "totalNumberOfCustodyGroups=63, custodyNumberOfGroups=33", - totalNumberOfCustodyGroups: 65, - custodyNumberOfGroups: 33, - expected: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // Set the total number of columns. - params.SetupTestConfigCleanup(t) - cfg := params.BeaconConfig().Copy() - cfg.NumberOfCustodyGroups = tc.totalNumberOfCustodyGroups - params.OverrideBeaconConfig(cfg) - - // Check if reconstuction is possible. - actual := peerdas.CanSelfReconstruct(tc.custodyNumberOfGroups) - require.Equal(t, tc.expected, actual) - }) - } -} - -func TestReconstructionRoundTrip(t *testing.T) { - params.SetupTestConfigCleanup(t) - - const blobCount = 5 - - var blockRoot [fieldparams.RootLength]byte - - signedBeaconBlockPb := util.NewBeaconBlockDeneb() - require.NoError(t, kzg.Start()) - - // Generate random blobs and their corresponding commitments. - var ( - blobsKzgCommitments [][]byte - blobs []kzg.Blob - ) - for i := range blobCount { - blob := GetRandBlob(int64(i)) - commitment, _, err := GenerateCommitmentAndProof(&blob) - require.NoError(t, err) - - blobsKzgCommitments = append(blobsKzgCommitments, commitment[:]) - blobs = append(blobs, blob) - } - - // Generate a signed beacon block. - signedBeaconBlockPb.Block.Body.BlobKzgCommitments = blobsKzgCommitments - signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb) - require.NoError(t, err) - - // Get the signed beacon block header. - signedBeaconBlockHeader, err := signedBeaconBlock.Header() - require.NoError(t, err) - - // Convert data columns sidecars from signed block and blobs. - dataColumnSidecars, err := peerdas.DataColumnSidecars(signedBeaconBlock, blobs) - require.NoError(t, err) - - // Create verified RO data columns. 
- verifiedRoDataColumns := make([]*blocks.VerifiedRODataColumn, 0, blobCount) - for _, dataColumnSidecar := range dataColumnSidecars { - roDataColumn, err := blocks.NewRODataColumn(dataColumnSidecar) - require.NoError(t, err) - - verifiedRoDataColumn := blocks.NewVerifiedRODataColumn(roDataColumn) - verifiedRoDataColumns = append(verifiedRoDataColumns, &verifiedRoDataColumn) - } - - verifiedRoDataColumn := verifiedRoDataColumns[0] - - numberOfColumns := params.BeaconConfig().NumberOfColumns - - var noDataColumns []*ethpb.DataColumnSidecar - dataColumnsWithDifferentLengths := []*ethpb.DataColumnSidecar{ - {DataColumn: [][]byte{{}, {}}}, - {DataColumn: [][]byte{{}}}, - } - notEnoughDataColumns := dataColumnSidecars[:numberOfColumns/2-1] - originalDataColumns := dataColumnSidecars[:numberOfColumns/2] - extendedDataColumns := dataColumnSidecars[numberOfColumns/2:] - evenDataColumns := make([]*ethpb.DataColumnSidecar, 0, numberOfColumns/2) - oddDataColumns := make([]*ethpb.DataColumnSidecar, 0, numberOfColumns/2) - allDataColumns := dataColumnSidecars - - for i, dataColumn := range dataColumnSidecars { - if i%2 == 0 { - evenDataColumns = append(evenDataColumns, dataColumn) - } else { - oddDataColumns = append(oddDataColumns, dataColumn) - } - } - - testCases := []struct { - name string - dataColumnsSidecar []*ethpb.DataColumnSidecar - isError bool - }{ - { - name: "No data columns sidecars", - dataColumnsSidecar: noDataColumns, - isError: true, - }, - { - name: "Data columns sidecar with different lengths", - dataColumnsSidecar: dataColumnsWithDifferentLengths, - isError: true, - }, - { - name: "All columns are present (no actual need to reconstruct)", - dataColumnsSidecar: allDataColumns, - isError: false, - }, - { - name: "Only original columns are present", - dataColumnsSidecar: originalDataColumns, - isError: false, - }, - { - name: "Only extended columns are present", - dataColumnsSidecar: extendedDataColumns, - isError: false, - }, - { - name: "Only even columns are present", - dataColumnsSidecar: evenDataColumns, - isError: false, - }, - { - name: "Only odd columns are present", - dataColumnsSidecar: oddDataColumns, - isError: false, - }, - { - name: "Not enough columns to reconstruct", - dataColumnsSidecar: notEnoughDataColumns, - isError: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // Recover cells and proofs from available data columns sidecars. - cellsAndProofs, err := peerdas.RecoverCellsAndProofs(tc.dataColumnsSidecar, blockRoot) - isError := (err != nil) - require.Equal(t, tc.isError, isError) - - if isError { - return - } - - // Recover all data columns sidecars from cells and proofs. - reconstructedDataColumnsSideCars, err := peerdas.DataColumnSidecarsForReconstruct( - blobsKzgCommitments, - signedBeaconBlockHeader, - verifiedRoDataColumn.KzgCommitmentsInclusionProof, - cellsAndProofs, - ) - - require.NoError(t, err) - - expected := dataColumnSidecars - actual := reconstructedDataColumnsSideCars - require.DeepSSZEqual(t, expected, actual) - }) - } -} diff --git a/beacon-chain/core/peerdas/info.go b/beacon-chain/core/peerdas/info.go new file mode 100644 index 000000000000..b0ac6a8558a2 --- /dev/null +++ b/beacon-chain/core/peerdas/info.go @@ -0,0 +1,103 @@ +package peerdas + +import ( + "encoding/binary" + "sync" + + "github.com/ethereum/go-ethereum/p2p/enode" + lru "github.com/hashicorp/golang-lru" + "github.com/pkg/errors" +) + +// info contains all useful peerDAS related information regarding a peer. 
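The three sets the struct below carries form one derivation chain, computed once and then cached together: node ID to custody groups, groups to columns, columns to subnets. A sketch of the chain using exported helpers from this diff (error handling elided; `nodeID` and `custodyGroupCount` as passed to `Info`):

```go
custodyGroups, _ := peerdas.CustodyGroups(nodeID, custodyGroupCount) // node ID -> groups
custodyColumns, _ := peerdas.CustodyColumns(custodyGroups)           // groups  -> columns
dataColumnsSubnets := peerdas.DataColumnSubnets(custodyColumns)      // columns -> subnets
```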
+type info struct { + CustodyGroups map[uint64]bool + CustodyColumns map[uint64]bool + DataColumnsSubnets map[uint64]bool +} + +const ( + cacheSize = 200 + keySize = 32 + 8 +) + +var ( + mut sync.Mutex + cache *lru.Cache +) + +// Info returns the peerDAS information for a given nodeID and custodyGroupCount. +// It returns a boolean indicating if the peer info was already in the cache and an error if any. +func Info(nodeID enode.ID, custodyGroupCount uint64) (*info, bool, error) { + // Create a new cache if it doesn't exist. + if err := createCacheIfNeeded(); err != nil { + return nil, false, errors.Wrap(err, "create cache if needed") + } + + // Compute the key. + key := computeKey(nodeID, custodyGroupCount) + + // If the value is already in the cache, return it. + if value, ok := cache.Get(key); ok { + peerInfo, ok := value.(*info) + if !ok { + return nil, false, errors.New("failed to cast peer info (should never happen)") + } + + return peerInfo, true, nil + } + + // The peer info is not in the cache, compute it. + // Compute custody groups. + custodyGroups, err := CustodyGroups(nodeID, custodyGroupCount) + if err != nil { + return nil, false, errors.Wrap(err, "custody groups") + } + + // Compute custody columns. + custodyColumns, err := CustodyColumns(custodyGroups) + if err != nil { + return nil, false, errors.Wrap(err, "custody columns") + } + + // Compute data columns subnets. + dataColumnsSubnets := DataColumnSubnets(custodyColumns) + + result := &info{ + CustodyGroups: custodyGroups, + CustodyColumns: custodyColumns, + DataColumnsSubnets: dataColumnsSubnets, + } + + // Add the result to the cache. + cache.Add(key, result) + + return result, false, nil +} + +// createCacheIfNeeded creates a new cache if it doesn't exist. +func createCacheIfNeeded() error { + mut.Lock() + defer mut.Unlock() + + if cache == nil { + c, err := lru.New(cacheSize) + if err != nil { + return errors.Wrap(err, "lru new") + } + + cache = c + } + + return nil +} + +// computeKey returns a unique key for a node and its custodyGroupCount. 
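Because the key combines the node ID with the advertised custody group count, a peer that raises its `cgc` simply produces a new cache entry while the stale one ages out of the 200-entry LRU. The memoization is observable through the middle return value, as `info_test.go` below checks:

```go
// Hypothetical call sequence for some enode.ID.
_, cached, _ := peerdas.Info(nodeID, 7) // computed: cached == false
_, cached, _ = peerdas.Info(nodeID, 7)  // served from the LRU: cached == true
_, cached, _ = peerdas.Info(nodeID, 8)  // different count, different key: cached == false
```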
+func computeKey(nodeID enode.ID, custodyGroupCount uint64) [keySize]byte { + var key [keySize]byte + + copy(key[:32], nodeID[:]) + binary.BigEndian.PutUint64(key[32:], custodyGroupCount) + + return key +} diff --git a/beacon-chain/core/peerdas/info_test.go b/beacon-chain/core/peerdas/info_test.go new file mode 100644 index 000000000000..714c78ced673 --- /dev/null +++ b/beacon-chain/core/peerdas/info_test.go @@ -0,0 +1,27 @@ +package peerdas_test + +import ( + "testing" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" + "github.com/prysmaticlabs/prysm/v5/testing/require" +) + +func TestInfo(t *testing.T) { + nodeID := enode.ID{} + custodyGroupCount := uint64(7) + + expectedCustodyGroup := map[uint64]bool{1: true, 17: true, 19: true, 42: true, 75: true, 87: true, 102: true} + expectedCustodyColumns := map[uint64]bool{1: true, 17: true, 19: true, 42: true, 75: true, 87: true, 102: true} + expectedDataColumnsSubnets := map[uint64]bool{1: true, 17: true, 19: true, 42: true, 75: true, 87: true, 102: true} + + for _, cached := range []bool{false, true} { + actual, ok, err := peerdas.Info(nodeID, custodyGroupCount) + require.NoError(t, err) + require.Equal(t, cached, ok) + require.DeepEqual(t, expectedCustodyGroup, actual.CustodyGroups) + require.DeepEqual(t, expectedCustodyColumns, actual.CustodyColumns) + require.DeepEqual(t, expectedDataColumnsSubnets, actual.DataColumnsSubnets) + } +} diff --git a/beacon-chain/core/peerdas/log.go b/beacon-chain/core/peerdas/log.go deleted file mode 100644 index ff09a77f8286..000000000000 --- a/beacon-chain/core/peerdas/log.go +++ /dev/null @@ -1,5 +0,0 @@ -package peerdas - -import "github.com/sirupsen/logrus" - -var log = logrus.WithField("prefix", "peerdas") diff --git a/beacon-chain/core/peerdas/p2p_interface.go b/beacon-chain/core/peerdas/p2p_interface.go new file mode 100644 index 000000000000..784ea1450a68 --- /dev/null +++ b/beacon-chain/core/peerdas/p2p_interface.go @@ -0,0 +1,136 @@ +package peerdas + +import ( + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg" + "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" + "github.com/prysmaticlabs/prysm/v5/config/params" + "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" +) + +const ( + CustodyGroupCountEnrKey = "cgc" +) + +var ( + // Custom errors + errRecordNil = errors.New("record is nil") + errCannotLoadCustodyGroupCount = errors.New("cannot load the custody group count from peer") + errIndexTooLarge = errors.New("column index is larger than the specified columns count") + errMismatchLength = errors.New("mismatch in the length of the commitments and proofs") +) + +// https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/p2p-interface.md#the-discovery-domain-discv5 +type Cgc uint64 + +func (Cgc) ENRKey() string { return CustodyGroupCountEnrKey } + +// VerifyDataColumnsSidecarKZGProofs verifies the provided KZG Proofs of data columns. +// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#verify_data_column_sidecar_kzg_proofs +func VerifyDataColumnsSidecarKZGProofs(sidecars []blocks.RODataColumn) (bool, error) { + // Retrieve the number of columns. + numberOfColumns := params.BeaconConfig().NumberOfColumns + + // Compute the total count. 
+ count := 0 + for _, sidecar := range sidecars { + count += len(sidecar.DataColumn) + } + + commitments := make([]kzg.Bytes48, 0, count) + indices := make([]uint64, 0, count) + cells := make([]kzg.Cell, 0, count) + proofs := make([]kzg.Bytes48, 0, count) + + for _, sidecar := range sidecars { + // Check that the column index is not too large. + if sidecar.ColumnIndex >= numberOfColumns { + return false, errIndexTooLarge + } + + // Check if the KZG commitments size and data column size match. + if len(sidecar.DataColumn) != len(sidecar.KzgCommitments) { + return false, errMismatchLength + } + + // Check if the KZG proofs size and data column size match. + if len(sidecar.DataColumn) != len(sidecar.KzgProof) { + return false, errMismatchLength + } + + for i := range sidecar.DataColumn { + commitments = append(commitments, kzg.Bytes48(sidecar.KzgCommitments[i])) + indices = append(indices, sidecar.ColumnIndex) + cells = append(cells, kzg.Cell(sidecar.DataColumn[i])) + proofs = append(proofs, kzg.Bytes48(sidecar.KzgProof[i])) + } + } + + // Verify the whole batch at once. + verified, err := kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs) + if err != nil { + return false, errors.Wrap(err, "verify cell KZG proof batch") + } + + return verified, nil +} + +// ComputeSubnetForDataColumnSidecar computes the subnet for a data column sidecar. +// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#compute_subnet_for_data_column_sidecar +func ComputeSubnetForDataColumnSidecar(columnIndex uint64) uint64 { + dataColumnSidecarSubnetCount := params.BeaconConfig().DataColumnSidecarSubnetCount + return columnIndex % dataColumnSidecarSubnetCount +} + +// DataColumnSubnets computes the subnets for the data columns. +func DataColumnSubnets(dataColumns map[uint64]bool) map[uint64]bool { + subnets := make(map[uint64]bool, len(dataColumns)) + + for column := range dataColumns { + subnet := ComputeSubnetForDataColumnSidecar(column) + subnets[subnet] = true + } + + return subnets +} + +// ComputeCustodyGroupForColumn computes the custody group for a given column. +// It is the reciprocal function of ComputeColumnsForCustodyGroup. +func ComputeCustodyGroupForColumn(columnIndex uint64) (uint64, error) { + beaconConfig := params.BeaconConfig() + numberOfColumns := beaconConfig.NumberOfColumns + + if columnIndex >= numberOfColumns { + return 0, errIndexTooLarge + } + + numberOfCustodyGroups := beaconConfig.NumberOfCustodyGroups + columnsPerGroup := numberOfColumns / numberOfCustodyGroups + + return columnIndex / columnsPerGroup, nil +} + +// CustodyGroupCount returns the number of groups we should participate in for custody. +func CustodyGroupCount() uint64 { + if flags.Get().SubscribeToAllSubnets { + return params.BeaconConfig().NumberOfCustodyGroups + } + + return params.BeaconConfig().CustodyRequirement +} + +// CustodyGroupCountFromRecord extracts the custody group count from an ENR record.
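Concretely, `cgc` is a plain uint64 ENR entry, and writing then reading it round-trips; this is the same shape as `TestCustodyGroupCountFromRecord` below:

```go
record := &enr.Record{}
record.Set(peerdas.Cgc(7)) // advertise custody of 7 groups

custodyGroupCount, err := peerdas.CustodyGroupCountFromRecord(record)
// custodyGroupCount == 7; a nil record or an unloadable entry yields an error.
_ = err
```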
+func CustodyGroupCountFromRecord(record *enr.Record) (uint64, error) { + if record == nil { + return 0, errRecordNil + } + + // Load the `cgc` entry. + var cgc Cgc + if err := record.Load(&cgc); err != nil { + return 0, errCannotLoadCustodyGroupCount + } + + return uint64(cgc), nil +} diff --git a/beacon-chain/core/peerdas/p2p_interface_test.go b/beacon-chain/core/peerdas/p2p_interface_test.go new file mode 100644 index 000000000000..bcb1e8159926 --- /dev/null +++ b/beacon-chain/core/peerdas/p2p_interface_test.go @@ -0,0 +1,96 @@ +package peerdas_test + +import ( + "fmt" + "testing" + + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" + "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" + "github.com/prysmaticlabs/prysm/v5/config/params" + "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v5/testing/require" + "github.com/prysmaticlabs/prysm/v5/testing/util" +) + +func TestCustodyGroupCount(t *testing.T) { + testCases := []struct { + name string + subscribeToAllSubnets bool + expected uint64 + }{ + { + name: "subscribeToAllSubnets=false", + subscribeToAllSubnets: false, + expected: params.BeaconConfig().CustodyRequirement, + }, + { + name: "subscribeToAllSubnets=true", + subscribeToAllSubnets: true, + expected: params.BeaconConfig().DataColumnSidecarSubnetCount, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Set flags. + resetFlags := flags.Get() + defer func() { + flags.Init(resetFlags) + }() + + params.SetupTestConfigCleanup(t) + gFlags := new(flags.GlobalFlags) + gFlags.SubscribeToAllSubnets = tc.subscribeToAllSubnets + flags.Init(gFlags) + + // Get the custody subnet count. + actual := peerdas.CustodyGroupCount() + require.Equal(t, tc.expected, actual) + }) + } +} + +func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) { + dbBlock := util.NewBeaconBlockDeneb() + require.NoError(t, kzg.Start()) + + var ( + comms [][]byte + blobs []kzg.Blob + ) + for i := int64(0); i < 6; i++ { + blob := getRandBlob(i) + commitment, _, err := generateCommitmentAndProof(&blob) + require.NoError(t, err) + comms = append(comms, commitment[:]) + blobs = append(blobs, blob) + } + + dbBlock.Block.Body.BlobKzgCommitments = comms + sBlock, err := blocks.NewSignedBeaconBlock(dbBlock) + require.NoError(t, err) + sCars, err := peerdas.DataColumnSidecars(sBlock, blobs) + require.NoError(t, err) + + for i, sidecar := range sCars { + roCol, err := blocks.NewRODataColumn(sidecar) + require.NoError(t, err) + verified, err := peerdas.VerifyDataColumnsSidecarKZGProofs([]blocks.RODataColumn{roCol}) + require.NoError(t, err) + require.Equal(t, true, verified, fmt.Sprintf("sidecar %d failed", i)) + } +} + +func TestCustodyGroupCountFromRecord(t *testing.T) { + const expected uint64 = 7 + + // Create an Ethereum record.
+
+func TestCustodyGroupCountFromRecord(t *testing.T) {
+	const expected uint64 = 7
+
+	// Create an Ethereum node record.
+	record := &enr.Record{}
+	record.Set(peerdas.Cgc(expected))
+
+	actual, err := peerdas.CustodyGroupCountFromRecord(record)
+	require.NoError(t, err)
+	require.Equal(t, expected, actual)
+}
diff --git a/beacon-chain/core/peerdas/peer_sampling.go b/beacon-chain/core/peerdas/peer_sampling.go
new file mode 100644
index 000000000000..ac0fcfbcbda3
--- /dev/null
+++ b/beacon-chain/core/peerdas/peer_sampling.go
@@ -0,0 +1,56 @@
+package peerdas
+
+import (
+	"math/big"
+
+	"github.com/prysmaticlabs/prysm/v5/config/params"
+)
+
+// ExtendedSampleCount computes, for a given number of samples per slot and allowed failures,
+// the number of samples we should actually query from peers.
+// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/peer-sampling.md#get_extended_sample_count
+func ExtendedSampleCount(samplesPerSlot, allowedFailures uint64) uint64 {
+	// Retrieve the number of columns.
+	columnsCount := params.BeaconConfig().NumberOfColumns
+
+	// If half of the columns are missing, we are able to reconstruct the data.
+	// If half of the columns + 1 are missing, we are not able to reconstruct the data.
+	// This is the smallest number of missing columns that makes reconstruction impossible,
+	// hence the worst case to guard against.
+	worstCaseMissing := columnsCount/2 + 1
+
+	// Compute the false positive threshold.
+	falsePositiveThreshold := HypergeomCDF(0, columnsCount, worstCaseMissing, samplesPerSlot)
+
+	var sampleCount uint64
+
+	// Finally, compute the extended sample count.
+	for sampleCount = samplesPerSlot; sampleCount < columnsCount+1; sampleCount++ {
+		if HypergeomCDF(allowedFailures, columnsCount, worstCaseMissing, sampleCount) <= falsePositiveThreshold {
+			break
+		}
+	}
+
+	return sampleCount
+}
+
+// HypergeomCDF computes the hypergeometric cumulative distribution function.
+// https://en.wikipedia.org/wiki/Hypergeometric_distribution
+func HypergeomCDF(k, M, n, N uint64) float64 {
+	denominatorInt := new(big.Int).Binomial(int64(M), int64(N)) // lint:ignore uintcast
+	denominator := new(big.Float).SetInt(denominatorInt)
+
+	rBig := big.NewFloat(0)
+
+	for i := uint64(0); i < k+1; i++ {
+		a := new(big.Int).Binomial(int64(n), int64(i)) // lint:ignore uintcast
+		b := new(big.Int).Binomial(int64(M-n), int64(N-i))
+		numeratorInt := new(big.Int).Mul(a, b)
+		numerator := new(big.Float).SetInt(numeratorInt)
+		item := new(big.Float).Quo(numerator, denominator)
+		rBig.Add(rBig, item)
+	}
+
+	r, _ := rBig.Float64()
+
+	return r
+}
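As a usage sketch of the sampling math above: the loop widens the query set until the probability of seeing at most allowedFailures missing samples from an unrecoverable block drops back to the zero-failure baseline. The expected outputs below are taken from the test table that follows; the import path is the one introduced by this diff.

// extended_sample_count_sketch.go (illustrative only, not part of the PR)
package main

import (
	"fmt"

	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
)

func main() {
	const samplesPerSlot = 16
	for _, allowedFailures := range []uint64{0, 4, 8} {
		n := peerdas.ExtendedSampleCount(samplesPerSlot, allowedFailures)
		// Prints 16, 29, and 40 respectively, per the test table below.
		fmt.Printf("allowedFailures=%d -> query %d columns\n", allowedFailures, n)
	}
}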
diff --git a/beacon-chain/core/peerdas/peer_sampling_test.go b/beacon-chain/core/peerdas/peer_sampling_test.go
new file mode 100644
index 000000000000..0b533d7ebb13
--- /dev/null
+++ b/beacon-chain/core/peerdas/peer_sampling_test.go
@@ -0,0 +1,60 @@
+package peerdas_test
+
+import (
+	"testing"
+
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
+	"github.com/prysmaticlabs/prysm/v5/testing/require"
+)
+
+func TestExtendedSampleCount(t *testing.T) {
+	const samplesPerSlot = 16
+
+	testCases := []struct {
+		name                string
+		allowedMissings     uint64
+		extendedSampleCount uint64
+	}{
+		{name: "allowedMissings=0", allowedMissings: 0, extendedSampleCount: 16},
+		{name: "allowedMissings=1", allowedMissings: 1, extendedSampleCount: 20},
+		{name: "allowedMissings=2", allowedMissings: 2, extendedSampleCount: 24},
+		{name: "allowedMissings=3", allowedMissings: 3, extendedSampleCount: 27},
+		{name: "allowedMissings=4", allowedMissings: 4, extendedSampleCount: 29},
+		{name: "allowedMissings=5", allowedMissings: 5, extendedSampleCount: 32},
+		{name: "allowedMissings=6", allowedMissings: 6, extendedSampleCount: 35},
+		{name: "allowedMissings=7", allowedMissings: 7, extendedSampleCount: 37},
+		{name: "allowedMissings=8", allowedMissings: 8, extendedSampleCount: 40},
+		{name: "allowedMissings=9", allowedMissings: 9, extendedSampleCount: 42},
+		{name: "allowedMissings=10", allowedMissings: 10, extendedSampleCount: 44},
+		{name: "allowedMissings=11", allowedMissings: 11, extendedSampleCount: 47},
+		{name: "allowedMissings=12", allowedMissings: 12, extendedSampleCount: 49},
+		{name: "allowedMissings=13", allowedMissings: 13, extendedSampleCount: 51},
+		{name: "allowedMissings=14", allowedMissings: 14, extendedSampleCount: 53},
+		{name: "allowedMissings=15", allowedMissings: 15, extendedSampleCount: 55},
+		{name: "allowedMissings=16", allowedMissings: 16, extendedSampleCount: 57},
+		{name: "allowedMissings=17", allowedMissings: 17, extendedSampleCount: 59},
+		{name: "allowedMissings=18", allowedMissings: 18, extendedSampleCount: 61},
+		{name: "allowedMissings=19", allowedMissings: 19, extendedSampleCount: 63},
+		{name: "allowedMissings=20", allowedMissings: 20, extendedSampleCount: 65},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			result := peerdas.ExtendedSampleCount(samplesPerSlot, tc.allowedMissings)
+			require.Equal(t, tc.extendedSampleCount, result)
+		})
+	}
+}
+
+func TestHypergeomCDF(t *testing.T) {
+	// P(X <= 5) for a hypergeometric distribution with population size M=128,
+	// n=65 successes in the population, and sample size N=16. These are the
+	// PeerDAS sampling parameters used by ExtendedSampleCount.
+	const (
+		expected = 0.0796665913283742
+		margin   = 0.000001
+	)
+
+	actual := peerdas.HypergeomCDF(5, 128, 65, 16)
+	require.Equal(t, true, expected-margin <= actual && actual <= expected+margin)
+}
diff --git a/beacon-chain/core/peerdas/reconstruction.go b/beacon-chain/core/peerdas/reconstruction.go
new file mode 100644
index 000000000000..af9fc341d807
--- /dev/null
+++ b/beacon-chain/core/peerdas/reconstruction.go
@@ -0,0 +1,139 @@
+package peerdas
+
+import (
+	"github.com/pkg/errors"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg"
+	fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
+	"github.com/prysmaticlabs/prysm/v5/config/params"
+	ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
+	"golang.org/x/sync/errgroup"
+)
+
+// CanSelfReconstruct returns true if the node can self-reconstruct all the data columns from its custody group count.
+func CanSelfReconstruct(custodyGroupCount uint64) bool {
+	total := params.BeaconConfig().NumberOfCustodyGroups
+	// If total is odd, we need total/2 + 1 custody groups to reconstruct.
+	// If total is even, we need total/2 custody groups to reconstruct.
+	custodyGroupsNeeded := total/2 + total%2
+	return custodyGroupCount >= custodyGroupsNeeded
+}
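CanSelfReconstruct is a ceiling division in disguise: with the erasure-coded extension, any half of the columns is enough, so a node needs ceil(total/2) custody groups. A tiny sketch of that equivalence:

// groups_needed_sketch.go (illustrative only, not part of the PR)
package main

import "fmt"

// groupsNeeded mirrors the total/2 + total%2 expression above,
// which equals ceil(total/2).
func groupsNeeded(total uint64) uint64 {
	return total/2 + total%2
}

func main() {
	fmt.Println(groupsNeeded(64)) // 32: even total, exactly half
	fmt.Println(groupsNeeded(65)) // 33: odd total, half rounded up
}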
+
+// RecoverCellsAndProofs recovers the cells and proofs from the data column sidecars.
+func RecoverCellsAndProofs(
+	dataColumnSideCars []*ethpb.DataColumnSidecar,
+	blockRoot [fieldparams.RootLength]byte,
+) ([]kzg.CellsAndProofs, error) {
+	var wg errgroup.Group
+
+	dataColumnSideCarsCount := len(dataColumnSideCars)
+
+	if dataColumnSideCarsCount == 0 {
+		return nil, errors.New("no data column sidecars")
+	}
+
+	// Check if all columns have the same length.
+	blobCount := len(dataColumnSideCars[0].DataColumn)
+	for _, sidecar := range dataColumnSideCars {
+		length := len(sidecar.DataColumn)
+
+		if length != blobCount {
+			return nil, errors.New("columns do not have the same length")
+		}
+	}
+
+	// Recover cells and compute proofs in parallel.
+	recoveredCellsAndProofs := make([]kzg.CellsAndProofs, blobCount)
+
+	for blobIndex := 0; blobIndex < blobCount; blobIndex++ {
+		bIndex := blobIndex
+		wg.Go(func() error {
+			cellsIndices := make([]uint64, 0, dataColumnSideCarsCount)
+			cells := make([]kzg.Cell, 0, dataColumnSideCarsCount)
+
+			for _, sidecar := range dataColumnSideCars {
+				// Build the cell indices.
+				cellsIndices = append(cellsIndices, sidecar.ColumnIndex)
+
+				// Get the cell.
+				column := sidecar.DataColumn
+				cell := column[bIndex]
+
+				cells = append(cells, kzg.Cell(cell))
+			}
+
+			// Recover the cells and proofs for the corresponding blob.
+			cellsAndProofs, err := kzg.RecoverCellsAndKZGProofs(cellsIndices, cells)
+			if err != nil {
+				return errors.Wrapf(err, "recover cells and KZG proofs for blob %d", bIndex)
+			}
+
+			recoveredCellsAndProofs[bIndex] = cellsAndProofs
+			return nil
+		})
+	}
+
+	if err := wg.Wait(); err != nil {
+		return nil, err
+	}
+
+	return recoveredCellsAndProofs, nil
+}
+
+// DataColumnSidecarsForReconstruct is a TEMPORARY function until there is an official specification for it.
+// It is scheduled for deletion.
+func DataColumnSidecarsForReconstruct(
+	blobKzgCommitments [][]byte,
+	signedBlockHeader *ethpb.SignedBeaconBlockHeader,
+	kzgCommitmentsInclusionProof [][]byte,
+	cellsAndProofs []kzg.CellsAndProofs,
+) ([]*ethpb.DataColumnSidecar, error) {
+	// Each CellsAndProofs corresponds to a blob,
+	// so we can get the blob count from the length of cellsAndProofs.
+	blobsCount := len(cellsAndProofs)
+	if blobsCount == 0 {
+		return nil, nil
+	}
+
+	// Get the column sidecars.
+	sidecars := make([]*ethpb.DataColumnSidecar, 0, fieldparams.NumberOfColumns)
+	for columnIndex := uint64(0); columnIndex < fieldparams.NumberOfColumns; columnIndex++ {
+		column := make([]kzg.Cell, 0, blobsCount)
+		kzgProofOfColumn := make([]kzg.Proof, 0, blobsCount)
+
+		for rowIndex := 0; rowIndex < blobsCount; rowIndex++ {
+			cellsForRow := cellsAndProofs[rowIndex].Cells
+			proofsForRow := cellsAndProofs[rowIndex].Proofs
+
+			cell := cellsForRow[columnIndex]
+			column = append(column, cell)
+
+			kzgProof := proofsForRow[columnIndex]
+			kzgProofOfColumn = append(kzgProofOfColumn, kzgProof)
+		}
+
+		columnBytes := make([][]byte, 0, blobsCount)
+		for i := range column {
+			columnBytes = append(columnBytes, column[i][:])
+		}
+
+		kzgProofOfColumnBytes := make([][]byte, 0, blobsCount)
+		for _, kzgProof := range kzgProofOfColumn {
+			copiedProof := kzgProof
+			kzgProofOfColumnBytes = append(kzgProofOfColumnBytes, copiedProof[:])
+		}
+
+		sidecar := &ethpb.DataColumnSidecar{
+			ColumnIndex:                  columnIndex,
+			DataColumn:                   columnBytes,
+			KzgCommitments:               blobKzgCommitments,
+			KzgProof:                     kzgProofOfColumnBytes,
+			SignedBlockHeader:            signedBlockHeader,
+			KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
+		}
+
+		sidecars = append(sidecars, sidecar)
+	}
+
+	return sidecars, nil
+}
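DataColumnSidecarsForReconstruct is essentially a transpose: cell recovery returns data row-major (one CellsAndProofs per blob), while sidecars are column-major (one sidecar per column, holding that column's cell from every blob). A toy illustration of the same indexing, with shapes invented for the example:

// transpose_sketch.go (illustrative only, not part of the PR)
package main

import "fmt"

func main() {
	const blobCount, columns = 2, 4

	// rows[blob][column], as produced per blob by cell recovery.
	rows := [blobCount][columns]string{
		{"b0c0", "b0c1", "b0c2", "b0c3"},
		{"b1c0", "b1c1", "b1c2", "b1c3"},
	}

	// One "sidecar" per column collects that column's cell from every blob.
	for col := 0; col < columns; col++ {
		sidecar := make([]string, 0, blobCount)
		for row := 0; row < blobCount; row++ {
			sidecar = append(sidecar, rows[row][col])
		}
		fmt.Println("column", col, sidecar) // column 0 [b0c0 b1c0], ...
	}
}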
"github.com/prysmaticlabs/prysm/v5/testing/require" + "github.com/prysmaticlabs/prysm/v5/testing/util" +) + +func TestCanSelfReconstruct(t *testing.T) { + testCases := []struct { + name string + totalNumberOfCustodyGroups uint64 + custodyNumberOfGroups uint64 + expected bool + }{ + { + name: "totalNumberOfCustodyGroups=64, custodyNumberOfGroups=31", + totalNumberOfCustodyGroups: 64, + custodyNumberOfGroups: 31, + expected: false, + }, + { + name: "totalNumberOfCustodyGroups=64, custodyNumberOfGroups=32", + totalNumberOfCustodyGroups: 64, + custodyNumberOfGroups: 32, + expected: true, + }, + { + name: "totalNumberOfCustodyGroups=65, custodyNumberOfGroups=32", + totalNumberOfCustodyGroups: 65, + custodyNumberOfGroups: 32, + expected: false, + }, + { + name: "totalNumberOfCustodyGroups=63, custodyNumberOfGroups=33", + totalNumberOfCustodyGroups: 65, + custodyNumberOfGroups: 33, + expected: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Set the total number of columns. + params.SetupTestConfigCleanup(t) + cfg := params.BeaconConfig().Copy() + cfg.NumberOfCustodyGroups = tc.totalNumberOfCustodyGroups + params.OverrideBeaconConfig(cfg) + + // Check if reconstuction is possible. + actual := peerdas.CanSelfReconstruct(tc.custodyNumberOfGroups) + require.Equal(t, tc.expected, actual) + }) + } +} + +func TestReconstructionRoundTrip(t *testing.T) { + params.SetupTestConfigCleanup(t) + + const blobCount = 5 + + var blockRoot [fieldparams.RootLength]byte + + signedBeaconBlockPb := util.NewBeaconBlockDeneb() + require.NoError(t, kzg.Start()) + + // Generate random blobs and their corresponding commitments. + var ( + blobsKzgCommitments [][]byte + blobs []kzg.Blob + ) + for i := range blobCount { + blob := getRandBlob(int64(i)) + commitment, _, err := generateCommitmentAndProof(&blob) + require.NoError(t, err) + + blobsKzgCommitments = append(blobsKzgCommitments, commitment[:]) + blobs = append(blobs, blob) + } + + // Generate a signed beacon block. + signedBeaconBlockPb.Block.Body.BlobKzgCommitments = blobsKzgCommitments + signedBeaconBlock, err := blocks.NewSignedBeaconBlock(signedBeaconBlockPb) + require.NoError(t, err) + + // Get the signed beacon block header. + signedBeaconBlockHeader, err := signedBeaconBlock.Header() + require.NoError(t, err) + + // Convert data columns sidecars from signed block and blobs. + dataColumnSidecars, err := peerdas.DataColumnSidecars(signedBeaconBlock, blobs) + require.NoError(t, err) + + // Create verified RO data columns. 
+	verifiedRoDataColumns := make([]*blocks.VerifiedRODataColumn, 0, blobCount)
+	for _, dataColumnSidecar := range dataColumnSidecars {
+		roDataColumn, err := blocks.NewRODataColumn(dataColumnSidecar)
+		require.NoError(t, err)
+
+		verifiedRoDataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)
+		verifiedRoDataColumns = append(verifiedRoDataColumns, &verifiedRoDataColumn)
+	}
+
+	verifiedRoDataColumn := verifiedRoDataColumns[0]
+
+	numberOfColumns := params.BeaconConfig().NumberOfColumns
+
+	var noDataColumns []*ethpb.DataColumnSidecar
+	dataColumnsWithDifferentLengths := []*ethpb.DataColumnSidecar{
+		{DataColumn: [][]byte{{}, {}}},
+		{DataColumn: [][]byte{{}}},
+	}
+	notEnoughDataColumns := dataColumnSidecars[:numberOfColumns/2-1]
+	originalDataColumns := dataColumnSidecars[:numberOfColumns/2]
+	extendedDataColumns := dataColumnSidecars[numberOfColumns/2:]
+	evenDataColumns := make([]*ethpb.DataColumnSidecar, 0, numberOfColumns/2)
+	oddDataColumns := make([]*ethpb.DataColumnSidecar, 0, numberOfColumns/2)
+	allDataColumns := dataColumnSidecars
+
+	for i, dataColumn := range dataColumnSidecars {
+		if i%2 == 0 {
+			evenDataColumns = append(evenDataColumns, dataColumn)
+		} else {
+			oddDataColumns = append(oddDataColumns, dataColumn)
+		}
+	}
+
+	testCases := []struct {
+		name               string
+		dataColumnsSidecar []*ethpb.DataColumnSidecar
+		isError            bool
+	}{
+		{
+			name:               "No data column sidecars",
+			dataColumnsSidecar: noDataColumns,
+			isError:            true,
+		},
+		{
+			name:               "Data column sidecars with different lengths",
+			dataColumnsSidecar: dataColumnsWithDifferentLengths,
+			isError:            true,
+		},
+		{
+			name:               "All columns are present (no actual need to reconstruct)",
+			dataColumnsSidecar: allDataColumns,
+			isError:            false,
+		},
+		{
+			name:               "Only original columns are present",
+			dataColumnsSidecar: originalDataColumns,
+			isError:            false,
+		},
+		{
+			name:               "Only extended columns are present",
+			dataColumnsSidecar: extendedDataColumns,
+			isError:            false,
+		},
+		{
+			name:               "Only even columns are present",
+			dataColumnsSidecar: evenDataColumns,
+			isError:            false,
+		},
+		{
+			name:               "Only odd columns are present",
+			dataColumnsSidecar: oddDataColumns,
+			isError:            false,
+		},
+		{
+			name:               "Not enough columns to reconstruct",
+			dataColumnsSidecar: notEnoughDataColumns,
+			isError:            true,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Recover cells and proofs from available data columns sidecars.
+			cellsAndProofs, err := peerdas.RecoverCellsAndProofs(tc.dataColumnsSidecar, blockRoot)
+			isError := (err != nil)
+			require.Equal(t, tc.isError, isError)
+
+			if isError {
+				return
+			}
+
+			// Recover all data columns sidecars from cells and proofs.
+			reconstructedDataColumnsSideCars, err := peerdas.DataColumnSidecarsForReconstruct(
+				blobsKzgCommitments,
+				signedBeaconBlockHeader,
+				verifiedRoDataColumn.KzgCommitmentsInclusionProof,
+				cellsAndProofs,
+			)
+			require.NoError(t, err)
+
+			expected := dataColumnSidecars
+			actual := reconstructedDataColumnsSideCars
+			require.DeepSSZEqual(t, expected, actual)
+		})
+	}
+}
diff --git a/beacon-chain/core/peerdas/utils_test.go b/beacon-chain/core/peerdas/utils_test.go
new file mode 100644
index 000000000000..f8768276a76d
--- /dev/null
+++ b/beacon-chain/core/peerdas/utils_test.go
@@ -0,0 +1,57 @@
+package peerdas_test
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"encoding/binary"
+
+	"github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
+	GoKZG "github.com/crate-crypto/go-kzg-4844"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg"
+	"github.com/sirupsen/logrus"
+)
+
+func generateCommitmentAndProof(blob *kzg.Blob) (*kzg.Commitment, *kzg.Proof, error) {
+	commitment, err := kzg.BlobToKZGCommitment(blob)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	proof, err := kzg.ComputeBlobKZGProof(blob, commitment)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return &commitment, &proof, nil
+}
+
+// Returns a random blob using the passed seed as entropy.
+func getRandBlob(seed int64) kzg.Blob {
+	var blob kzg.Blob
+	bytesPerBlob := GoKZG.ScalarsPerBlob * GoKZG.SerializedScalarSize
+	for i := 0; i < bytesPerBlob; i += GoKZG.SerializedScalarSize {
+		fieldElementBytes := getRandFieldElement(seed + int64(i))
+		copy(blob[i:i+GoKZG.SerializedScalarSize], fieldElementBytes[:])
+	}
+	return blob
+}
+
+// Returns a serialized random field element in big-endian.
+func getRandFieldElement(seed int64) [32]byte {
+	randomBytes := deterministicRandomness(seed)
+
+	var r fr.Element
+	r.SetBytes(randomBytes[:])
+
+	return GoKZG.SerializeScalar(r)
+}
+
+func deterministicRandomness(seed int64) [32]byte {
+	// Convert the int64 seed to a big-endian byte slice.
+	buf := new(bytes.Buffer)
+	if err := binary.Write(buf, binary.BigEndian, seed); err != nil {
+		logrus.WithError(err).Error("Failed to write int64 to bytes buffer")
+		return [32]byte{}
+	}
+	seedBytes := buf.Bytes()
+
+	return sha256.Sum256(seedBytes)
+}
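The helpers above derive blobs deterministically (seed -> sha256 -> BLS12-381 field element -> serialized scalar), so the round-trip tests are reproducible. A simplified, self-contained equivalent of the seed-hashing step (binary.BigEndian.PutUint64 writes the same eight bytes that binary.Write produces for an int64):

// deterministic_seed_sketch.go (illustrative only, not part of the PR)
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

func deterministicRandomness(seed int64) [32]byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(seed))
	return sha256.Sum256(buf[:])
}

func main() {
	a, b := deterministicRandomness(7), deterministicRandomness(7)
	fmt.Println(a == b) // true: same seed, same bytes downstream
}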
diff --git a/beacon-chain/das/availability_columns.go b/beacon-chain/das/availability_columns.go
index 0d5a1d89b40d..b6fd7211d665 100644
--- a/beacon-chain/das/availability_columns.go
+++ b/beacon-chain/das/availability_columns.go
@@ -156,21 +156,14 @@ func fullCommitmentsToCheck(nodeID enode.ID, block blocks.ROBlock, currentSlot p
 	// Retrieve the groups count.
 	custodyGroupCount := peerdas.CustodyGroupCount()
 
-	// Retrieve custody groups.
-	custodyGroups, err := peerdas.CustodyGroups(nodeID, custodyGroupCount)
+	// Retrieve peer info.
+	peerInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
 	if err != nil {
-		return nil, errors.Wrap(err, "custody groups")
+		return nil, errors.Wrap(err, "peer info")
 	}
-
-	// Retrieve custody columns.
-	custodyColumns, err := peerdas.CustodyColumns(custodyGroups)
-	if err != nil {
-		return nil, errors.Wrap(err, "custody columns")
-	}
-
 	// Create a safe commitments array for the custody columns.
 	commitmentsArray := &safeCommitmentsArray{}
-	for column := range custodyColumns {
+	for column := range peerInfo.CustodyColumns {
 		commitmentsArray[column] = kzgCommitments
 	}
diff --git a/beacon-chain/p2p/custody.go b/beacon-chain/p2p/custody.go
index b8e055de80b1..218b0b75eb4c 100644
--- a/beacon-chain/p2p/custody.go
+++ b/beacon-chain/p2p/custody.go
@@ -29,12 +29,15 @@ func (s *Service) custodyGroupsAdmissiblePeers(peers []peer.ID, custodyGroupCoun
 	// Retrieve the local node ID.
 	localNodeId := s.NodeID()
 
-	// Retrieve the needed custody groups.
-	neededCustodyGroups, err := peerdas.CustodyGroups(localNodeId, custodyGroupCount)
+	// Retrieve the local node info.
+	localNodeInfo, _, err := peerdas.Info(localNodeId, custodyGroupCount)
 	if err != nil {
-		return nil, errors.Wrap(err, "custody groups")
+		return nil, errors.Wrap(err, "peer info")
 	}
 
+	// Retrieve the needed custody groups.
+	neededCustodyGroups := localNodeInfo.CustodyGroups
+
 	// Find the valid peers.
 	validPeers := make([]peer.ID, 0, len(peers))
 
@@ -54,12 +57,14 @@ loop:
 			return nil, errors.Wrap(err, "convert peer ID to node ID")
 		}
 
-		// Get the custody groups of the remote peer.
-		remoteCustodyGroups, err := peerdas.CustodyGroups(remoteNodeID, remoteCustodyGroupCount)
+		// Retrieve the remote peer info.
+		remotePeerInfo, _, err := peerdas.Info(remoteNodeID, remoteCustodyGroupCount)
 		if err != nil {
-			return nil, errors.Wrap(err, "custody groups")
+			return nil, errors.Wrap(err, "peer info")
 		}
 
+		// Retrieve the custody groups of the remote peer.
+		remoteCustodyGroups := remotePeerInfo.CustodyGroups
 		remoteCustodyGroupsCount := uint64(len(remoteCustodyGroups))
 
 		// If the remote peers custodies all the possible columns, add it to the list.
diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go
index d50e3fde03ce..fa7cb1f38dde 100644
--- a/beacon-chain/p2p/discovery.go
+++ b/beacon-chain/p2p/discovery.go
@@ -164,12 +164,6 @@ func (s *Service) RefreshPersistentSubnets() {
 		return
 	}
 
-	// Initialize persistent column subnets.
-	if err := initializePersistentColumnSubnets(nodeID); err != nil {
-		log.WithError(err).Error("Could not initialize persistent column subnets")
-		return
-	}
-
 	// Get the current attestation subnet bitfield.
 	bitV := bitfield.NewBitvector64()
 	attestationCommittees := cache.SubnetIDs.GetAllSubnets()
diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go
index 00c987f2e71e..28640ef6ab80 100644
--- a/beacon-chain/p2p/subnets.go
+++ b/beacon-chain/p2p/subnets.go
@@ -391,42 +391,6 @@ func initializePersistentSubnets(id enode.ID, epoch primitives.Epoch) error {
 	return nil
 }
 
-// initializePersistentColumnSubnets initialize persistent column subnets
-func initializePersistentColumnSubnets(id enode.ID) error {
-	// Check if the column subnets are already cached.
-	_, ok, expTime := cache.ColumnSubnetIDs.GetColumnSubnets()
-	if ok && expTime.After(time.Now()) {
-		return nil
-	}
-
-	// Compute the number of custody groups we should sample.
-	custodyGroupSamplingSize := peerdas.CustodyGroupSamplingSize()
-
-	// Compute the custody groups we should sample.
-	custodyGroups, err := peerdas.CustodyGroups(id, custodyGroupSamplingSize)
-	if err != nil {
-		return errors.Wrap(err, "custody groups")
-	}
-
-	// Compute the column subnets for the custody groups.
-	custodyColumns, err := peerdas.CustodyColumns(custodyGroups)
-	if err != nil {
-		return errors.Wrap(err, "custody columns")
-	}
-
-	// Compute subnets from the custody columns.
-	subnets := make([]uint64, 0, len(custodyColumns))
-	for column := range custodyColumns {
-		subnet := peerdas.ComputeSubnetForDataColumnSidecar(column)
-		subnets = append(subnets, subnet)
-	}
-
-	// Add the subnets to the cache.
-	cache.ColumnSubnetIDs.AddColumnSubnets(subnets)
-
-	return nil
-}
-
 // Spec pseudocode definition:
 //
 //	def compute_subscribed_subnets(node_id: NodeID, epoch: Epoch) -> Sequence[SubnetID]:
@@ -559,20 +523,14 @@ func dataColumnSubnets(nodeID enode.ID, record *enr.Record) (map[uint64]bool, er
 		return nil, errors.Wrap(err, "custody group count from record")
 	}
 
-	// Retrieve the custody groups from the remote peer.
-	custodyGroups, err := peerdas.CustodyGroups(nodeID, custodyGroupCount)
-	if err != nil {
-		return nil, errors.Wrap(err, "custody groups")
-	}
-
-	// Retrieve the custody columns from the groups.
-	custodyColumns, err := peerdas.CustodyColumns(custodyGroups)
+	// Retrieve the peer info.
+	peerInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
 	if err != nil {
-		return nil, errors.Wrap(err, "custody columns")
+		return nil, errors.Wrap(err, "peer info")
 	}
 
 	// Get custody columns subnets from the columns.
-	return peerdas.DataColumnSubnets(custodyColumns), nil
+	return peerInfo.DataColumnsSubnets, nil
 }
 
 // Parses the attestation subnets ENR entry in a node and extracts its value
diff --git a/beacon-chain/sync/data_columns_reconstruct.go b/beacon-chain/sync/data_columns_reconstruct.go
index 92464ae08ff5..0dff36fc5394 100644
--- a/beacon-chain/sync/data_columns_reconstruct.go
+++ b/beacon-chain/sync/data_columns_reconstruct.go
@@ -57,16 +57,10 @@ func (s *Service) reconstructDataColumns(ctx context.Context, verifiedRODataColu
 	// Compute the custody group count.
 	custodyGroupCount := peerdas.CustodyGroupCount()
 
-	// Compute the custody groups.
-	custodyGroups, err := peerdas.CustodyGroups(nodeID, custodyGroupCount)
+	// Retrieve our local node info.
+	localNodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
 	if err != nil {
-		return errors.Wrap(err, "custody groups")
-	}
-
-	// Compute the custody columns.
-	custodyColumns, err := peerdas.CustodyColumns(custodyGroups)
-	if err != nil {
-		return errors.Wrap(err, "custody columns")
+		return errors.Wrap(err, "peer info")
 	}
 
 	// Load the data columns sidecars.
@@ -99,7 +93,7 @@ func (s *Service) reconstructDataColumns(ctx context.Context, verifiedRODataColu
 	// Save the data columns sidecars in the database.
 	for _, dataColumnSidecar := range dataColumnSidecars {
-		shouldSave := custodyColumns[dataColumnSidecar.ColumnIndex]
+		shouldSave := localNodeInfo.CustodyColumns[dataColumnSidecar.ColumnIndex]
 		if !shouldSave {
 			// We do not custody this column, so we do not need to save it.
 			continue
@@ -176,17 +170,10 @@ func (s *Service) scheduleReconstructedDataColumnsBroadcast(
 	// Get the custody group count.
 	custodyGroupCount := peerdas.CustodyGroupCount()
 
-	// Compute the custody groups.
-	custodyGroups, err := peerdas.CustodyGroups(nodeID, custodyGroupCount)
-	if err != nil {
-		log.WithError(err).Error("Custody groups")
-		return
-	}
-
-	// Compute the custody columns.
-	custodyDataColumns, err := peerdas.CustodyColumns(custodyGroups)
+	// Retrieve the local node info.
+	localNodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
 	if err != nil {
-		log.WithError(err).Error("Custody columns")
+		log.WithError(err).Error("Peer info")
 		return
 	}
 
@@ -198,8 +185,8 @@ func (s *Service) scheduleReconstructedDataColumnsBroadcast(
 	}
 
 	// Compute the missing data columns (data columns we should custody but have not received via gossip).
-	missingColumns := make(map[uint64]bool, len(custodyDataColumns))
-	for column := range custodyDataColumns {
+	missingColumns := make(map[uint64]bool, len(localNodeInfo.CustodyColumns))
+	for column := range localNodeInfo.CustodyColumns {
 		if ok := receivedDataColumns[column]; !ok {
 			missingColumns[column] = true
 		}
diff --git a/beacon-chain/sync/data_columns_sampling.go b/beacon-chain/sync/data_columns_sampling.go
index 2b816f645793..702c5aa15997 100644
--- a/beacon-chain/sync/data_columns_sampling.go
+++ b/beacon-chain/sync/data_columns_sampling.go
@@ -100,9 +100,11 @@ func (d *dataColumnSampler1D) Run(ctx context.Context) {
 	// Verify if we need to run sampling or not, if not, return directly.
 	custodyGroupCount := peerdas.CustodyGroupCount()
-	custodyGroups, err := peerdas.CustodyGroups(nodeID, custodyGroupCount)
+
+	// Retrieve our local node info.
+	localNodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
 	if err != nil {
-		log.WithError(err).Error("custody groups")
+		log.WithError(err).Error("peer info")
 		return
 	}
 
@@ -117,7 +119,7 @@ func (d *dataColumnSampler1D) Run(ctx context.Context) {
 	// Initialize non custody groups.
 	d.nonCustodyGroups = make(map[uint64]bool)
 	for i := range numberOfCustodyGroups {
-		if !custodyGroups[i] {
+		if !localNodeInfo.CustodyGroups[i] {
 			d.nonCustodyGroups[i] = true
 		}
 	}
@@ -177,14 +179,14 @@ func (d *dataColumnSampler1D) refreshPeerInfo() {
 			continue
 		}
 
-		retrievedGroups, err := peerdas.CustodyGroups(nodeID, retrievedCustodyGroupCount)
+		// Retrieve the peer info.
+		peerInfo, _, err := peerdas.Info(nodeID, retrievedCustodyGroupCount)
 		if err != nil {
-			log.WithError(err).WithField("peerID", pid).Error("Failed to determine peer custody groups")
+			log.WithError(err).WithField("peerID", pid.String()).Error("Failed to determine peer info")
 			continue
 		}
 
-		d.groupsByPeer[pid] = retrievedGroups
-		for group := range retrievedGroups {
+		d.groupsByPeer[pid] = peerInfo.CustodyGroups
+		for group := range peerInfo.CustodyGroups {
 			d.peersByCustodyGroup[group][pid] = true
 		}
 	}
diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go
index aec563285ac1..11e78ec4be19 100644
--- a/beacon-chain/sync/initial-sync/blocks_fetcher.go
+++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go
@@ -773,19 +773,13 @@ func (f *blocksFetcher) custodyColumns() (map[uint64]bool, error) {
 	// Retrieve the number of groups we should custody.
 	localCustodyGroupCount := peerdas.CustodyGroupCount()
 
-	// Compute the groups we should custody.
-	localCustodyGroups, err := peerdas.CustodyGroups(localNodeID, localCustodyGroupCount)
+	// Retrieve the local node info.
+	localNodeInfo, _, err := peerdas.Info(localNodeID, localCustodyGroupCount)
 	if err != nil {
-		return nil, errors.Wrap(err, "custody groups")
+		return nil, errors.Wrap(err, "node info")
 	}
 
-	// Compute the columns we should custody.
-	localCustodyColumns, err := peerdas.CustodyColumns(localCustodyGroups)
-	if err != nil {
-		return nil, errors.Wrap(err, "custody columns")
-	}
-
-	return localCustodyColumns, nil
+	return localNodeInfo.CustodyColumns, nil
 }
 
 // missingColumnsFromRoot computes the columns corresponding to blocks in `bwbs` that
diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go
index fe467830c318..c568abda64c6 100644
--- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go
+++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go
@@ -397,13 +397,13 @@ func (f *blocksFetcher) custodyGroupsFromPeer(peers map[peer.ID]bool) (map[peer.
 		// Get the custody group count of the peer.
 		custodyGroupCount := f.p2p.CustodyGroupCountFromPeer(peer)
 
-		// Get the custody groups of the peer.
-		custodyGroups, err := peerdas.CustodyGroups(nodeID, custodyGroupCount)
+		// Retrieve the peer info.
+		peerInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
 		if err != nil {
-			return nil, errors.Wrap(err, "custody groups")
+			return nil, errors.Wrap(err, "peer info")
 		}
 
-		custodyGroupsByPeer[peer] = custodyGroups
+		custodyGroupsByPeer[peer] = peerInfo.CustodyGroups
 	}
 
 	return custodyGroupsByPeer, nil
diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go
index d3b79ba9f607..7fb98439b6ba 100644
--- a/beacon-chain/sync/initial-sync/service.go
+++ b/beacon-chain/sync/initial-sync/service.go
@@ -354,21 +354,15 @@ func (s *Service) missingColumnRequest(roBlock blocks.ROBlock, store *filesystem
 	// Get the custody group count.
 	custodyGroupsCount := peerdas.CustodyGroupCount()
 
-	// Compute the custody groups.
-	custodyGroups, err := peerdas.CustodyGroups(nodeID, custodyGroupsCount)
+	// Retrieve the peer info.
+	peerInfo, _, err := peerdas.Info(nodeID, custodyGroupsCount)
 	if err != nil {
-		return nil, errors.Wrap(err, "custody groups")
-	}
-
-	// Compute the custody columns.
-	custodyColumns, err := peerdas.CustodyColumns(custodyGroups)
-	if err != nil {
-		return nil, errors.Wrap(err, "custody columns")
+		return nil, errors.Wrap(err, "peer info")
 	}
 
 	// Build blob sidecars by root requests based on missing columns.
 	req := make(p2ptypes.DataColumnSidecarsByRootReq, 0, len(commitments))
-	for columnIndex := range custodyColumns {
+	for columnIndex := range peerInfo.CustodyColumns {
 		isColumnAvailable := storedColumns[columnIndex]
 		if !isColumnAvailable {
 			req = append(req, &eth.DataColumnIdentifier{
diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go
index 188967883f39..b573862e7172 100644
--- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go
+++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go
@@ -315,18 +315,13 @@ func (s *Service) buildRequestsForMissingDataColumns(root [32]byte, block interf
 	// Retrieve the number of groups we should sample from.
 	samplingGroupSize := peerdas.CustodyGroupSamplingSize()
 
-	// Retrieve the groups we should sample from.
-	samplingGroups, err := peerdas.CustodyGroups(nodeID, samplingGroupSize)
+	// Retrieve the peer info.
+	peerInfo, _, err := peerdas.Info(nodeID, samplingGroupSize)
 	if err != nil {
-		return nil, errors.Wrap(err, "custody groups")
-	}
-
-	// Retrieve the columns we should sample from.
-	samplingColumns, err := peerdas.CustodyColumns(samplingGroups)
-	if err != nil {
-		return nil, errors.Wrap(err, "custody columns")
+		return nil, errors.Wrap(err, "peer info")
 	}
 
+	samplingColumns := peerInfo.CustodyColumns
 	samplingColumnCount := len(samplingColumns)
 
 	// Build the request for the columns we should sample from and we don't actually store.
diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_root.go b/beacon-chain/sync/rpc_blob_sidecars_by_root.go
index 6e8a01564902..4d3417568ac1 100644
--- a/beacon-chain/sync/rpc_blob_sidecars_by_root.go
+++ b/beacon-chain/sync/rpc_blob_sidecars_by_root.go
@@ -37,8 +37,9 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
 	blobIdents := *ref
 	cs := s.cfg.clock.CurrentSlot()
 
+	remotePeer := stream.Conn().RemotePeer()
 	if err := validateBlobByRootRequest(blobIdents, cs); err != nil {
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
+		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
 		s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
 		return err
 	}
@@ -75,6 +76,7 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
 			log.WithError(err).WithFields(logrus.Fields{
 				"root":  fmt.Sprintf("%#x", root),
 				"index": idx,
+				"peer":  remotePeer.String(),
 			}).Debugf("Peer requested blob sidecar by root not found in db")
 			continue
 		}
diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_range.go b/beacon-chain/sync/rpc_data_column_sidecars_by_range.go
index 9f24a83436a9..892eeb2da0a2 100644
--- a/beacon-chain/sync/rpc_data_column_sidecars_by_range.go
+++ b/beacon-chain/sync/rpc_data_column_sidecars_by_range.go
@@ -101,20 +101,14 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
 	// Get the number of groups we should custody.
 	custodyGroupCount := peerdas.CustodyGroupCount()
 
-	// Compute the groups we should custody.
-	custodyGroups, err := peerdas.CustodyGroups(nodeID, custodyGroupCount)
+	// Retrieve the peer info.
+	peerInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
 	if err != nil {
 		s.writeErrorResponseToStream(responseCodeServerError, err.Error(), stream)
-		return errors.Wrap(err, "custody groups")
-	}
-
-	// Compute the columns we should custody.
-	custodyColumns, err := peerdas.CustodyColumns(custodyGroups)
-	if err != nil {
-		s.writeErrorResponseToStream(responseCodeServerError, err.Error(), stream)
-		return errors.Wrap(err, "custody columns")
+		return errors.Wrap(err, "peer info")
 	}
 
+	custodyColumns := peerInfo.CustodyColumns
 	custodyColumnsCount := uint64(len(custodyColumns))
 
 	// Compute requested columns.
diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
index 51ff4c3879b7..a39e59a5688b 100644
--- a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
+++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
@@ -109,21 +109,16 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
 	// Retrieve the number of groups we should custody.
 	custodyGroupCount := peerdas.CustodyGroupCount()
 
-	// Compute the groups we should custody.
-	custodyGroups, err := peerdas.CustodyGroups(nodeID, custodyGroupCount)
+	// Retrieve the peer info.
+	peerInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
 	if err != nil {
-		return errors.Wrap(err, "custody groups")
+		s.writeErrorResponseToStream(responseCodeServerError, err.Error(), stream)
+		return errors.Wrap(err, "peer info")
 	}
 
-	custodyColumns, err := peerdas.CustodyColumns(custodyGroups)
+	custodyColumns := peerInfo.CustodyColumns
 	custodyColumnsCount := uint64(len(custodyColumns))
 
-	if err != nil {
-		log.WithError(err).Errorf("unexpected error retrieving the node id")
-		s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
-		return errors.Wrap(err, "custody columns")
-	}
-
 	var custody interface{} = "all"
 
 	if custodyColumnsCount != numberOfColumns {
diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go
index 04107a039679..cf4f0709820b 100644
--- a/beacon-chain/sync/subscriber.go
+++ b/beacon-chain/sync/subscriber.go
@@ -16,6 +16,7 @@ import (
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/altair"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
 	"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
@@ -589,12 +590,17 @@ func (s *Service) enoughPeersAreConnected(subnetTopic string) bool {
 	return peersWithSubnetCount >= threshold
 }
 
-func (s *Service) dataColumnSubnetIndices(currentSlot primitives.Slot) []uint64 {
-	if flags.Get().SubscribeToAllSubnets {
-		return sliceFromCount(params.BeaconConfig().DataColumnSidecarSubnetCount)
+func (s *Service) dataColumnSubnetIndices(_ primitives.Slot) []uint64 {
+	nodeID := s.cfg.p2p.NodeID()
+	custodyGroupCount := peerdas.CustodyGroupSamplingSize()
+
+	nodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
+	if err != nil {
+		log.WithError(err).Error("Could not retrieve peer info")
+		return []uint64{}
 	}
 
-	return s.retrieveActiveColumnSubnets()
+	return uint64MapToSortedSlice(nodeInfo.DataColumnsSubnets)
 }
 
 func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 {
@@ -609,14 +615,6 @@ func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Sl
 	return slice.SetUint64(append(persistentSubnetIndices, aggregatorSubnetIndices...))
 }
 
-func (*Service) retrieveActiveColumnSubnets() []uint64 {
-	subs, ok, _ := cache.ColumnSubnetIDs.GetColumnSubnets()
-	if !ok {
-		return nil
-	}
-	return subs
-}
-
 // filters out required peers for the node to function, not
 // pruning peers who are in our attestation subnets.
 func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
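The recurring refactor across these files replaces the two-step peerdas.CustodyGroups + peerdas.CustodyColumns lookup with a single peerdas.Info call. A sketch of the consolidated call-site pattern, with the return shape inferred from the call sites above (every caller in this diff discards the second return value, so its meaning is not shown here and remains an assumption):

// peerdas_info_sketch.go (illustrative only, not part of the PR)
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
)

// custodyColumnsFor returns the set of column indices a node with the
// given ID should custody, using the consolidated Info lookup.
func custodyColumnsFor(nodeID enode.ID) (map[uint64]bool, error) {
	custodyGroupCount := peerdas.CustodyGroupCount()

	// One call now yields CustodyGroups, CustodyColumns, and DataColumnsSubnets.
	info, _, err := peerdas.Info(nodeID, custodyGroupCount)
	if err != nil {
		return nil, fmt.Errorf("peer info: %w", err)
	}

	return info.CustodyColumns, nil
}

func main() {
	var id enode.ID // zero ID, just to exercise the call
	columns, err := custodyColumnsFor(id)
	fmt.Println(len(columns), err)
}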