From 0ca062a8380f4b5477b89eed019626680da568b0 Mon Sep 17 00:00:00 2001 From: Gabriel Fukushima Date: Fri, 24 Jan 2025 16:06:10 +1000 Subject: [PATCH] reuse the information stored in db variable to avoid streaming creating streaming from 0 all the time. Signed-off-by: Gabriel Fukushima --- .../server/kvstore/KvStoreDatabase.java | 116 +++++++++--------- 1 file changed, 56 insertions(+), 60 deletions(-) diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java index cb95d48d054..5fa0ce5726e 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java @@ -922,10 +922,7 @@ public boolean pruneOldestBlobSidecars( final UInt64 lastSlotToPrune, final int pruneLimit, final DataArchiveWriter> archiveWriter) { - try (final Stream prunableBlobKeys = - streamBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune)) { - return pruneBlobSidecars(pruneLimit, prunableBlobKeys, archiveWriter, false); - } + return pruneBlobSidecars(pruneLimit, lastSlotToPrune , archiveWriter, false ); } @Override @@ -933,76 +930,75 @@ public boolean pruneOldestNonCanonicalBlobSidecars( final UInt64 lastSlotToPrune, final int pruneLimit, final DataArchiveWriter> archiveWriter) { - try (final Stream prunableNoncanonicalBlobKeys = - streamNonCanonicalBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune)) { - return pruneBlobSidecars(pruneLimit, prunableNoncanonicalBlobKeys, archiveWriter, true); - } + return pruneBlobSidecars(pruneLimit, lastSlotToPrune, archiveWriter, true ); + } private boolean pruneBlobSidecars( - final int pruneLimit, - final Stream prunableBlobKeys, - final DataArchiveWriter> archiveWriter, - final boolean nonCanonicalBlobSidecars) { + final int pruneLimit, + final UInt64 lastSlotToPrune, final DataArchiveWriter> archiveWriter, + final boolean nonCanonicalBlobSidecars) { int pruned = 0; - Optional earliestBlobSidecarSlot = Optional.empty(); + Optional earliestBlobSidecarSlot = getEarliestBlobSidecarSlot(); + try (final Stream prunableBlobKeys = + streamNonCanonicalBlobSidecarKeys(earliestBlobSidecarSlot.orElse(UInt64.ZERO), lastSlotToPrune)) { + // Group the BlobSidecars by slot. Potential for higher memory usage + // if it hasn't been pruned in a while + final Map> prunableMap = + prunableBlobKeys.collect(groupingBy(SlotAndBlockRootAndBlobIndex::getSlot)); + + // pruneLimit is the number of slots to prune, not the number of BlobSidecars + final List slots = prunableMap.keySet().stream().sorted().limit(pruneLimit).toList(); + try (final FinalizedUpdater updater = finalizedUpdater()) { + for (final UInt64 slot : slots) { + final List keys = prunableMap.get(slot); + + // Retrieve the BlobSidecars for archiving. + final List blobSidecars = + keys.stream() + .map( + nonCanonicalBlobSidecars + ? this::getNonCanonicalBlobSidecar + : this::getBlobSidecar) + .filter(Optional::isPresent) + .map(Optional::get) + .toList(); + + // Just warn if we failed to find all the BlobSidecars. + if (keys.size() != blobSidecars.size()) { + LOG.warn("Failed to retrieve BlobSidecars for keys: {}", keys); + } - // Group the BlobSidecars by slot. Potential for higher memory usage - // if it hasn't been pruned in a while - final Map> prunableMap = - prunableBlobKeys.collect(groupingBy(SlotAndBlockRootAndBlobIndex::getSlot)); + // Attempt to archive the BlobSidecars. + final boolean blobSidecarArchived = archiveWriter.archive(blobSidecars); + if (!blobSidecarArchived) { + LOG.error("Failed to archive and prune BlobSidecars. Stopping pruning"); + break; + } - // pruneLimit is the number of slots to prune, not the number of BlobSidecars - final List slots = prunableMap.keySet().stream().sorted().limit(pruneLimit).toList(); - try (final FinalizedUpdater updater = finalizedUpdater()) { - for (final UInt64 slot : slots) { - final List keys = prunableMap.get(slot); - - // Retrieve the BlobSidecars for archiving. - final List blobSidecars = - keys.stream() - .map( - nonCanonicalBlobSidecars - ? this::getNonCanonicalBlobSidecar - : this::getBlobSidecar) - .filter(Optional::isPresent) - .map(Optional::get) - .toList(); - - // Just warn if we failed to find all the BlobSidecars. - if (keys.size() != blobSidecars.size()) { - LOG.warn("Failed to retrieve BlobSidecars for keys: {}", keys); - } + // Remove the BlobSidecars from the database. + for (final SlotAndBlockRootAndBlobIndex key : keys) { + if (nonCanonicalBlobSidecars) { + updater.removeNonCanonicalBlobSidecar(key); + } else { + updater.removeBlobSidecar(key); + earliestBlobSidecarSlot = Optional.of(slot.plus(1)); + } + } - // Attempt to archive the BlobSidecars. - final boolean blobSidecarArchived = archiveWriter.archive(blobSidecars); - if (!blobSidecarArchived) { - LOG.error("Failed to archive and prune BlobSidecars. Stopping pruning"); - break; + ++pruned; } - // Remove the BlobSidecars from the database. - for (final SlotAndBlockRootAndBlobIndex key : keys) { - if (nonCanonicalBlobSidecars) { - updater.removeNonCanonicalBlobSidecar(key); - } else { - updater.removeBlobSidecar(key); - earliestBlobSidecarSlot = Optional.of(slot.plus(1)); - } + if (!nonCanonicalBlobSidecars) { + earliestBlobSidecarSlot.ifPresent(updater::setEarliestBlobSidecarSlot); } - - ++pruned; + updater.commit(); } - if (!nonCanonicalBlobSidecars) { - earliestBlobSidecarSlot.ifPresent(updater::setEarliestBlobSidecarSlot); - } - updater.commit(); + // `pruned` will be greater when we reach pruneLimit not on the latest BlobSidecar in a slot + return pruned >= pruneLimit; } - - // `pruned` will be greater when we reach pruneLimit not on the latest BlobSidecar in a slot - return pruned >= pruneLimit; } @MustBeClosed