Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
dB2510 committed Jan 15, 2025
1 parent 8f14bb0 commit b79ac4b
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 144 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Added support to update target and max blob count to different values per hard fork config.
- Log before blob filesystem cache warm-up.
- New design for the attestation pool. [PR](https://github.com/prysmaticlabs/prysm/pull/14324)
- Add field param placeholder for Electra blob target and max to pass spec tests.
- Add Beacon DB pruning service to prune historical data beyond weak subjectivity checkpoint.
- Add field param placeholder for Electra blob target and max to pass spec tests.
- Add Beacon DB pruning service to prune historical data older than MIN_EPOCHS_FOR_BLOCK_REQUESTS (roughly equivalent to the weak subjectivity period).

### Changed

Expand Down
54 changes: 29 additions & 25 deletions beacon-chain/db/kv/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,25 +249,27 @@ func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot p
ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteHistoricalDataBeforeSlot")
defer span.End()

// Perform all deletions in a single transaction for atomicity
return s.db.Update(func(tx *bolt.Tx) error {
filter := filters.NewFilter().SetStartSlot(0).SetEndSlot(cutoffSlot)
keys, err := blockRootsByFilter(ctx, tx, filter)
// Collect slot/root pairs in a read-only transaction; the deletions are performed afterwards in a separate write transaction.
var (
roots [][]byte
slts []primitives.Slot
)
err := s.db.View(func(tx *bolt.Tx) error {
var err error
roots, slts, err = blockRootsBySlotRange(tx.Bucket(blockSlotIndicesBucket), primitives.Slot(0), cutoffSlot, nil, nil, nil)
if err != nil {
return errors.Wrap(err, "could not retrieve block roots by filter")
}

// Query slots by block roots before pruning.
var slts []primitives.Slot
for _, root := range keys {
slot, err := s.slotByBlockRoot(ctx, tx, root)
if err != nil {
return errors.Wrapf(err, "could not retrieve slot from block root, slot: %d", slot)
}
slts = append(slts, slot)
return errors.Wrap(err, "could not retrieve block roots")
}
fmt.Println("slots: ", slts)
return nil
})
if err != nil {
return errors.Wrap(err, "could not retrieve block roots and slots")
}

for _, root := range keys {
// Perform all deletions in a single transaction for atomicity
return s.db.Update(func(tx *bolt.Tx) error {
for _, root := range roots {
// Delete block
if err = s.deleteBlock(tx, root); err != nil {
return err
Expand Down Expand Up @@ -306,7 +308,7 @@ func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot p

// Delete all caches after we have deleted everything from buckets.
// This is done after the buckets are deleted to avoid any issues in case of transaction rollback.
for _, root := range keys {
for _, root := range roots {
// Delete block from cache
s.blockCache.Del(string(root))
// Delete state summary from cache
Expand Down Expand Up @@ -685,7 +687,7 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter

// We retrieve block roots that match a filter criteria of slot ranges, if specified.
filtersMap := f.Filters()
rootsBySlotRange, err := blockRootsBySlotRange(
rootsBySlotRange, _, err := blockRootsBySlotRange(
tx.Bucket(blockSlotIndicesBucket),
filtersMap[filters.StartSlot],
filtersMap[filters.EndSlot],
Expand Down Expand Up @@ -728,10 +730,10 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter
func blockRootsBySlotRange(
bkt *bolt.Bucket,
startSlotEncoded, endSlotEncoded, startEpochEncoded, endEpochEncoded, slotStepEncoded interface{},
) ([][]byte, error) {
) ([][]byte, []primitives.Slot, error) {
// Return nothing when all slot parameters are missing
if startSlotEncoded == nil && endSlotEncoded == nil && startEpochEncoded == nil && endEpochEncoded == nil {
return [][]byte{}, nil
return [][]byte{}, nil, nil
}

var startSlot, endSlot primitives.Slot
Expand All @@ -752,11 +754,11 @@ func blockRootsBySlotRange(
if startEpochOk && endEpochOk {
startSlot, err = slots.EpochStart(startEpoch)
if err != nil {
return nil, err
return nil, nil, err
}
endSlot, err = slots.EpochStart(endEpoch)
if err != nil {
return nil, err
return nil, nil, err
}
endSlot = endSlot + params.BeaconConfig().SlotsPerEpoch - 1
}
Expand All @@ -767,14 +769,15 @@ func blockRootsBySlotRange(
return key != nil && bytes.Compare(key, max) <= 0
}
if endSlot < startSlot {
return nil, errInvalidSlotRange
return nil, nil, errInvalidSlotRange
}
rootsRange := endSlot.SubSlot(startSlot).Div(step)
roots := make([][]byte, 0, rootsRange)
var slts []primitives.Slot
c := bkt.Cursor()
for k, v := c.Seek(min); conditional(k, max); k, v = c.Next() {
slot := bytesutil.BytesToSlotBigEndian(k)
if step > 1 {
slot := bytesutil.BytesToSlotBigEndian(k)
if slot.SubSlot(startSlot).Mod(step) != 0 {
continue
}
Expand All @@ -785,8 +788,9 @@ func blockRootsBySlotRange(
splitRoots = append(splitRoots, v[i:i+32])
}
roots = append(roots, splitRoots...)
slts = append(slts, slot)
}
return roots, nil
return roots, slts, nil
}

// blockRootsBySlot retrieves the block roots by slot
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/db/kv/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ func TestStore_HistoricalDataBeforeSlot(t *testing.T) {
require.NoError(t, err)

// Delete data before slot at epoch 1
t.Log("Deleting historical data before slot: ", slotsPerEpoch)
require.NoError(t, db.DeleteHistoricalDataBeforeSlot(ctx, primitives.Slot(slotsPerEpoch)))

// Verify blocks from epoch 0 are deleted
Expand Down
94 changes: 45 additions & 49 deletions beacon-chain/db/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type ServiceOption func(*Service)
// The retention period is specified in epochs, and must be >= MIN_EPOCHS_FOR_BLOCK_REQUESTS + 1.
func WithRetentionPeriod(retentionEpochs primitives.Epoch) ServiceOption {
return func(s *Service) {
defaultRetentionEpochs := helpers.MinEpochsForBlockRequests() - 1
defaultRetentionEpochs := helpers.MinEpochsForBlockRequests() + 1
if retentionEpochs < defaultRetentionEpochs {
log.WithField("userEpochs", retentionEpochs).
WithField("minRequired", defaultRetentionEpochs).
Expand All @@ -39,35 +39,27 @@ func WithSlotTicker(slotTicker slots.Ticker) ServiceOption {
}
}

type SyncChecker interface {
Synced() bool
}

type BackfillChecker interface {
IsComplete() bool
}

// Service defines a service that prunes beacon chain DB based on MIN_EPOCHS_FOR_BLOCK_REQUESTS.
type Service struct {
ctx context.Context
db db.Database
ps func(current primitives.Slot) primitives.Slot
prunedSlot primitives.Slot
done chan struct{}
slotTicker slots.Ticker
syncChecker SyncChecker
backfillChecker BackfillChecker
ctx context.Context
db db.Database
ps func(current primitives.Slot) primitives.Slot
prunedUpto primitives.Slot
done chan struct{}
slotTicker slots.Ticker
backfillWaiter func() error
initSyncWaiter func() error
}

func New(ctx context.Context, db iface.Database, genesisTime time.Time, syncChecker SyncChecker, backfillChecker BackfillChecker, opts ...ServiceOption) (*Service, error) {
func New(ctx context.Context, db iface.Database, genesisTime uint64, initSyncWaiter, backfillWaiter func() error, opts ...ServiceOption) (*Service, error) {
p := &Service{
ctx: ctx,
db: db,
ps: pruneStartSlotFunc(helpers.MinEpochsForBlockRequests() - 1), // Default retention epochs is MIN_EPOCHS_FOR_BLOCK_REQUESTS - 1 from the current slot.
done: make(chan struct{}),
slotTicker: slots.NewSlotTicker(genesisTime, params.BeaconConfig().SecondsPerSlot),
syncChecker: syncChecker,
backfillChecker: backfillChecker,
ctx: ctx,
db: db,
ps: pruneStartSlotFunc(helpers.MinEpochsForBlockRequests() + 1), // Default retention epochs is (MIN_EPOCHS_FOR_BLOCK_REQUESTS + 1) epochs from the current slot.
done: make(chan struct{}),
slotTicker: slots.NewSlotTicker(slots.StartTime(genesisTime, 0), params.BeaconConfig().SecondsPerSlot),
initSyncWaiter: initSyncWaiter,
backfillWaiter: backfillWaiter,
}

for _, o := range opts {
Expand All @@ -79,7 +71,7 @@ func New(ctx context.Context, db iface.Database, genesisTime time.Time, syncChec

func (p *Service) Start() {
log.Info("Starting Beacon DB pruner service")
go p.run()
p.run()
}

func (p *Service) Stop() error {
Expand All @@ -93,34 +85,38 @@ func (p *Service) Status() error {
}

func (p *Service) run() {
if p.initSyncWaiter != nil {
log.Info("Waiting for initial sync service to complete before starting pruner")
if err := p.initSyncWaiter(); err != nil {
log.WithError(err).Error("Failed to start database pruner, error waiting for initial sync completion")
return
}
}

if p.backfillWaiter != nil {
log.Info("Waiting for backfill service to complete before starting pruner")
if err := p.backfillWaiter(); err != nil {
log.WithError(err).Error("Failed to start database pruner, error waiting for backfill completion")
return
}
}

defer p.slotTicker.Done()

for {
select {
case <-p.ctx.Done():
log.Debug("Stopping Beacon DB pruner service", "prunedSlot", p.prunedSlot)
log.Debug("Stopping Beacon DB pruner service", "prunedUpto", p.prunedUpto)
return
case <-p.done:
log.Debug("Stopping Beacon DB pruner service", "prunedSlot", p.prunedSlot)
log.Debug("Stopping Beacon DB pruner service", "prunedUpto", p.prunedUpto)
return
case slot := <-p.slotTicker.C():
// Prune at the middle of every epoch since we do a lot of things around epoch boundaries.
if slots.SinceEpochStarts(slot) != (params.BeaconConfig().SlotsPerEpoch / 2) {
continue
}

// Skip pruning if syncing is in progress.
if !p.syncChecker.Synced() {
log.Debug("Skipping pruning as initial sync is in progress")
continue
}

// Skip pruning if backfill is in progress.
if !p.backfillChecker.IsComplete() {
log.Debug("Skipping pruning as backfill is in progress")
continue
}

if err := p.prune(slot); err != nil {
log.WithError(err).Error("Failed to prune database")
}
Expand All @@ -130,36 +126,36 @@ func (p *Service) run() {

// prune deletes historical chain data up to the prune slot derived from the given current slot.
func (p *Service) prune(slot primitives.Slot) error {
// Prune everything from this slot.
pruneSlot := p.ps(slot)
// Prune everything up to this slot (inclusive).
pruneUpto := p.ps(slot)

// Can't prune beyond genesis.
if pruneSlot == 0 {
if pruneUpto == 0 {
return nil
}

// Skip if already pruned up to this slot.
if pruneSlot <= p.prunedSlot {
if pruneUpto <= p.prunedUpto {
return nil
}

log.WithFields(logrus.Fields{
"pruneSlot": pruneSlot,
"pruneUpto": pruneUpto,
}).Debug("Pruning chain data")

tt := time.Now()
if err := p.db.DeleteHistoricalDataBeforeSlot(p.ctx, pruneSlot); err != nil {
return errors.Wrapf(err, "could not delete before slot %d", pruneSlot)
if err := p.db.DeleteHistoricalDataBeforeSlot(p.ctx, pruneUpto); err != nil {
return errors.Wrapf(err, "could not delete upto slot %d", pruneUpto)
}

log.WithFields(logrus.Fields{
"pruneSlot": pruneSlot,
"prunedUpto": pruneUpto,
"duration": time.Since(tt),
"currentSlot": slot,
}).Debug("Successfully pruned chain data")

// Update pruning checkpoint.
p.prunedSlot = pruneSlot
p.prunedUpto = pruneUpto

return nil
}
Expand Down
36 changes: 24 additions & 12 deletions beacon-chain/db/pruner/pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,44 @@ func TestPruner_PruningConditions(t *testing.T) {
name: "Not synced",
synced: false,
backfillCompleted: true,
expectedLog: "Skipping pruning as initial sync is in progress",
expectedLog: "Waiting for initial sync service to complete before starting pruner",
},
{
name: "Backfill incomplete",
synced: true,
backfillCompleted: false,
expectedLog: "Skipping pruning as backfill is in progress",
expectedLog: "Waiting for backfill service to complete before starting pruner",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel)
hook := logTest.NewGlobal()
t.Log(hook.Levels())
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
beaconDB := dbtest.SetupDB(t)

slotTicker := &slottest.MockTicker{Channel: make(chan primitives.Slot)}

p, err := New(ctx, beaconDB, time.Now(), &mockSyncChecker{synced: tt.synced}, &mockBackfillChecker{complete: tt.backfillCompleted}, WithSlotTicker(slotTicker))
waitChan := make(chan struct{})
waiter := func() error {
close(waitChan)
return nil
}

var initSyncWaiter, backfillWaiter func() error
if !tt.synced {
initSyncWaiter = waiter
}
if !tt.backfillCompleted {
backfillWaiter = waiter
}
p, err := New(ctx, beaconDB, uint64(time.Now().Unix()), initSyncWaiter, backfillWaiter, WithSlotTicker(slotTicker))
require.NoError(t, err)

p.Start()
slotTicker.Channel <- 16
slotTicker.Channel <- 16
go p.Start()
<-waitChan
cancel()

if tt.expectedLog != "" {
require.LogsContain(t, hook, tt.expectedLog)
Expand Down Expand Up @@ -88,9 +100,9 @@ func TestPruner_PruneSuccess(t *testing.T) {
p, err := New(
ctx,
beaconDB,
time.Now(),
&mockSyncChecker{synced: true},
&mockBackfillChecker{complete: true},
uint64(time.Now().Unix()),
nil,
nil,
WithSlotTicker(slotTicker),
)
require.NoError(t, err)
Expand All @@ -100,7 +112,7 @@ func TestPruner_PruneSuccess(t *testing.T) {
}

// Start pruner and trigger at middle of 3rd epoch (slot 80)
p.Start()
go p.Start()
currentSlot := primitives.Slot(80) // Middle of 3rd epoch
slotTicker.Channel <- currentSlot
// Send the same slot again to ensure the pruning operation completes
Expand Down
Loading

0 comments on commit b79ac4b

Please sign in to comment.