From b79ac4bf561e3b6b6fff20b8827ac1c730cb4f9e Mon Sep 17 00:00:00 2001 From: Dhruv Bodani Date: Wed, 15 Jan 2025 15:54:00 +0400 Subject: [PATCH] address feedback --- CHANGELOG.md | 4 +- beacon-chain/db/kv/blocks.go | 54 ++++++++------- beacon-chain/db/kv/blocks_test.go | 1 + beacon-chain/db/pruner/pruner.go | 94 +++++++++++++-------------- beacon-chain/db/pruner/pruner_test.go | 36 ++++++---- beacon-chain/node/node.go | 45 +++++-------- beacon-chain/sync/backfill/service.go | 41 ++++-------- 7 files changed, 131 insertions(+), 144 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 01417bd0997d..49a53f86c565 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,8 +15,8 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve - Added support to update target and max blob count to different values per hard fork config. - Log before blob filesystem cache warm-up. - New design for the attestation pool. [PR](https://github.com/prysmaticlabs/prysm/pull/14324) -- Add field param placeholder for Electra blob target and max to pass spec tests. -- Add Beacon DB pruning service to prune historical data beyond weak subjectivity checkpoint. +- Add field param placeholder for Electra blob target and max to pass spec tests. +- Add Beacon DB pruning service to prune historical data older than MIN_EPOCHS_FOR_BLOCK_REQUESTS (roughly equivalent to the weak subjectivity period) ### Changed diff --git a/beacon-chain/db/kv/blocks.go b/beacon-chain/db/kv/blocks.go index c789ff73c05d..d0d656625c29 100644 --- a/beacon-chain/db/kv/blocks.go +++ b/beacon-chain/db/kv/blocks.go @@ -249,25 +249,27 @@ func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot p ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteHistoricalDataBeforeSlot") defer span.End() - // Perform all deletions in a single transaction for atomicity - return s.db.Update(func(tx *bolt.Tx) error { - filter := filters.NewFilter().SetStartSlot(0).SetEndSlot(cutoffSlot) - keys, err := blockRootsByFilter(ctx, tx, filter) + // Collect slot/root pairs to perform deletions in a separate read only transaction. + var ( + roots [][]byte + slts []primitives.Slot + ) + err := s.db.View(func(tx *bolt.Tx) error { + var err error + roots, slts, err = blockRootsBySlotRange(tx.Bucket(blockSlotIndicesBucket), primitives.Slot(0), cutoffSlot, nil, nil, nil) if err != nil { - return errors.Wrap(err, "could not retrieve block roots by filter") - } - - // Query slots by block roots before pruning. - var slts []primitives.Slot - for _, root := range keys { - slot, err := s.slotByBlockRoot(ctx, tx, root) - if err != nil { - return errors.Wrapf(err, "could not retrieve slot from block root, slot: %d", slot) - } - slts = append(slts, slot) + return errors.Wrap(err, "could not retrieve block roots") } + fmt.Println("slots: ", slts) + return nil + }) + if err != nil { + return errors.Wrap(err, "could not retrieve block roots and slots") + } - for _, root := range keys { + // Perform all deletions in a single transaction for atomicity + return s.db.Update(func(tx *bolt.Tx) error { + for _, root := range roots { // Delete block if err = s.deleteBlock(tx, root); err != nil { return err @@ -306,7 +308,7 @@ func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot p // Delete all caches after we have deleted everything from buckets. // This is done after the buckets are deleted to avoid any issues in case of transaction rollback. 
- for _, root := range keys { + for _, root := range roots { // Delete block from cache s.blockCache.Del(string(root)) // Delete state summary from cache @@ -685,7 +687,7 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter // We retrieve block roots that match a filter criteria of slot ranges, if specified. filtersMap := f.Filters() - rootsBySlotRange, err := blockRootsBySlotRange( + rootsBySlotRange, _, err := blockRootsBySlotRange( tx.Bucket(blockSlotIndicesBucket), filtersMap[filters.StartSlot], filtersMap[filters.EndSlot], @@ -728,10 +730,10 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter func blockRootsBySlotRange( bkt *bolt.Bucket, startSlotEncoded, endSlotEncoded, startEpochEncoded, endEpochEncoded, slotStepEncoded interface{}, -) ([][]byte, error) { +) ([][]byte, []primitives.Slot, error) { // Return nothing when all slot parameters are missing if startSlotEncoded == nil && endSlotEncoded == nil && startEpochEncoded == nil && endEpochEncoded == nil { - return [][]byte{}, nil + return [][]byte{}, nil, nil } var startSlot, endSlot primitives.Slot @@ -752,11 +754,11 @@ func blockRootsBySlotRange( if startEpochOk && endEpochOk { startSlot, err = slots.EpochStart(startEpoch) if err != nil { - return nil, err + return nil, nil, err } endSlot, err = slots.EpochStart(endEpoch) if err != nil { - return nil, err + return nil, nil, err } endSlot = endSlot + params.BeaconConfig().SlotsPerEpoch - 1 } @@ -767,14 +769,15 @@ func blockRootsBySlotRange( return key != nil && bytes.Compare(key, max) <= 0 } if endSlot < startSlot { - return nil, errInvalidSlotRange + return nil, nil, errInvalidSlotRange } rootsRange := endSlot.SubSlot(startSlot).Div(step) roots := make([][]byte, 0, rootsRange) + var slts []primitives.Slot c := bkt.Cursor() for k, v := c.Seek(min); conditional(k, max); k, v = c.Next() { + slot := bytesutil.BytesToSlotBigEndian(k) if step > 1 { - slot := bytesutil.BytesToSlotBigEndian(k) if slot.SubSlot(startSlot).Mod(step) != 0 { continue } @@ -785,8 +788,9 @@ func blockRootsBySlotRange( splitRoots = append(splitRoots, v[i:i+32]) } roots = append(roots, splitRoots...) + slts = append(slts, slot) } - return roots, nil + return roots, slts, nil } // blockRootsBySlot retrieves the block roots by slot diff --git a/beacon-chain/db/kv/blocks_test.go b/beacon-chain/db/kv/blocks_test.go index 0de31331826f..32bd03829f67 100644 --- a/beacon-chain/db/kv/blocks_test.go +++ b/beacon-chain/db/kv/blocks_test.go @@ -437,6 +437,7 @@ func TestStore_HistoricalDataBeforeSlot(t *testing.T) { require.NoError(t, err) // Delete data before slot at epoch 1 + t.Log("Deleting historical data before slot: ", slotsPerEpoch) require.NoError(t, db.DeleteHistoricalDataBeforeSlot(ctx, primitives.Slot(slotsPerEpoch))) // Verify blocks from epoch 0 are deleted diff --git a/beacon-chain/db/pruner/pruner.go b/beacon-chain/db/pruner/pruner.go index 2486477d3f54..20387efc2d38 100644 --- a/beacon-chain/db/pruner/pruner.go +++ b/beacon-chain/db/pruner/pruner.go @@ -22,7 +22,7 @@ type ServiceOption func(*Service) // The retention period is specified in epochs, and must be >= MIN_EPOCHS_FOR_BLOCK_REQUESTS. func WithRetentionPeriod(retentionEpochs primitives.Epoch) ServiceOption { return func(s *Service) { - defaultRetentionEpochs := helpers.MinEpochsForBlockRequests() - 1 + defaultRetentionEpochs := helpers.MinEpochsForBlockRequests() + 1 if retentionEpochs < defaultRetentionEpochs { log.WithField("userEpochs", retentionEpochs). 
WithField("minRequired", defaultRetentionEpochs). @@ -39,35 +39,27 @@ func WithSlotTicker(slotTicker slots.Ticker) ServiceOption { } } -type SyncChecker interface { - Synced() bool -} - -type BackfillChecker interface { - IsComplete() bool -} - // Service defines a service that prunes beacon chain DB based on MIN_EPOCHS_FOR_BLOCK_REQUESTS. type Service struct { - ctx context.Context - db db.Database - ps func(current primitives.Slot) primitives.Slot - prunedSlot primitives.Slot - done chan struct{} - slotTicker slots.Ticker - syncChecker SyncChecker - backfillChecker BackfillChecker + ctx context.Context + db db.Database + ps func(current primitives.Slot) primitives.Slot + prunedUpto primitives.Slot + done chan struct{} + slotTicker slots.Ticker + backfillWaiter func() error + initSyncWaiter func() error } -func New(ctx context.Context, db iface.Database, genesisTime time.Time, syncChecker SyncChecker, backfillChecker BackfillChecker, opts ...ServiceOption) (*Service, error) { +func New(ctx context.Context, db iface.Database, genesisTime uint64, initSyncWaiter, backfillWaiter func() error, opts ...ServiceOption) (*Service, error) { p := &Service{ - ctx: ctx, - db: db, - ps: pruneStartSlotFunc(helpers.MinEpochsForBlockRequests() - 1), // Default retention epochs is MIN_EPOCHS_FOR_BLOCK_REQUESTS - 1 from the current slot. - done: make(chan struct{}), - slotTicker: slots.NewSlotTicker(genesisTime, params.BeaconConfig().SecondsPerSlot), - syncChecker: syncChecker, - backfillChecker: backfillChecker, + ctx: ctx, + db: db, + ps: pruneStartSlotFunc(helpers.MinEpochsForBlockRequests() + 1), // Default retention epochs is (MIN_EPOCHS_FOR_BLOCK_REQUESTS + 1) epochs from the current slot. + done: make(chan struct{}), + slotTicker: slots.NewSlotTicker(slots.StartTime(genesisTime, 0), params.BeaconConfig().SecondsPerSlot), + initSyncWaiter: initSyncWaiter, + backfillWaiter: backfillWaiter, } for _, o := range opts { @@ -79,7 +71,7 @@ func New(ctx context.Context, db iface.Database, genesisTime time.Time, syncChec func (p *Service) Start() { log.Info("Starting Beacon DB pruner service") - go p.run() + p.run() } func (p *Service) Stop() error { @@ -93,15 +85,31 @@ func (p *Service) Status() error { } func (p *Service) run() { + if p.initSyncWaiter != nil { + log.Info("Waiting for initial sync service to complete before starting pruner") + if err := p.initSyncWaiter(); err != nil { + log.WithError(err).Error("Failed to start database pruner, error waiting for initial sync completion") + return + } + } + + if p.backfillWaiter != nil { + log.Info("Waiting for backfill service to complete before starting pruner") + if err := p.backfillWaiter(); err != nil { + log.WithError(err).Error("Failed to start database pruner, error waiting for backfill completion") + return + } + } + defer p.slotTicker.Done() for { select { case <-p.ctx.Done(): - log.Debug("Stopping Beacon DB pruner service", "prunedSlot", p.prunedSlot) + log.Debug("Stopping Beacon DB pruner service", "prunedUpto", p.prunedUpto) return case <-p.done: - log.Debug("Stopping Beacon DB pruner service", "prunedSlot", p.prunedSlot) + log.Debug("Stopping Beacon DB pruner service", "prunedUpto", p.prunedUpto) return case slot := <-p.slotTicker.C(): // Prune at the middle of every epoch since we do a lot of things around epoch boundaries. @@ -109,18 +117,6 @@ func (p *Service) run() { continue } - // Skip pruning if syncing is in progress. 
- if !p.syncChecker.Synced() { - log.Debug("Skipping pruning as initial sync is in progress") - continue - } - - // Skip pruning if backfill is in progress. - if !p.backfillChecker.IsComplete() { - log.Debug("Skipping pruning as backfill is in progress") - continue - } - if err := p.prune(slot); err != nil { log.WithError(err).Error("Failed to prune database") } @@ -130,36 +126,36 @@ func (p *Service) run() { // prune deletes historical chain data beyond the pruneSlot. func (p *Service) prune(slot primitives.Slot) error { - // Prune everything from this slot. - pruneSlot := p.ps(slot) + // Prune everything up to this slot (inclusive). + pruneUpto := p.ps(slot) // Can't prune beyond genesis. - if pruneSlot == 0 { + if pruneUpto == 0 { return nil } // Skip if already pruned up to this slot. - if pruneSlot <= p.prunedSlot { + if pruneUpto <= p.prunedUpto { return nil } log.WithFields(logrus.Fields{ - "pruneSlot": pruneSlot, + "pruneUpto": pruneUpto, }).Debug("Pruning chain data") tt := time.Now() - if err := p.db.DeleteHistoricalDataBeforeSlot(p.ctx, pruneSlot); err != nil { - return errors.Wrapf(err, "could not delete before slot %d", pruneSlot) + if err := p.db.DeleteHistoricalDataBeforeSlot(p.ctx, pruneUpto); err != nil { + return errors.Wrapf(err, "could not delete upto slot %d", pruneUpto) } log.WithFields(logrus.Fields{ - "pruneSlot": pruneSlot, + "prunedUpto": pruneUpto, "duration": time.Since(tt), "currentSlot": slot, }).Debug("Successfully pruned chain data") // Update pruning checkpoint. - p.prunedSlot = pruneSlot + p.prunedUpto = pruneUpto return nil } diff --git a/beacon-chain/db/pruner/pruner_test.go b/beacon-chain/db/pruner/pruner_test.go index 1ddb28648e30..8988af9d359b 100644 --- a/beacon-chain/db/pruner/pruner_test.go +++ b/beacon-chain/db/pruner/pruner_test.go @@ -30,13 +30,13 @@ func TestPruner_PruningConditions(t *testing.T) { name: "Not synced", synced: false, backfillCompleted: true, - expectedLog: "Skipping pruning as initial sync is in progress", + expectedLog: "Waiting for initial sync service to complete before starting pruner", }, { name: "Backfill incomplete", synced: true, backfillCompleted: false, - expectedLog: "Skipping pruning as backfill is in progress", + expectedLog: "Waiting for backfill service to complete before starting pruner", }, } @@ -44,18 +44,30 @@ func TestPruner_PruningConditions(t *testing.T) { t.Run(tt.name, func(t *testing.T) { logrus.SetLevel(logrus.DebugLevel) hook := logTest.NewGlobal() - t.Log(hook.Levels()) - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) beaconDB := dbtest.SetupDB(t) slotTicker := &slottest.MockTicker{Channel: make(chan primitives.Slot)} - p, err := New(ctx, beaconDB, time.Now(), &mockSyncChecker{synced: tt.synced}, &mockBackfillChecker{complete: tt.backfillCompleted}, WithSlotTicker(slotTicker)) + waitChan := make(chan struct{}) + waiter := func() error { + close(waitChan) + return nil + } + + var initSyncWaiter, backfillWaiter func() error + if !tt.synced { + initSyncWaiter = waiter + } + if !tt.backfillCompleted { + backfillWaiter = waiter + } + p, err := New(ctx, beaconDB, uint64(time.Now().Unix()), initSyncWaiter, backfillWaiter, WithSlotTicker(slotTicker)) require.NoError(t, err) - p.Start() - slotTicker.Channel <- 16 - slotTicker.Channel <- 16 + go p.Start() + <-waitChan + cancel() if tt.expectedLog != "" { require.LogsContain(t, hook, tt.expectedLog) @@ -88,9 +100,9 @@ func TestPruner_PruneSuccess(t *testing.T) { p, err := New( ctx, beaconDB, - time.Now(), - 
&mockSyncChecker{synced: true}, - &mockBackfillChecker{complete: true}, + uint64(time.Now().Unix()), + nil, + nil, WithSlotTicker(slotTicker), ) require.NoError(t, err) @@ -100,7 +112,7 @@ func TestPruner_PruneSuccess(t *testing.T) { } // Start pruner and trigger at middle of 3rd epoch (slot 80) - p.Start() + go p.Start() currentSlot := primitives.Slot(80) // Middle of 3rd epoch slotTicker.Channel <- currentSlot // Send the same slot again to ensure the pruning operation completes diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index fb3b9f87151f..5302938841e9 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -67,7 +67,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/runtime/debug" "github.com/prysmaticlabs/prysm/v5/runtime/prereqs" "github.com/prysmaticlabs/prysm/v5/runtime/version" - "github.com/prysmaticlabs/prysm/v5/time/slots" "github.com/sirupsen/logrus" "github.com/urfave/cli/v2" ) @@ -123,7 +122,6 @@ type BeaconNode struct { BlobStorageOptions []filesystem.BlobStorageOption verifyInitWaiter *verification.InitializerWaiter syncChecker *initialsync.SyncChecker - backfillChecker *backfill.BackfillChecker } // New creates a new node instance, sets up configuration options, and registers @@ -161,7 +159,6 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco serviceFlagOpts: &serviceFlagOpts{}, initialSyncComplete: make(chan struct{}), syncChecker: &initialsync.SyncChecker{}, - backfillChecker: &backfill.BackfillChecker{}, } for _, opt := range opts { @@ -220,13 +217,6 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco // their initialization. beacon.finalizedStateAtStartUp = nil - prunerFlag := cliCtx.Bool(flags.BeaconDBPruning.Name) - if prunerFlag { - if err = beacon.registerPrunerService(cliCtx); err != nil { - return nil, err - } - } - return beacon, nil } @@ -380,6 +370,13 @@ func registerServices(cliCtx *cli.Context, beacon *BeaconNode, synchronizer *sta } } + if cliCtx.Bool(flags.BeaconDBPruning.Name) { + log.Debugln("Registering Pruner Service") + if err := beacon.registerPrunerService(cliCtx); err != nil { + return errors.Wrap(err, "could not register pruner service") + } + } + return nil } @@ -1102,30 +1099,24 @@ func (b *BeaconNode) registerBuilderService(cliCtx *cli.Context) error { func (b *BeaconNode) registerPrunerService(cliCtx *cli.Context) error { genesisTimeUnix := params.BeaconConfig().MinGenesisTime + params.BeaconConfig().GenesisDelay - genesisTime := slots.StartTime(genesisTimeUnix, 0) + var backfillService *backfill.Service + if err := b.services.FetchService(&backfillService); err != nil { + return err + } + var opts []pruner.ServiceOption if cliCtx.IsSet(flags.PrunerRetentionEpochs.Name) { uv := cliCtx.Uint64(flags.PrunerRetentionEpochs.Name) - p, err := pruner.New( - cliCtx.Context, - b.db, - genesisTime, - b.syncChecker, - b.backfillChecker, - pruner.WithRetentionPeriod(primitives.Epoch(uv)), - ) - if err != nil { - return err - } - return b.services.RegisterService(p) + opts = append(opts, pruner.WithRetentionPeriod(primitives.Epoch(uv))) } p, err := pruner.New( cliCtx.Context, b.db, - genesisTime, - b.syncChecker, - b.backfillChecker, + genesisTimeUnix, + initSyncWaiter(cliCtx.Context, b.initialSyncComplete), + backfillService.WaitForCompletion, + opts..., ) if err != nil { return err @@ -1136,7 +1127,7 @@ func (b *BeaconNode) registerPrunerService(cliCtx *cli.Context) error { func (b *BeaconNode) RegisterBackfillService(cliCtx *cli.Context, bfs 
*backfill.Store) error { pa := peers.NewAssigner(b.fetchP2P().Peers(), b.forkChoicer) - b.BackfillOpts = append(b.BackfillOpts, backfill.WithBackfillChecker(b.backfillChecker)) + b.BackfillOpts = append(b.BackfillOpts) bf, err := backfill.NewService(cliCtx.Context, bfs, b.BlobStorage, b.clockWaiter, b.fetchP2P(), pa, b.BackfillOpts...) if err != nil { return errors.Wrap(err, "error initializing backfill service") diff --git a/beacon-chain/sync/backfill/service.go b/beacon-chain/sync/backfill/service.go index 1290cc247633..c2aa6ddb15ff 100644 --- a/beacon-chain/sync/backfill/service.go +++ b/beacon-chain/sync/backfill/service.go @@ -41,6 +41,7 @@ type Service struct { blobStore *filesystem.BlobStorage initSyncWaiter func() error isComplete *abool.AtomicBool + complete chan struct{} } var _ runtime.Service = (*Service)(nil) @@ -138,30 +139,6 @@ func WithMinimumSlot(s primitives.Slot) ServiceOption { } } -// BackfillChecker allows other services to check the current status of -// backfill and use that internally in their service. -type BackfillChecker struct { - Svc *Service -} - -// IsComplete returns the status of the service. -func (s *BackfillChecker) IsComplete() bool { - if s.Svc == nil { - log.Warn("Calling backfill checker with a nil service initialized") - return false - } - return s.Svc.IsComplete() -} - -// WithBackfillChecker registers the backfill service -// in the checker. -func WithBackfillChecker(checker *BackfillChecker) ServiceOption { - return func(s *Service) error { - checker.Svc = s - return nil - } -} - // NewService initializes the backfill Service. Like all implementations of the Service interface, // the service won't begin its runloop until Start() is called. func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) { @@ -175,6 +152,7 @@ func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage, pa: pa, batchImporter: defaultBatchImporter, isComplete: abool.New(), + complete: make(chan struct{}), } for _, o := range opts { if err := o(s); err != nil { @@ -395,12 +373,17 @@ func newBlobVerifierFromInitializer(ini *verification.Initializer) verification. } } -// IsComplete returns whether backfill has completed -func (s *Service) IsComplete() bool { - return s.isComplete.IsSet() -} - func (s *Service) markComplete() { s.isComplete.Set() + close(s.complete) log.Info("Backfill service marked as complete") } + +func (s *Service) WaitForCompletion() error { + select { + case <-s.ctx.Done(): + return s.ctx.Err() + case <-s.complete: + return nil + } +}
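
Note on the concurrency pattern introduced above: this patch replaces the polling-style SyncChecker/BackfillChecker interfaces with blocking waiters — the pruner now receives initSyncWaiter and backfillWaiter as func() error and blocks on them at the top of run(), and the backfill service backs its WaitForCompletion method with a "complete" channel that markComplete closes. Below is a minimal, self-contained sketch of that pattern under stated assumptions: it uses only the standard library, and the names completer, newCompleter, and waitThenRun are illustrative stand-ins, not Prysm APIs.

// completer mirrors the backfill service's completion channel: markComplete
// closes the channel exactly once, and WaitForCompletion blocks until either
// the channel is closed or the context is cancelled.
package main

import (
	"context"
	"fmt"
	"time"
)

type completer struct {
	ctx      context.Context
	complete chan struct{}
}

func newCompleter(ctx context.Context) *completer {
	return &completer{ctx: ctx, complete: make(chan struct{})}
}

// markComplete must be called at most once; closing a channel twice panics.
func (c *completer) markComplete() {
	close(c.complete)
}

// WaitForCompletion returns nil once markComplete has been called, or the
// context error if the context is cancelled first.
func (c *completer) WaitForCompletion() error {
	select {
	case <-c.ctx.Done():
		return c.ctx.Err()
	case <-c.complete:
		return nil
	}
}

// waitThenRun mimics the pruner's run loop: block on the injected waiters
// first, then start the periodic work. A nil waiter means nothing to wait for,
// matching how the pruner treats a nil initSyncWaiter or backfillWaiter.
func waitThenRun(initSyncWaiter, backfillWaiter func() error, work func()) error {
	for _, w := range []func() error{initSyncWaiter, backfillWaiter} {
		if w == nil {
			continue
		}
		if err := w(); err != nil {
			return fmt.Errorf("waiter failed: %w", err)
		}
	}
	work()
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	backfill := newCompleter(ctx)
	go func() {
		time.Sleep(100 * time.Millisecond) // pretend backfill takes a while
		backfill.markComplete()
	}()

	err := waitThenRun(nil, backfill.WaitForCompletion, func() {
		fmt.Println("pruner would start its slot-ticker loop here")
	})
	fmt.Println("run finished, err =", err)
}

Because a closed channel releases every goroutine blocked on it, the same WaitForCompletion method can be handed to any number of dependent services (here, the pruner) and they all unblock together once backfill finishes — which is what lets the patch drop the per-tick IsComplete polling.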