Skip to content

Commit

Permalink
Add metric for future items cache sizes (#5189)
Browse files Browse the repository at this point in the history
  • Loading branch information
rolfyone authored Mar 18, 2022
1 parent 91c03c1 commit f2ab5cd
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 15 deletions.
1 change: 1 addition & 0 deletions beacon/sync/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies {
integrationTestImplementation testFixtures(project(':beacon:sync'))

testFixturesImplementation project(':infrastructure:serviceutils')
testFixturesImplementation project(':infrastructure:metrics')
testFixturesImplementation project(':ethereum:statetransition')
testFixturesImplementation project(':ethereum:weaksubjectivity')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.beacon.sync;

import static org.mockito.Mockito.mock;
import static tech.pegasys.teku.infrastructure.events.TestExceptionHandler.TEST_EXCEPTION_HANDLER;
import static tech.pegasys.teku.infrastructure.logging.EventLogger.EVENT_LOG;

Expand All @@ -30,6 +31,7 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.events.EventChannels;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.time.SystemTimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.Eth2P2PNetwork;
Expand Down Expand Up @@ -119,7 +121,7 @@ public static SyncingNodeManager create(
final PendingPool<SignedBeaconBlock> pendingBlocks =
new PendingPoolFactory(new NoOpMetricsSystem()).createForBlocks(spec);
final FutureItems<SignedBeaconBlock> futureBlocks =
FutureItems.create(SignedBeaconBlock::getSlot);
FutureItems.create(SignedBeaconBlock::getSlot, mock(SettableLabelledGauge.class), "blocks");
BlockManager blockManager =
new BlockManager(
recentChainData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.bytes.Bytes4;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitlist;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
Expand Down Expand Up @@ -78,7 +79,10 @@ class AttestationManagerIntegrationTest {
private final PendingPool<ValidateableAttestation> pendingAttestations =
new PendingPoolFactory(storageSystem.getMetricsSystem()).createForAttestations(spec);
private final FutureItems<ValidateableAttestation> futureAttestations =
FutureItems.create(ValidateableAttestation::getEarliestSlotForForkChoiceProcessing);
FutureItems.create(
ValidateableAttestation::getEarliestSlotForForkChoiceProcessing,
mock(SettableLabelledGauge.class),
"attestations");
private final SignatureVerificationService signatureVerificationService =
SignatureVerificationService.createSimple();
private final AttestationValidator attestationValidator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.ethereum.events.SlotEventsChannel;
import tech.pegasys.teku.infrastructure.collections.LimitedSet;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

/** Holds items with slots that are in the future relative to our node's current slot */
Expand All @@ -32,24 +33,39 @@ public class FutureItems<T> implements SlotEventsChannel {
static final UInt64 DEFAULT_FUTURE_SLOT_TOLERANCE = UInt64.valueOf(2);
private static final int MAX_ITEMS_PER_SLOT = 500;

private final SettableLabelledGauge futureItemsCounter;
private final UInt64 futureSlotTolerance;
private final Function<T, UInt64> slotFunction;

private final NavigableMap<UInt64, Set<T>> queuedFutureItems = new ConcurrentSkipListMap<>();
private final String type;
private volatile UInt64 currentSlot = UInt64.ZERO;

private FutureItems(final Function<T, UInt64> slotFunction, final UInt64 futureSlotTolerance) {
private FutureItems(
final Function<T, UInt64> slotFunction,
final UInt64 futureSlotTolerance,
final SettableLabelledGauge futureItemsCounter,
final String type) {
this.slotFunction = slotFunction;
this.futureSlotTolerance = futureSlotTolerance;
this.futureItemsCounter = futureItemsCounter;
this.type = type;
}

public static <T> FutureItems<T> create(final Function<T, UInt64> slotFunction) {
return new FutureItems<T>(slotFunction, DEFAULT_FUTURE_SLOT_TOLERANCE);
public static <T> FutureItems<T> create(
final Function<T, UInt64> slotFunction,
final SettableLabelledGauge futureItemsCounter,
final String type) {
return new FutureItems<T>(
slotFunction, DEFAULT_FUTURE_SLOT_TOLERANCE, futureItemsCounter, type);
}

public static <T> FutureItems<T> create(
final Function<T, UInt64> slotFunction, final UInt64 futureSlotTolerance) {
return new FutureItems<T>(slotFunction, futureSlotTolerance);
final Function<T, UInt64> slotFunction,
final UInt64 futureSlotTolerance,
final SettableLabelledGauge futureItemsCounter,
final String type) {
return new FutureItems<T>(slotFunction, futureSlotTolerance, futureItemsCounter, type);
}

@Override
Expand All @@ -71,6 +87,7 @@ public void add(final T item) {

LOG.trace("Save future item at slot {} for later import: {}", slot, item);
queuedFutureItems.computeIfAbsent(slot, key -> createNewSet()).add(item);
futureItemsCounter.set(size(), type);
}

/**
Expand All @@ -79,12 +96,14 @@ public void add(final T item) {
* @param currentSlot The slot to be considered current
* @return The set of items that are no longer in the future
*/
@SuppressWarnings("rawtypes")
public List<T> prune(final UInt64 currentSlot) {
final List<T> dequeued = new ArrayList<>();
queuedFutureItems
.headMap(currentSlot, true)
.keySet()
.forEach(key -> dequeued.addAll(queuedFutureItems.remove(key)));
futureItemsCounter.set(size(), type);
return dequeued;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.mockito.ArgumentCaptor;
import tech.pegasys.teku.bls.BLSSignature;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
Expand Down Expand Up @@ -69,7 +70,10 @@ class AttestationManagerTest {
private final PendingPool<ValidateableAttestation> pendingAttestations =
new PendingPoolFactory(metricsSystem).createForAttestations(spec);
private final FutureItems<ValidateableAttestation> futureAttestations =
FutureItems.create(ValidateableAttestation::getEarliestSlotForForkChoiceProcessing);
FutureItems.create(
ValidateableAttestation::getEarliestSlotForForkChoiceProcessing,
mock(SettableLabelledGauge.class),
"attestations");
private final SignatureVerificationService signatureVerificationService =
mock(SignatureVerificationService.class);
private final ActiveValidatorCache activeValidatorCache = mock(ActiveValidatorCache.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.logging.EventLogger;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.time.StubTimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
Expand Down Expand Up @@ -84,7 +85,7 @@ public class BlockManagerTest {
new PendingPoolFactory(metricsSystem)
.createForBlocks(spec, historicalBlockTolerance, futureBlockTolerance, maxPendingBlocks);
private final FutureItems<SignedBeaconBlock> futureBlocks =
FutureItems.create(SignedBeaconBlock::getSlot);
FutureItems.create(SignedBeaconBlock::getSlot, mock(SettableLabelledGauge.class), "blocks");

private final StorageSystem localChain = InMemoryStorageSystemBuilder.buildDefault(spec);
private final RecentChainData localRecentChainData = localChain.recentChainData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunner;
import tech.pegasys.teku.infrastructure.logging.EventLogger;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.time.StubTimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
Expand Down Expand Up @@ -63,7 +64,7 @@ public class ReexecutingExecutionPayloadBlockManagerTest {
new PendingPoolFactory(metricsSystem)
.createForBlocks(spec, historicalBlockTolerance, futureBlockTolerance, maxPendingBlocks);
private final FutureItems<SignedBeaconBlock> futureBlocks =
FutureItems.create(SignedBeaconBlock::getSlot);
FutureItems.create(SignedBeaconBlock::getSlot, mock(SettableLabelledGauge.class), "blocks");

private final BlockImporter blockImporter = mock(BlockImporter.class);
private final RecentChainData recentChainData = mock(RecentChainData.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,20 @@
package tech.pegasys.teku.statetransition.util;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

public class FutureItemsTest {

private final UInt64 currentSlot = UInt64.valueOf(5);
private final FutureItems<Item> futureItems = FutureItems.create(Item::getSlot);
private final SettableLabelledGauge gauge = mock(SettableLabelledGauge.class);
private final FutureItems<Item> futureItems = FutureItems.create(Item::getSlot, gauge, "items");

@BeforeEach
public void beforeEach() {
Expand Down Expand Up @@ -80,6 +84,21 @@ public void prune_itemAtSlot() {
assertThat(futureItems.size()).isEqualTo(0);
}

@Test
public void metrics_shouldIncreaseAndDecrease() {
futureItems.onSlot(currentSlot);
final UInt64 itemSlot = currentSlot.plus(FutureItems.DEFAULT_FUTURE_SLOT_TOLERANCE);
final Item item = new Item(itemSlot);

futureItems.add(item);
verify(gauge).set(1L, "items");

final UInt64 pruneSlot = item.getSlot().plus(UInt64.ONE);
assertThat(futureItems.prune(pruneSlot)).containsExactly(item);

verify(gauge).set(0L, "items");
}

@Test
public void prune_itemPriorToSlot() {
final UInt64 itemSlot = currentSlot.plus(FutureItems.DEFAULT_FUTURE_SLOT_TOLERANCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static tech.pegasys.teku.infrastructure.logging.EventLogger.EVENT_LOG;
import static tech.pegasys.teku.infrastructure.logging.StatusLogger.STATUS_LOG;
import static tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory.BEACON;
import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ZERO;

import com.google.common.base.Throwables;
Expand Down Expand Up @@ -44,6 +45,7 @@
import tech.pegasys.teku.infrastructure.events.EventChannels;
import tech.pegasys.teku.infrastructure.exceptions.InvalidConfigurationException;
import tech.pegasys.teku.infrastructure.io.PortAvailability;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.infrastructure.version.VersionProvider;
Expand Down Expand Up @@ -150,6 +152,7 @@
public class BeaconChainController extends Service implements BeaconChainControllerFacade {
private static final Logger LOG = LogManager.getLogger();

private final SettableLabelledGauge futureItemsMetric;
protected static final String KEY_VALUE_STORE_SUBDIRECTORY = "kvstore";

protected volatile BeaconChainConfiguration beaconConfig;
Expand Down Expand Up @@ -201,8 +204,6 @@ public class BeaconChainController extends Service implements BeaconChainControl
private TimerService timerService;
private PendingPoolFactory pendingPoolFactory;

protected BeaconChainController() {}

public BeaconChainController(
final ServiceConfig serviceConfig, final BeaconChainConfiguration beaconConfig) {
this.beaconConfig = beaconConfig;
Expand All @@ -220,6 +221,13 @@ public BeaconChainController(
this.pendingPoolFactory = new PendingPoolFactory(this.metricsSystem);
this.slotEventsChannelPublisher = eventChannels.getPublisher(SlotEventsChannel.class);
this.forkChoiceExecutor = new AsyncRunnerEventThread("forkchoice", asyncRunnerFactory);
this.futureItemsMetric =
SettableLabelledGauge.create(
metricsSystem,
BEACON,
"future_items_size",
"Current number of items held for future slots, labelled by type",
"type");
}

@Override
Expand Down Expand Up @@ -613,7 +621,10 @@ protected void initAttestationManager() {
pendingPoolFactory.createForAttestations(spec);
final FutureItems<ValidateableAttestation> futureAttestations =
FutureItems.create(
ValidateableAttestation::getEarliestSlotForForkChoiceProcessing, UInt64.valueOf(3));
ValidateableAttestation::getEarliestSlotForForkChoiceProcessing,
UInt64.valueOf(3),
futureItemsMetric,
"attestations");
AttestationValidator attestationValidator =
new AttestationValidator(spec, recentChainData, signatureVerificationService);
AggregateAttestationValidator aggregateValidator =
Expand Down Expand Up @@ -812,7 +823,7 @@ public void initBlockImporter() {
public void initBlockManager() {
LOG.debug("BeaconChainController.initBlockManager()");
final FutureItems<SignedBeaconBlock> futureBlocks =
FutureItems.create(SignedBeaconBlock::getSlot);
FutureItems.create(SignedBeaconBlock::getSlot, futureItemsMetric, "blocks");
BlockValidator blockValidator = new BlockValidator(spec, recentChainData);
if (spec.isMilestoneSupported(SpecMilestone.BELLATRIX)) {
blockManager =
Expand Down

0 comments on commit f2ab5cd

Please sign in to comment.