diff --git a/backend/cmd/blobindexer/main.go b/backend/cmd/blobindexer/main.go index 9f7d37b4a..5be4f6572 100644 --- a/backend/cmd/blobindexer/main.go +++ b/backend/cmd/blobindexer/main.go @@ -16,7 +16,7 @@ import ( func Run() { fs := flag.NewFlagSet("fs", flag.ExitOnError) - configFlag := fs.String("config", "config.yml", "path to config") + configFlag := fs.String("config", "", "path to config") versionFlag := fs.Bool("version", false, "print version and exit") _ = fs.Parse(os.Args[2:]) if *versionFlag { diff --git a/backend/go.mod b/backend/go.mod index 03f252a2b..934129130 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -14,6 +14,7 @@ require ( github.com/attestantio/go-eth2-client v0.19.10 github.com/awa/go-iap v1.26.5 github.com/aws/aws-sdk-go-v2 v1.25.0 + github.com/aws/aws-sdk-go-v2/config v1.18.45 github.com/aws/aws-sdk-go-v2/credentials v1.13.43 github.com/aws/aws-sdk-go-v2/service/s3 v1.49.0 github.com/bwmarrin/snowflake v0.3.0 @@ -99,13 +100,18 @@ require ( github.com/alessio/shellescape v1.4.1 // indirect github.com/andybalholm/brotli v1.0.6 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.13 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.45 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.15.2 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.17.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.23.2 // indirect github.com/aws/smithy-go v1.20.0 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -158,6 +164,7 @@ require ( github.com/googleapis/gax-go/v2 v2.12.3 // indirect github.com/gorilla/securecookie v1.1.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/herumi/bls-eth-go-binary v1.31.0 // indirect github.com/holiman/uint256 v1.2.4 // indirect github.com/huandu/go-clone v1.6.0 // indirect diff --git a/backend/go.sum b/backend/go.sum index 90e52e7fc..cc784e35b 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -73,8 +73,11 @@ github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBW github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0/go.mod h1:5zGj2eA85ClyedTDK+Whsu+w9yimnVIZvhvBKrDquM8= +github.com/aws/aws-sdk-go-v2/config v1.18.45 h1:Aka9bI7n8ysuwPeFdm77nfbyHCAKQ3z9ghB3S/38zes= +github.com/aws/aws-sdk-go-v2/config v1.18.45/go.mod h1:ZwDUgFnQgsazQTnWfeLWk5GjeqTQTL8lMkoE1UXzxdE= github.com/aws/aws-sdk-go-v2/credentials v1.13.43 h1:LU8vo40zBlo3R7bAvBVy/ku4nxGEyZe9N8MqAeFTzF8= github.com/aws/aws-sdk-go-v2/credentials v1.13.43/go.mod h1:zWJBz1Yf1ZtX5NGax9ZdNjhhI4rgjfgsyk6vTY1yfVg= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.13 h1:PIktER+hwIG286DqXyvVENjgLTAwGgoeriLDD5C+YlQ= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.13/go.mod h1:f/Ib/qYjhV2/qdsf79H3QP/eRE4AkVyEf6sk7XfZ1tg= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43/go.mod h1:auo+PiyLl0n1l8A0e8RIeR8tOzYPfZZH/JNlrJ8igTQ= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw= @@ -82,6 +85,8 @@ github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylga github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.37/go.mod h1:Qe+2KtKml+FEsQF/DHmDV+xjtche/hwoF75EG4UlHW8= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0/go.mod h1:hL6BWM/d/qz113fVitZjbXR0E+RCTU1+x+1Idyn5NgE= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.45 h1:hze8YsjSh8Wl1rYa1CJpRmXP21BvOBuc76YhW0HsuQ4= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.45/go.mod h1:lD5M20o09/LCuQ2mE62Mb/iSdSlCNuj6H5ci7tW7OsE= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 h1:TkbRExyKSVHELwG9gz2+gql37jjec2R5vus9faTomwE= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0/go.mod h1:T3/9xMKudHhnj8it5EqIrhvv11tVZqWYkKcot+BFStc= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.0 h1:a33HuFlO0KsveiP90IUJh8Xr/cx9US2PqkSroaLc+o8= @@ -95,8 +100,11 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECu github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E= github.com/aws/aws-sdk-go-v2/service/s3 v1.49.0 h1:VfU15izXQjz4m9y1DkbY79iylIiuPwWtrram4cSpWEI= github.com/aws/aws-sdk-go-v2/service/s3 v1.49.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs= +github.com/aws/aws-sdk-go-v2/service/sso v1.15.2 h1:JuPGc7IkOP4AaqcZSIcyqLpFSqBWK32rM9+a1g6u73k= github.com/aws/aws-sdk-go-v2/service/sso v1.15.2/go.mod h1:gsL4keucRCgW+xA85ALBpRFfdSLH4kHOVSnLMSuBECo= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.17.3 h1:HFiiRkf1SdaAmV3/BHOFZ9DjFynPHj8G/UIO1lQS+fk= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.17.3/go.mod h1:a7bHA82fyUXOm+ZSWKU6PIoBxrjSprdLoM8xPYvzYVg= +github.com/aws/aws-sdk-go-v2/service/sts v1.23.2 h1:0BkLfgeDjfZnZ+MhB3ONb01u9pwFYTCZVhlsSSBvlbU= github.com/aws/aws-sdk-go-v2/service/sts v1.23.2/go.mod h1:Eows6e1uQEsc4ZaHANmsPRzAKcVDrcmjjWiih2+HUUQ= github.com/aws/smithy-go v1.15.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/aws/smithy-go v1.20.0 h1:6+kZsCXZwKxZS9RfISnPc4EXlHoyAkm2hPuM8X2BrrQ= @@ -433,6 +441,8 @@ github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mO github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/herumi/bls-eth-go-binary v1.31.0 h1:9eeW3EA4epCb7FIHt2luENpAW69MvKGL5jieHlBiP+w= github.com/herumi/bls-eth-go-binary v1.31.0/go.mod h1:luAnRm3OsMQeokhGzpYmc0ZKwawY7o87PUEP11Z7r7U= github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f h1:7LYC+Yfkj3CTRcShK0KOL/w6iTiKyqqBA9a41Wnggw8= diff --git a/backend/pkg/blobindexer/blobindexer.go b/backend/pkg/blobindexer/blobindexer.go new file mode 100644 index 000000000..a690fea16 --- /dev/null +++ b/backend/pkg/blobindexer/blobindexer.go @@ -0,0 +1,470 @@ +package blobindexer + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "sync" + "time" + + "github.com/gobitfly/beaconchain/pkg/commons/db" + "github.com/gobitfly/beaconchain/pkg/commons/log" + "github.com/gobitfly/beaconchain/pkg/commons/metrics" + "github.com/gobitfly/beaconchain/pkg/commons/services" + "github.com/gobitfly/beaconchain/pkg/commons/types" + "github.com/gobitfly/beaconchain/pkg/commons/utils" + "github.com/gobitfly/beaconchain/pkg/commons/version" + "github.com/gobitfly/beaconchain/pkg/consapi" + "go.uber.org/atomic" + + "github.com/gobitfly/beaconchain/pkg/consapi/network" + constypes "github.com/gobitfly/beaconchain/pkg/consapi/types" + + "github.com/aws/aws-sdk-go-v2/aws" + awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + lru "github.com/hashicorp/golang-lru/v2" + "golang.org/x/sync/errgroup" +) + +var enableCheckingBeforePutting = false +var waitForOtherBlobIndexerDuration = time.Second * 60 + +type BlobIndexer struct { + S3Client *s3.Client + running bool + runningMu *sync.Mutex + clEndpoint string + cl consapi.Client + id string + networkID string + writtenBlobsCache *lru.Cache[string, bool] +} + +func NewBlobIndexer() (*BlobIndexer, error) { + initDB() + cfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( + utils.Config.BlobIndexer.S3.AccessKeyId, + utils.Config.BlobIndexer.S3.AccessKeySecret, + "", + )), + config.WithRegion("auto"), + ) + if err != nil { + return nil, err + } + s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) { + o.UsePathStyle = true + o.BaseEndpoint = aws.String(utils.Config.BlobIndexer.S3.Endpoint) + }) + + writtenBlobsCache, err := lru.New[string, bool](1000) + if err != nil { + return nil, err + } + + id := utils.GetUUID() + bi := &BlobIndexer{ + S3Client: s3Client, + runningMu: &sync.Mutex{}, + clEndpoint: "http://" + utils.Config.Indexer.Node.Host + ":" + utils.Config.Indexer.Node.Port, + cl: consapi.NewClient("http://" + utils.Config.Indexer.Node.Host + ":" + utils.Config.Indexer.Node.Port), + id: id, + writtenBlobsCache: writtenBlobsCache, + } + return bi, nil +} + +func initDB() { + if utils.Config.BlobIndexer.DisableStatusReports { + return + } + if db.WriterDb != nil && db.ReaderDb != nil { + return + } + db.WriterDb, db.ReaderDb = db.MustInitDB(&types.DatabaseConfig{ + Username: utils.Config.WriterDatabase.Username, + Password: utils.Config.WriterDatabase.Password, + Name: utils.Config.WriterDatabase.Name, + Host: utils.Config.WriterDatabase.Host, + Port: utils.Config.WriterDatabase.Port, + MaxOpenConns: utils.Config.WriterDatabase.MaxOpenConns, + MaxIdleConns: utils.Config.WriterDatabase.MaxIdleConns, + SSL: utils.Config.WriterDatabase.SSL, + }, &types.DatabaseConfig{ + Username: utils.Config.ReaderDatabase.Username, + Password: utils.Config.ReaderDatabase.Password, + Name: utils.Config.ReaderDatabase.Name, + Host: utils.Config.ReaderDatabase.Host, + Port: utils.Config.ReaderDatabase.Port, + MaxOpenConns: utils.Config.ReaderDatabase.MaxOpenConns, + MaxIdleConns: utils.Config.ReaderDatabase.MaxIdleConns, + SSL: utils.Config.ReaderDatabase.SSL, + }, "pgx", "postgres") +} + +func (bi *BlobIndexer) Start() { + bi.runningMu.Lock() + if bi.running { + bi.runningMu.Unlock() + return + } + bi.running = true + bi.runningMu.Unlock() + + log.InfoWithFields(log.Fields{"version": version.Version, "clEndpoint": bi.clEndpoint, "s3Endpoint": utils.Config.BlobIndexer.S3.Endpoint, "id": bi.id}, "starting blobindexer") + for { + err := bi.index() + if err != nil { + log.Error(err, "failed indexing blobs", 0) + } + time.Sleep(time.Second * 10) + } +} + +func (bi *BlobIndexer) index() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + headHeader := &constypes.StandardBeaconHeaderResponse{} + finalizedHeader := &constypes.StandardBeaconHeaderResponse{} + spec := &constypes.StandardSpecResponse{} + + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(3) + g.Go(func() error { + var err error + spec, err = bi.cl.GetSpec() + if err != nil { + return fmt.Errorf("error bi.cl.GetSpec: %w", err) + } + return nil + }) + g.Go(func() error { + var err error + headHeader, err = bi.cl.GetBlockHeader("head") + if err != nil { + return fmt.Errorf("error bi.cl.GetBlockHeader(head): %w", err) + } + return nil + }) + g.Go(func() error { + var err error + finalizedHeader, err = bi.cl.GetBlockHeader("finalized") + if err != nil { + return fmt.Errorf("error bi.cl.GetBlockHeader(finalized): %w", err) + } + return nil + }) + err := g.Wait() + if err != nil { + return err + } + + if spec.Data.DenebForkEpoch == nil { + return fmt.Errorf("DENEB_FORK_EPOCH not set in spec") + } + if spec.Data.MinEpochsForBlobSidecarsRequests == nil { + return fmt.Errorf("MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS not set in spec") + } + + nodeDepositNetworkId := uint64(spec.Data.DepositNetworkID) + if utils.Config.Chain.ClConfig.DepositNetworkID != nodeDepositNetworkId { + return fmt.Errorf("config.DepositNetworkId != node.DepositNetworkId: %v != %v", utils.Config.Chain.ClConfig.DepositNetworkID, nodeDepositNetworkId) + } + bi.networkID = fmt.Sprintf("%d", nodeDepositNetworkId) + + status, err := bi.GetIndexerStatus() + if err != nil { + return fmt.Errorf("error bi.GetIndexerStatus: %w", err) + } + + // skip if another blobIndexer is already indexing - it is ok if multiple blobIndexers are indexing the same finalized slot, this is just best effort to avoid duplicate work + if status.CurrentBlobIndexerId != bi.id && status.LastUpdate.After(time.Now().Add(-waitForOtherBlobIndexerDuration)) { + log.InfoWithFields(log.Fields{"lastIndexedFinalizedSlot": status.LastIndexedFinalizedSlot, "currentBlobIndexerId": status.CurrentBlobIndexerId, "finalizedSlot": finalizedHeader.Data.Header.Message.Slot, "lastUpdate": status.LastUpdate}, "found other blobIndexer indexing, skipping") + return nil + } + + // check if node still has last indexed blobs (if its outside the range defined by MAX_REQUEST_BLOCKS_DENEB), otherwise assume that the node has pruned too far and we would miss blobs + minBlobSlotRange := *spec.Data.MinEpochsForBlobSidecarsRequests * uint64(spec.Data.SlotsPerEpoch) + minBlobSlot := uint64(0) + if headHeader.Data.Header.Message.Slot > minBlobSlotRange { + minBlobSlot = headHeader.Data.Header.Message.Slot - minBlobSlotRange + } + pruneMarginSlotRange := utils.Config.BlobIndexer.PruneMarginEpochs * uint64(spec.Data.SlotsPerEpoch) + if minBlobSlot > pruneMarginSlotRange { + minBlobSlot = minBlobSlot - pruneMarginSlotRange + } + if status.LastIndexedFinalizedSlot < minBlobSlot && status.LastIndexedFinalizedBlobSlot > 0 { + bs, err := bi.cl.GetBlobSidecars(status.LastIndexedFinalizedBlobSlot) + if err != nil { + return err + } + if len(bs.Data) == 0 { + return fmt.Errorf("no blobs found at lastIndexedFinalizedBlobSlot: %v, node has pruned too far?", status.LastIndexedFinalizedBlobSlot) + } + } + + lastIndexedFinalizedBlobSlot := atomic.NewUint64(status.LastIndexedFinalizedBlobSlot) + + denebForkSlot := *spec.Data.DenebForkEpoch * uint64(spec.Data.SlotsPerEpoch) + startSlot := status.LastIndexedFinalizedSlot + 1 + if status.LastIndexedFinalizedSlot <= denebForkSlot { + startSlot = denebForkSlot + } + + if headHeader.Data.Header.Message.Slot <= startSlot { + return fmt.Errorf("headHeader.Data.Header.Message.Slot <= startSlot: %v < %v (denebForkEpoch: %v, denebForkSlot: %v, slotsPerEpoch: %v)", headHeader.Data.Header.Message.Slot, startSlot, utils.Config.Chain.ClConfig.DenebForkEpoch, denebForkSlot, utils.Config.Chain.ClConfig.SlotsPerEpoch) + } + + start := time.Now() + log.InfoWithFields(log.Fields{ + "lastIndexedFinalizedSlot": status.LastIndexedFinalizedSlot, + "headSlot": headHeader.Data.Header.Message.Slot, + "finalizedSlot": finalizedHeader.Data.Header.Message.Slot, + "startSlot": startSlot, + "networkID": bi.networkID, + }, "indexing blobs") + defer func() { + log.InfoWithFields(log.Fields{ + "startSlot": startSlot, + "headSlot": headHeader.Data.Header.Message.Slot, + "finalizedSlot": finalizedHeader.Data.Header.Message.Slot, + "duration": time.Since(start), + "networkID": bi.networkID, + }, "finished indexing blobs") + }() + + batchSize := uint64(100) + for batchStart := startSlot; batchStart <= headHeader.Data.Header.Message.Slot; batchStart += batchSize { + batchStartTs := time.Now() + batchBlobsIndexed := atomic.NewInt64(0) + batchEnd := batchStart + batchSize + if batchEnd > headHeader.Data.Header.Message.Slot { + batchEnd = headHeader.Data.Header.Message.Slot + } + g, gCtx = errgroup.WithContext(context.Background()) + g.SetLimit(4) + for slot := batchStart; slot <= batchEnd; slot++ { + slot := slot + g.Go(func() error { + select { + case <-gCtx.Done(): + return gCtx.Err() + default: + } + numBlobs, err := bi.indexBlobsAtSlot(slot) + if err != nil { + return fmt.Errorf("error bi.IndexBlobsAtSlot(%v): %w", slot, err) + } + if numBlobs > 0 && slot <= finalizedHeader.Data.Header.Message.Slot && slot > lastIndexedFinalizedBlobSlot.Load() { + lastIndexedFinalizedBlobSlot.Store(slot) + } + batchBlobsIndexed.Add(int64(numBlobs)) + return nil + }) + } + err = g.Wait() + if err != nil { + return err + } + lastIndexedFinalizedSlot := uint64(0) + if batchEnd <= finalizedHeader.Data.Header.Message.Slot { + lastIndexedFinalizedSlot = batchEnd + } else { + lastIndexedFinalizedSlot = finalizedHeader.Data.Header.Message.Slot + } + newBlobIndexerStatus := BlobIndexerStatus{ + LastIndexedFinalizedSlot: lastIndexedFinalizedSlot, + LastIndexedFinalizedBlobSlot: lastIndexedFinalizedBlobSlot.Load(), + CurrentBlobIndexerId: bi.id, + LastUpdate: time.Now(), + BlobIndexerVersion: version.Version, + } + if status.LastIndexedFinalizedBlobSlot > newBlobIndexerStatus.LastIndexedFinalizedBlobSlot { + newBlobIndexerStatus.LastIndexedFinalizedBlobSlot = status.LastIndexedFinalizedBlobSlot + } + err := bi.putIndexerStatus(newBlobIndexerStatus) + if err != nil { + return fmt.Errorf("error updating indexer status at slot %v: %w", batchEnd, err) + } + slotsPerSecond := float64(batchEnd-batchStart) / time.Since(batchStartTs).Seconds() + blobsPerSecond := float64(batchBlobsIndexed.Load()) / time.Since(batchStartTs).Seconds() + estimatedTimeToHead := float64(headHeader.Data.Header.Message.Slot-batchStart) / slotsPerSecond + estimatedTimeToHeadDuration := time.Duration(estimatedTimeToHead) * time.Second + log.InfoWithFields(log.Fields{ + "lastIdxFinSlot": newBlobIndexerStatus.LastIndexedFinalizedSlot, + "lastIdxFinBlobSlot": newBlobIndexerStatus.LastIndexedFinalizedBlobSlot, + "batch": fmt.Sprintf("%d-%d", batchStart, batchEnd), + "duration": time.Since(batchStartTs), + "slotsPerSecond": fmt.Sprintf("%.3f", slotsPerSecond), + "blobsPerSecond": fmt.Sprintf("%.3f", blobsPerSecond), + "estimatedTimeToHead": estimatedTimeToHeadDuration, + "blobsIndexed": batchBlobsIndexed.Load(), + }, "updated indexer status") + if !utils.Config.BlobIndexer.DisableStatusReports { + services.ReportStatus("blobindexer", "Running", nil) + } + } + return nil +} + +func (bi *BlobIndexer) indexBlobsAtSlot(slot uint64) (int, error) { + tGetBlobSidcar := time.Now() + + blobSidecar, err := bi.cl.GetBlobSidecars(slot) + if err != nil { + httpErr := network.SpecificError(err) + if httpErr != nil && httpErr.StatusCode == http.StatusNotFound { + // no sidecar for this slot + return 0, nil + } + return 0, err + } + metrics.TaskDuration.WithLabelValues("blobindexer_get_blob_sidecars").Observe(time.Since(tGetBlobSidcar).Seconds()) + + if len(blobSidecar.Data) <= 0 { + return 0, nil + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + defer cancel() + + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(4) + for _, d := range blobSidecar.Data { + d := d + versionedBlobHash := fmt.Sprintf("%#x", utils.VersionedBlobHash(d.KzgCommitment).Bytes()) + key := fmt.Sprintf("%s/blobs/%s", bi.networkID, versionedBlobHash) + + if bi.writtenBlobsCache.Contains(key) { + continue + } + + g.Go(func() error { + select { + case <-gCtx.Done(): + return gCtx.Err() + default: + } + + if enableCheckingBeforePutting { + tS3HeadObj := time.Now() + _, err = bi.S3Client.HeadObject(gCtx, &s3.HeadObjectInput{ + Bucket: &utils.Config.BlobIndexer.S3.Bucket, + Key: &key, + }) + metrics.TaskDuration.WithLabelValues("blobindexer_check_blob").Observe(time.Since(tS3HeadObj).Seconds()) + if err != nil { + // Only put the object if it does not exist yet + var httpResponseErr *awshttp.ResponseError + if errors.As(err, &httpResponseErr) && (httpResponseErr.HTTPStatusCode() == http.StatusNotFound || httpResponseErr.HTTPStatusCode() == 403) { + return nil + } + return fmt.Errorf("error getting headObject: %s (%v/%v): %w", key, d.SignedBlockHeader.Message.Slot, d.Index, err) + } + } + + tS3PutObj := time.Now() + _, putErr := bi.S3Client.PutObject(gCtx, &s3.PutObjectInput{ + Bucket: &utils.Config.BlobIndexer.S3.Bucket, + Key: &key, + Body: bytes.NewReader(d.Blob), + Metadata: map[string]string{ + "blob_index": fmt.Sprintf("%d", d.Index), + "block_slot": fmt.Sprintf("%d", d.SignedBlockHeader.Message.Slot), + "block_proposer": fmt.Sprintf("%d", d.SignedBlockHeader.Message.ProposerIndex), + "block_state_root": d.SignedBlockHeader.Message.StateRoot.String(), + "block_parent_root": d.SignedBlockHeader.Message.ParentRoot.String(), + "block_body_root": d.SignedBlockHeader.Message.BodyRoot.String(), + "kzg_commitment": d.KzgCommitment.String(), + "kzg_proof": d.KzgProof.String(), + }, + }) + metrics.TaskDuration.WithLabelValues("blobindexer_put_blob").Observe(time.Since(tS3PutObj).Seconds()) + if putErr != nil { + return fmt.Errorf("error putting object: %s (%v/%v): %w", key, d.SignedBlockHeader.Message.Slot, d.Index, putErr) + } + bi.writtenBlobsCache.Add(key, true) + + return nil + }) + } + err = g.Wait() + if err != nil { + return len(blobSidecar.Data), fmt.Errorf("error indexing blobs at slot %v: %w", slot, err) + } + + return len(blobSidecar.Data), nil +} + +func (bi *BlobIndexer) GetIndexerStatus() (*BlobIndexerStatus, error) { + start := time.Now() + defer func() { + metrics.TaskDuration.WithLabelValues("blobindexer_get_indexer_status").Observe(time.Since(start).Seconds()) + }() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + key := fmt.Sprintf("%s/blob-indexer-status.json", bi.networkID) + obj, err := bi.S3Client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &utils.Config.BlobIndexer.S3.Bucket, + Key: &key, + }) + if err != nil { + // If the object that you request doesn’t exist, the error that Amazon S3 returns depends on whether you also have the s3:ListBucket permission. If you have the s3:ListBucket permission on the bucket, Amazon S3 returns an HTTP status code 404 (Not Found) error. If you don’t have the s3:ListBucket permission, Amazon S3 returns an HTTP status code 403 ("access denied") error. + var httpResponseErr *awshttp.ResponseError + if errors.As(err, &httpResponseErr) && (httpResponseErr.HTTPStatusCode() == 404 || httpResponseErr.HTTPStatusCode() == 403) { + return &BlobIndexerStatus{}, nil + } + return nil, err + } + status := &BlobIndexerStatus{} + err = json.NewDecoder(obj.Body).Decode(status) + return status, err +} + +func (bi *BlobIndexer) putIndexerStatus(status BlobIndexerStatus) error { + start := time.Now() + defer func() { + metrics.TaskDuration.WithLabelValues("blobindexer_put_indexer_status").Observe(time.Since(start).Seconds()) + }() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + key := fmt.Sprintf("%s/blob-indexer-status.json", bi.networkID) + contentType := "application/json" + body, err := json.Marshal(&status) + if err != nil { + return err + } + _, err = bi.S3Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: &utils.Config.BlobIndexer.S3.Bucket, + Key: &key, + Body: bytes.NewReader(body), + ContentType: &contentType, + Metadata: map[string]string{ + "last_indexed_finalized_slot": fmt.Sprintf("%d", status.LastIndexedFinalizedSlot), + "last_indexed_finalized_blob_slot": fmt.Sprintf("%d", status.LastIndexedFinalizedBlobSlot), + "current_blob_indexer_id": status.CurrentBlobIndexerId, + "last_update": status.LastUpdate.Format(time.RFC3339), + "blob_indexer_version": status.BlobIndexerVersion, + }, + }) + if err != nil { + return err + } + return nil +} + +type BlobIndexerStatus struct { + LastIndexedFinalizedSlot uint64 `json:"last_indexed_finalized_slot"` // last finalized slot that was indexed + LastIndexedFinalizedBlobSlot uint64 `json:"last_indexed_finalized_blob_slot"` // last finalized slot that included a blob + CurrentBlobIndexerId string `json:"current_blob_indexer_id"` + LastUpdate time.Time `json:"last_update"` + BlobIndexerVersion string `json:"blob_indexer_version"` +} diff --git a/backend/pkg/blobindexer/blobs.go b/backend/pkg/blobindexer/blobs.go deleted file mode 100644 index ae99d2d1b..000000000 --- a/backend/pkg/blobindexer/blobs.go +++ /dev/null @@ -1,333 +0,0 @@ -package blobindexer - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "net/http" - "sync" - "time" - - "github.com/gobitfly/beaconchain/pkg/commons/log" - "github.com/gobitfly/beaconchain/pkg/commons/metrics" - "github.com/gobitfly/beaconchain/pkg/commons/utils" - "github.com/gobitfly/beaconchain/pkg/commons/version" - "github.com/gobitfly/beaconchain/pkg/consapi" - - "github.com/gobitfly/beaconchain/pkg/consapi/network" - constypes "github.com/gobitfly/beaconchain/pkg/consapi/types" - - "github.com/aws/aws-sdk-go-v2/aws" - awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/coocood/freecache" - "golang.org/x/sync/errgroup" -) - -type BlobIndexer struct { - S3Client *s3.Client - running bool - runningMu *sync.Mutex - clEndpoint string - cache *freecache.Cache - cl consapi.Client -} - -func NewBlobIndexer() (*BlobIndexer, error) { - s3Resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { - return aws.Endpoint{ - PartitionID: "aws", - URL: utils.Config.BlobIndexer.S3.Endpoint, - SigningRegion: "us-east-2", - HostnameImmutable: true, - }, nil - }) - s3Client := s3.NewFromConfig(aws.Config{ - Region: "us-east-2", - Credentials: credentials.NewStaticCredentialsProvider( - utils.Config.BlobIndexer.S3.AccessKeyId, - utils.Config.BlobIndexer.S3.AccessKeySecret, - "", - ), - EndpointResolverWithOptions: s3Resolver, - }, func(o *s3.Options) { - o.UsePathStyle = true - }) - bi := &BlobIndexer{ - S3Client: s3Client, - runningMu: &sync.Mutex{}, - clEndpoint: "http://" + utils.Config.Indexer.Node.Host + ":" + utils.Config.Indexer.Node.Port, - cache: freecache.NewCache(1024 * 1024), - cl: consapi.NewClient("http://" + utils.Config.Indexer.Node.Host + ":" + utils.Config.Indexer.Node.Port), - } - return bi, nil -} - -func (bi *BlobIndexer) Start() { - bi.runningMu.Lock() - if bi.running { - bi.runningMu.Unlock() - return - } - bi.running = true - bi.runningMu.Unlock() - - log.InfoWithFields(log.Fields{"version": version.Version, "clEndpoint": bi.clEndpoint, "s3Endpoint": utils.Config.BlobIndexer.S3.Endpoint}, "starting blobindexer") - for { - err := bi.Index() - if err != nil { - log.Error(err, "failed indexing blobs", 0) - } - time.Sleep(time.Second * 10) - } -} - -func (bi *BlobIndexer) Index() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - - headHeader := &constypes.StandardBeaconHeaderResponse{} - finalizedHeader := &constypes.StandardBeaconHeaderResponse{} - spec := &constypes.StandardSpecResponse{} - - g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(3) - g.Go(func() error { - var err error - spec, err = bi.cl.GetSpec() - if err != nil { - return err - } - return nil - }) - g.Go(func() error { - var err error - headHeader, err = bi.cl.GetBlockHeader("head") - if err != nil { - return err - } - return nil - }) - g.Go(func() error { - var err error - finalizedHeader, err = bi.cl.GetBlockHeader("finalized") - if err != nil { - return err - } - return nil - }) - err := g.Wait() - if err != nil { - return err - } - - nodeDepositNetworkId := uint64(spec.Data.DepositNetworkID) - if utils.Config.Chain.ClConfig.DepositNetworkID != nodeDepositNetworkId { - return fmt.Errorf("config.DepositNetworkId != node.DepositNetworkId: %v != %v", utils.Config.Chain.ClConfig.DepositNetworkID, nodeDepositNetworkId) - } - - status, err := bi.GetIndexerStatus() - if err != nil { - return err - } - - denebForkSlot := utils.Config.Chain.ClConfig.DenebForkEpoch * utils.Config.Chain.ClConfig.SlotsPerEpoch - startSlot := status.LastIndexedFinalizedSlot + 1 - if status.LastIndexedFinalizedSlot <= denebForkSlot { - startSlot = denebForkSlot - } - - if headHeader.Data.Header.Message.Slot <= startSlot { - return fmt.Errorf("headHeader.Data.Header.Message.Slot <= startSlot: %v < %v", headHeader.Data.Header.Message.Slot, startSlot) - } - - start := time.Now() - log.InfoWithFields(log.Fields{"lastIndexedFinalizedSlot": status.LastIndexedFinalizedSlot, "headSlot": headHeader.Data.Header.Message.Slot}, "indexing blobs") - defer func() { - log.InfoWithFields(log.Fields{ - "startSlot": startSlot, - "endSlot": headHeader.Data.Header.Message.Slot, - "duration": time.Since(start), - }, "finished indexing blobs") - }() - - batchSize := uint64(100) - for batchStart := startSlot; batchStart <= headHeader.Data.Header.Message.Slot; batchStart += batchSize { - batchEnd := batchStart + batchSize - if batchEnd > headHeader.Data.Header.Message.Slot { - batchEnd = headHeader.Data.Header.Message.Slot - } - g, gCtx = errgroup.WithContext(context.Background()) - g.SetLimit(4) - for slot := batchStart; slot <= batchEnd; slot++ { - slot := slot - g.Go(func() error { - select { - case <-gCtx.Done(): - return gCtx.Err() - default: - } - err := bi.IndexBlobsAtSlot(slot) - if err != nil { - return err - } - return nil - }) - } - err = g.Wait() - if err != nil { - return err - } - if batchEnd <= finalizedHeader.Data.Header.Message.Slot { - err := bi.PutIndexerStatus(BlobIndexerStatus{ - LastIndexedFinalizedSlot: batchEnd, - }) - if err != nil { - return fmt.Errorf("error updating indexer status at slot %v: %w", batchEnd, err) - } - log.InfoWithFields(log.Fields{"lastIndexedFinalizedSlot": batchEnd}, "updated indexer status") - } - } - return nil -} - -func (bi *BlobIndexer) IndexBlobsAtSlot(slot uint64) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - - tGetBlobSidcar := time.Now() - - blobSidecar, err := bi.cl.GetBlobSidecars(slot) - if err != nil { - httpErr := network.SpecificError(err) - if httpErr != nil && httpErr.StatusCode == http.StatusNotFound { - // no sidecar for this slot - return nil - } - return err - } - metrics.TaskDuration.WithLabelValues("blobindexer_get_blob_sidecars").Observe(time.Since(tGetBlobSidcar).Seconds()) - - if len(blobSidecar.Data) <= 0 { - return nil - } - - g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(4) - for _, d := range blobSidecar.Data { - d := d - g.Go(func() error { - select { - case <-gCtx.Done(): - return gCtx.Err() - default: - } - - versionedBlobHash := fmt.Sprintf("%#x", utils.VersionedBlobHash(d.KzgCommitment).Bytes()) - key := fmt.Sprintf("blobs/%s", versionedBlobHash) - - tS3HeadObj := time.Now() - _, err = bi.S3Client.HeadObject(gCtx, &s3.HeadObjectInput{ - Bucket: &utils.Config.BlobIndexer.S3.Bucket, - Key: &key, - }) - metrics.TaskDuration.WithLabelValues("blobindexer_check_blob").Observe(time.Since(tS3HeadObj).Seconds()) - if err != nil { - // Only put the object if it does not exist yet - var httpResponseErr *awshttp.ResponseError - if errors.As(err, &httpResponseErr) && (httpResponseErr.HTTPStatusCode() == http.StatusNotFound || httpResponseErr.HTTPStatusCode() == 403) { - tS3PutObj := time.Now() - _, putErr := bi.S3Client.PutObject(gCtx, &s3.PutObjectInput{ - Bucket: &utils.Config.BlobIndexer.S3.Bucket, - Key: &key, - Body: bytes.NewReader(d.Blob), - Metadata: map[string]string{ - "slot": fmt.Sprintf("%d", d.Slot), - "index": fmt.Sprintf("%d", d.Index), - "block_root": d.BlockRoot.String(), - "block_parent_root": d.BlockParentRoot.String(), - "proposer_index": fmt.Sprintf("%d", d.ProposerIndex), - "kzg_commitment": d.KzgCommitment.String(), - "kzg_proof": d.KzgProof.String(), - }, - }) - metrics.TaskDuration.WithLabelValues("blobindexer_put_blob").Observe(time.Since(tS3PutObj).Seconds()) - if putErr != nil { - return fmt.Errorf("error putting object: %s (%v/%v): %w", key, d.Slot, d.Index, putErr) - } - return nil - } - return fmt.Errorf("error getting headObject: %s (%v/%v): %w", key, d.Slot, d.Index, err) - } - return nil - }) - } - err = g.Wait() - if err != nil { - return fmt.Errorf("error indexing blobs at slot %v: %w", slot, err) - } - - return nil -} - -func (bi *BlobIndexer) GetIndexerStatus() (*BlobIndexerStatus, error) { - start := time.Now() - defer func() { - metrics.TaskDuration.WithLabelValues("blobindexer_get_indexer_status").Observe(time.Since(start).Seconds()) - }() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - key := "blob-indexer-status.json" - obj, err := bi.S3Client.GetObject(ctx, &s3.GetObjectInput{ - Bucket: &utils.Config.BlobIndexer.S3.Bucket, - Key: &key, - }) - if err != nil { - // If the object that you request doesn’t exist, the error that Amazon S3 returns depends on whether you also have the s3:ListBucket permission. If you have the s3:ListBucket permission on the bucket, Amazon S3 returns an HTTP status code 404 (Not Found) error. If you don’t have the s3:ListBucket permission, Amazon S3 returns an HTTP status code 403 ("access denied") error. - var httpResponseErr *awshttp.ResponseError - if errors.As(err, &httpResponseErr) && (httpResponseErr.HTTPStatusCode() == 404 || httpResponseErr.HTTPStatusCode() == 403) { - return &BlobIndexerStatus{}, nil - } - return nil, err - } - status := &BlobIndexerStatus{} - err = json.NewDecoder(obj.Body).Decode(status) - return status, err -} - -func (bi *BlobIndexer) PutIndexerStatus(status BlobIndexerStatus) error { - start := time.Now() - defer func() { - metrics.TaskDuration.WithLabelValues("blobindexer_put_indexer_status").Observe(time.Since(start).Seconds()) - }() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - key := "blob-indexer-status.json" - contentType := "application/json" - body, err := json.Marshal(&status) - if err != nil { - return err - } - _, err = bi.S3Client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: &utils.Config.BlobIndexer.S3.Bucket, - Key: &key, - Body: bytes.NewReader(body), - ContentType: &contentType, - Metadata: map[string]string{ - "last_indexed_finalized_slot": fmt.Sprintf("%d", status.LastIndexedFinalizedSlot), - }, - }) - if err != nil { - return err - } - return nil -} - -type BlobIndexerStatus struct { - LastIndexedFinalizedSlot uint64 `json:"last_indexed_finalized_slot"` - // LastIndexedFinalizedRoot string `json:"last_indexed_finalized_root"` - // IndexedUnfinalized map[string]uint64 `json:"indexed_unfinalized"` -} diff --git a/backend/pkg/commons/types/config.go b/backend/pkg/commons/types/config.go index 5518cb131..9668cfd02 100644 --- a/backend/pkg/commons/types/config.go +++ b/backend/pkg/commons/types/config.go @@ -61,11 +61,13 @@ type Config struct { } `yaml:"bigtable"` BlobIndexer struct { S3 struct { - Endpoint string `yaml:"endpoint" envconfig:"BLOB_INDEXER_S3_ENDPOINT"` - Bucket string `yaml:"bucket" envconfig:"BLOB_INDEXER_S3_BUCKET"` - AccessKeyId string `yaml:"accessKeyId" envconfig:"BLOB_INDEXER_S3_ACCESS_KEY_ID"` - AccessKeySecret string `yaml:"accessKeySecret" envconfig:"BLOB_INDEXER_S3_ACCESS_KEY_SECRET"` + Endpoint string `yaml:"endpoint" envconfig:"BLOB_INDEXER_S3_ENDPOINT"` // s3 endpoint + Bucket string `yaml:"bucket" envconfig:"BLOB_INDEXER_S3_BUCKET"` // s3 bucket + AccessKeyId string `yaml:"accessKeyId" envconfig:"BLOB_INDEXER_S3_ACCESS_KEY_ID"` // s3 access key id + AccessKeySecret string `yaml:"accessKeySecret" envconfig:"BLOB_INDEXER_S3_ACCESS_KEY_SECRET"` // s3 access key secret } `yaml:"s3"` + PruneMarginEpochs uint64 `yaml:"pruneMarginEpochs" envconfig:"BLOB_INDEXER_PRUNE_MARGIN_EPOCHS"` // PruneMarginEpochs helps blobindexer to decide if connected node has pruned too far to have no holes in the data, set it to same value as lighthouse flag --blob-prune-margin-epochs + DisableStatusReports bool `yaml:"disableStatusReports" envconfig:"BLOB_INDEXER_DISABLE_STATUS_REPORTS"` // disable status reports (no connection to db needed) } `yaml:"blobIndexer"` Chain struct { Name string `yaml:"name" envconfig:"CHAIN_NAME"` diff --git a/backend/pkg/commons/utils/config.go b/backend/pkg/commons/utils/config.go index ad8529fd0..64a783b78 100644 --- a/backend/pkg/commons/utils/config.go +++ b/backend/pkg/commons/utils/config.go @@ -359,15 +359,19 @@ func setCLConfig(cfg *types.Config) error { maxForkEpoch := uint64(18446744073709551615) if jr.Data.AltairForkEpoch == nil { + log.Warnf("AltairForkEpoch not set, defaulting to maxForkEpoch") jr.Data.AltairForkEpoch = &maxForkEpoch } if jr.Data.BellatrixForkEpoch == nil { + log.Warnf("BellatrixForkEpoch not set, defaulting to maxForkEpoch") jr.Data.BellatrixForkEpoch = &maxForkEpoch } if jr.Data.CapellaForkEpoch == nil { + log.Warnf("CapellaForkEpoch not set, defaulting to maxForkEpoch") jr.Data.CapellaForkEpoch = &maxForkEpoch } if jr.Data.DenebForkEpoch == nil { + log.Warnf("DenebForkEpoch not set, defaulting to maxForkEpoch") jr.Data.DenebForkEpoch = &maxForkEpoch } diff --git a/backend/pkg/consapi/types/blobs.go b/backend/pkg/consapi/types/blobs.go index 236cefd8f..c8a94c60b 100644 --- a/backend/pkg/consapi/types/blobs.go +++ b/backend/pkg/consapi/types/blobs.go @@ -4,13 +4,20 @@ import "github.com/ethereum/go-ethereum/common/hexutil" type StandardBlobSidecarsResponse struct { Data []struct { - BlockRoot hexutil.Bytes `json:"block_root"` - Index uint64 `json:"index,string"` - Slot uint64 `json:"slot,string"` - BlockParentRoot hexutil.Bytes `json:"block_parent_root"` - ProposerIndex uint64 `json:"proposer_index,string"` - KzgCommitment hexutil.Bytes `json:"kzg_commitment"` - KzgProof hexutil.Bytes `json:"kzg_proof"` - Blob hexutil.Bytes `json:"blob"` + Index uint64 `json:"index,string"` + Blob hexutil.Bytes `json:"blob"` + KzgCommitment hexutil.Bytes `json:"kzg_commitment"` + KzgProof hexutil.Bytes `json:"kzg_proof"` + SignedBlockHeader struct { + Message struct { + Slot uint64 `json:"slot,string"` + ProposerIndex uint64 `json:"proposer_index,string"` + ParentRoot hexutil.Bytes `json:"parent_root"` + StateRoot hexutil.Bytes `json:"state_root"` + BodyRoot hexutil.Bytes `json:"body_root"` + } `json:"message"` + Signature hexutil.Bytes `json:"signature"` + } `json:"signed_block_header"` + KzgCommitmentInclusionProof []hexutil.Bytes `json:"kzg_commitment_inclusion_proof"` } } diff --git a/backend/pkg/consapi/types/spec.go b/backend/pkg/consapi/types/spec.go index 4241b15a4..181fd7b65 100644 --- a/backend/pkg/consapi/types/spec.go +++ b/backend/pkg/consapi/types/spec.go @@ -112,4 +112,9 @@ type StandardSpec struct { DomainSyncCommittee string `json:"DOMAIN_SYNC_COMMITTEE"` BlsWithdrawalPrefix string `json:"BLS_WITHDRAWAL_PREFIX"` ZeroHash [32]byte // ZeroHash is used to represent a zeroed out 32 byte array. + // DENEB + MaxRequestBlocksDeneb *uint64 `json:"MAX_REQUEST_BLOCKS_DENEB,string"` + MaxRequestBlobSidecars *uint64 `json:"MAX_REQUEST_BLOB_SIDECARS,string"` + MinEpochsForBlobSidecarsRequests *uint64 `json:"MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS,string"` + BlobSidecarSubnetCount *uint64 `json:"BLOB_SIDECAR_SUBNET_COUNT,string"` }