Skip to content

Commit

Permalink
reuse the information stored in db variable to avoid streaming creati…
Browse files Browse the repository at this point in the history
…ng streaming from 0 all the time.

Signed-off-by: Gabriel Fukushima <[email protected]>
  • Loading branch information
gfukushima committed Jan 24, 2025
1 parent af8f131 commit 0ca062a
Showing 1 changed file with 56 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -922,87 +922,83 @@ public boolean pruneOldestBlobSidecars(
final UInt64 lastSlotToPrune,
final int pruneLimit,
final DataArchiveWriter<List<BlobSidecar>> archiveWriter) {
try (final Stream<SlotAndBlockRootAndBlobIndex> prunableBlobKeys =
streamBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune)) {
return pruneBlobSidecars(pruneLimit, prunableBlobKeys, archiveWriter, false);
}
return pruneBlobSidecars(pruneLimit, lastSlotToPrune , archiveWriter, false );
}

@Override
public boolean pruneOldestNonCanonicalBlobSidecars(
final UInt64 lastSlotToPrune,
final int pruneLimit,
final DataArchiveWriter<List<BlobSidecar>> archiveWriter) {
try (final Stream<SlotAndBlockRootAndBlobIndex> prunableNoncanonicalBlobKeys =
streamNonCanonicalBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune)) {
return pruneBlobSidecars(pruneLimit, prunableNoncanonicalBlobKeys, archiveWriter, true);
}
return pruneBlobSidecars(pruneLimit, lastSlotToPrune, archiveWriter, true );

}

private boolean pruneBlobSidecars(
final int pruneLimit,
final Stream<SlotAndBlockRootAndBlobIndex> prunableBlobKeys,
final DataArchiveWriter<List<BlobSidecar>> archiveWriter,
final boolean nonCanonicalBlobSidecars) {
final int pruneLimit,
final UInt64 lastSlotToPrune, final DataArchiveWriter<List<BlobSidecar>> archiveWriter,
final boolean nonCanonicalBlobSidecars) {

int pruned = 0;
Optional<UInt64> earliestBlobSidecarSlot = Optional.empty();
Optional<UInt64> earliestBlobSidecarSlot = getEarliestBlobSidecarSlot();
try (final Stream<SlotAndBlockRootAndBlobIndex> prunableBlobKeys =
streamNonCanonicalBlobSidecarKeys(earliestBlobSidecarSlot.orElse(UInt64.ZERO), lastSlotToPrune)) {
// Group the BlobSidecars by slot. Potential for higher memory usage
// if it hasn't been pruned in a while
final Map<UInt64, List<SlotAndBlockRootAndBlobIndex>> prunableMap =
prunableBlobKeys.collect(groupingBy(SlotAndBlockRootAndBlobIndex::getSlot));

// pruneLimit is the number of slots to prune, not the number of BlobSidecars
final List<UInt64> slots = prunableMap.keySet().stream().sorted().limit(pruneLimit).toList();
try (final FinalizedUpdater updater = finalizedUpdater()) {
for (final UInt64 slot : slots) {
final List<SlotAndBlockRootAndBlobIndex> keys = prunableMap.get(slot);

// Retrieve the BlobSidecars for archiving.
final List<BlobSidecar> blobSidecars =
keys.stream()
.map(
nonCanonicalBlobSidecars
? this::getNonCanonicalBlobSidecar
: this::getBlobSidecar)
.filter(Optional::isPresent)
.map(Optional::get)
.toList();

// Just warn if we failed to find all the BlobSidecars.
if (keys.size() != blobSidecars.size()) {
LOG.warn("Failed to retrieve BlobSidecars for keys: {}", keys);
}

// Group the BlobSidecars by slot. Potential for higher memory usage
// if it hasn't been pruned in a while
final Map<UInt64, List<SlotAndBlockRootAndBlobIndex>> prunableMap =
prunableBlobKeys.collect(groupingBy(SlotAndBlockRootAndBlobIndex::getSlot));
// Attempt to archive the BlobSidecars.
final boolean blobSidecarArchived = archiveWriter.archive(blobSidecars);
if (!blobSidecarArchived) {
LOG.error("Failed to archive and prune BlobSidecars. Stopping pruning");
break;
}

// pruneLimit is the number of slots to prune, not the number of BlobSidecars
final List<UInt64> slots = prunableMap.keySet().stream().sorted().limit(pruneLimit).toList();
try (final FinalizedUpdater updater = finalizedUpdater()) {
for (final UInt64 slot : slots) {
final List<SlotAndBlockRootAndBlobIndex> keys = prunableMap.get(slot);

// Retrieve the BlobSidecars for archiving.
final List<BlobSidecar> blobSidecars =
keys.stream()
.map(
nonCanonicalBlobSidecars
? this::getNonCanonicalBlobSidecar
: this::getBlobSidecar)
.filter(Optional::isPresent)
.map(Optional::get)
.toList();

// Just warn if we failed to find all the BlobSidecars.
if (keys.size() != blobSidecars.size()) {
LOG.warn("Failed to retrieve BlobSidecars for keys: {}", keys);
}
// Remove the BlobSidecars from the database.
for (final SlotAndBlockRootAndBlobIndex key : keys) {
if (nonCanonicalBlobSidecars) {
updater.removeNonCanonicalBlobSidecar(key);
} else {
updater.removeBlobSidecar(key);
earliestBlobSidecarSlot = Optional.of(slot.plus(1));
}
}

// Attempt to archive the BlobSidecars.
final boolean blobSidecarArchived = archiveWriter.archive(blobSidecars);
if (!blobSidecarArchived) {
LOG.error("Failed to archive and prune BlobSidecars. Stopping pruning");
break;
++pruned;
}

// Remove the BlobSidecars from the database.
for (final SlotAndBlockRootAndBlobIndex key : keys) {
if (nonCanonicalBlobSidecars) {
updater.removeNonCanonicalBlobSidecar(key);
} else {
updater.removeBlobSidecar(key);
earliestBlobSidecarSlot = Optional.of(slot.plus(1));
}
if (!nonCanonicalBlobSidecars) {
earliestBlobSidecarSlot.ifPresent(updater::setEarliestBlobSidecarSlot);
}

++pruned;
updater.commit();
}

if (!nonCanonicalBlobSidecars) {
earliestBlobSidecarSlot.ifPresent(updater::setEarliestBlobSidecarSlot);
}
updater.commit();
// `pruned` will be greater when we reach pruneLimit not on the latest BlobSidecar in a slot
return pruned >= pruneLimit;
}

// `pruned` will be greater when we reach pruneLimit not on the latest BlobSidecar in a slot
return pruned >= pruneLimit;
}

@MustBeClosed
Expand Down

0 comments on commit 0ca062a

Please sign in to comment.