From f2ab5cdea99df43316b043c3397e8223c71960ea Mon Sep 17 00:00:00 2001 From: Paul Harris Date: Fri, 18 Mar 2022 10:56:49 +1000 Subject: [PATCH] Add metric for future items cache sizes (#5189) --- beacon/sync/build.gradle | 1 + .../teku/beacon/sync/SyncingNodeManager.java | 4 ++- .../AttestationManagerIntegrationTest.java | 6 +++- .../statetransition/util/FutureItems.java | 29 +++++++++++++++---- .../attestation/AttestationManagerTest.java | 6 +++- .../block/BlockManagerTest.java | 3 +- ...utingExecutionPayloadBlockManagerTest.java | 3 +- .../statetransition/util/FutureItemsTest.java | 21 +++++++++++++- .../beaconchain/BeaconChainController.java | 19 +++++++++--- 9 files changed, 77 insertions(+), 15 deletions(-) diff --git a/beacon/sync/build.gradle b/beacon/sync/build.gradle index d93a316f818..65cac26975d 100644 --- a/beacon/sync/build.gradle +++ b/beacon/sync/build.gradle @@ -35,6 +35,7 @@ dependencies { integrationTestImplementation testFixtures(project(':beacon:sync')) testFixturesImplementation project(':infrastructure:serviceutils') + testFixturesImplementation project(':infrastructure:metrics') testFixturesImplementation project(':ethereum:statetransition') testFixturesImplementation project(':ethereum:weaksubjectivity') diff --git a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java index 7082e2da71d..2ff69bf0b90 100644 --- a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java +++ b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.beacon.sync; +import static org.mockito.Mockito.mock; import static tech.pegasys.teku.infrastructure.events.TestExceptionHandler.TEST_EXCEPTION_HANDLER; import static tech.pegasys.teku.infrastructure.logging.EventLogger.EVENT_LOG; @@ -30,6 +31,7 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread; import tech.pegasys.teku.infrastructure.events.EventChannels; +import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; import tech.pegasys.teku.infrastructure.time.SystemTimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.Eth2P2PNetwork; @@ -119,7 +121,7 @@ public static SyncingNodeManager create( final PendingPool pendingBlocks = new PendingPoolFactory(new NoOpMetricsSystem()).createForBlocks(spec); final FutureItems futureBlocks = - FutureItems.create(SignedBeaconBlock::getSlot); + FutureItems.create(SignedBeaconBlock::getSlot, mock(SettableLabelledGauge.class), "blocks"); BlockManager blockManager = new BlockManager( recentChainData, diff --git a/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java b/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java index cd285ceb715..0f0c5bbd0e6 100644 --- a/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java +++ b/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java @@ -25,6 +25,7 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread; import tech.pegasys.teku.infrastructure.bytes.Bytes4; +import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; import tech.pegasys.teku.infrastructure.ssz.collections.SszBitlist; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; @@ -78,7 +79,10 @@ class AttestationManagerIntegrationTest { private final PendingPool pendingAttestations = new PendingPoolFactory(storageSystem.getMetricsSystem()).createForAttestations(spec); private final FutureItems futureAttestations = - FutureItems.create(ValidateableAttestation::getEarliestSlotForForkChoiceProcessing); + FutureItems.create( + ValidateableAttestation::getEarliestSlotForForkChoiceProcessing, + mock(SettableLabelledGauge.class), + "attestations"); private final SignatureVerificationService signatureVerificationService = SignatureVerificationService.createSimple(); private final AttestationValidator attestationValidator = diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/FutureItems.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/FutureItems.java index a520ec38a1d..2118792e08b 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/FutureItems.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/FutureItems.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger; import tech.pegasys.teku.ethereum.events.SlotEventsChannel; import tech.pegasys.teku.infrastructure.collections.LimitedSet; +import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; import tech.pegasys.teku.infrastructure.unsigned.UInt64; /** Holds items with slots that are in the future relative to our node's current slot */ @@ -32,24 +33,39 @@ public class FutureItems implements SlotEventsChannel { static final UInt64 DEFAULT_FUTURE_SLOT_TOLERANCE = UInt64.valueOf(2); private static final int MAX_ITEMS_PER_SLOT = 500; + private final SettableLabelledGauge futureItemsCounter; private final UInt64 futureSlotTolerance; private final Function slotFunction; private final NavigableMap> queuedFutureItems = new ConcurrentSkipListMap<>(); + private final String type; private volatile UInt64 currentSlot = UInt64.ZERO; - private FutureItems(final Function slotFunction, final UInt64 futureSlotTolerance) { + private FutureItems( + final Function slotFunction, + final UInt64 futureSlotTolerance, + final SettableLabelledGauge futureItemsCounter, + final String type) { this.slotFunction = slotFunction; this.futureSlotTolerance = futureSlotTolerance; + this.futureItemsCounter = futureItemsCounter; + this.type = type; } - public static FutureItems create(final Function slotFunction) { - return new FutureItems(slotFunction, DEFAULT_FUTURE_SLOT_TOLERANCE); + public static FutureItems create( + final Function slotFunction, + final SettableLabelledGauge futureItemsCounter, + final String type) { + return new FutureItems( + slotFunction, DEFAULT_FUTURE_SLOT_TOLERANCE, futureItemsCounter, type); } public static FutureItems create( - final Function slotFunction, final UInt64 futureSlotTolerance) { - return new FutureItems(slotFunction, futureSlotTolerance); + final Function slotFunction, + final UInt64 futureSlotTolerance, + final SettableLabelledGauge futureItemsCounter, + final String type) { + return new FutureItems(slotFunction, futureSlotTolerance, futureItemsCounter, type); } @Override @@ -71,6 +87,7 @@ public void add(final T item) { LOG.trace("Save future item at slot {} for later import: {}", slot, item); queuedFutureItems.computeIfAbsent(slot, key -> createNewSet()).add(item); + futureItemsCounter.set(size(), type); } /** @@ -79,12 +96,14 @@ public void add(final T item) { * @param currentSlot The slot to be considered current * @return The set of items that are no longer in the future */ + @SuppressWarnings("rawtypes") public List prune(final UInt64 currentSlot) { final List dequeued = new ArrayList<>(); queuedFutureItems .headMap(currentSlot, true) .keySet() .forEach(key -> dequeued.addAll(queuedFutureItems.remove(key))); + futureItemsCounter.set(size(), type); return dequeued; } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerTest.java index 1c042467f92..40b1e71dbd1 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerTest.java @@ -35,6 +35,7 @@ import org.mockito.ArgumentCaptor; import tech.pegasys.teku.bls.BLSSignature; import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; @@ -69,7 +70,10 @@ class AttestationManagerTest { private final PendingPool pendingAttestations = new PendingPoolFactory(metricsSystem).createForAttestations(spec); private final FutureItems futureAttestations = - FutureItems.create(ValidateableAttestation::getEarliestSlotForForkChoiceProcessing); + FutureItems.create( + ValidateableAttestation::getEarliestSlotForForkChoiceProcessing, + mock(SettableLabelledGauge.class), + "attestations"); private final SignatureVerificationService signatureVerificationService = mock(SignatureVerificationService.class); private final ActiveValidatorCache activeValidatorCache = mock(ActiveValidatorCache.class); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java index a9583a50fa6..139a5206402 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java @@ -40,6 +40,7 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread; import tech.pegasys.teku.infrastructure.logging.EventLogger; +import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; import tech.pegasys.teku.infrastructure.time.StubTimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -84,7 +85,7 @@ public class BlockManagerTest { new PendingPoolFactory(metricsSystem) .createForBlocks(spec, historicalBlockTolerance, futureBlockTolerance, maxPendingBlocks); private final FutureItems futureBlocks = - FutureItems.create(SignedBeaconBlock::getSlot); + FutureItems.create(SignedBeaconBlock::getSlot, mock(SettableLabelledGauge.class), "blocks"); private final StorageSystem localChain = InMemoryStorageSystemBuilder.buildDefault(spec); private final RecentChainData localRecentChainData = localChain.recentChainData(); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/ReexecutingExecutionPayloadBlockManagerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/ReexecutingExecutionPayloadBlockManagerTest.java index f95c63a6e8f..0427842ff62 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/ReexecutingExecutionPayloadBlockManagerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/ReexecutingExecutionPayloadBlockManagerTest.java @@ -32,6 +32,7 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; import tech.pegasys.teku.infrastructure.logging.EventLogger; +import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; import tech.pegasys.teku.infrastructure.time.StubTimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -63,7 +64,7 @@ public class ReexecutingExecutionPayloadBlockManagerTest { new PendingPoolFactory(metricsSystem) .createForBlocks(spec, historicalBlockTolerance, futureBlockTolerance, maxPendingBlocks); private final FutureItems futureBlocks = - FutureItems.create(SignedBeaconBlock::getSlot); + FutureItems.create(SignedBeaconBlock::getSlot, mock(SettableLabelledGauge.class), "blocks"); private final BlockImporter blockImporter = mock(BlockImporter.class); private final RecentChainData recentChainData = mock(RecentChainData.class); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/FutureItemsTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/FutureItemsTest.java index 1f71511fe12..4789da97a22 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/FutureItemsTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/FutureItemsTest.java @@ -14,16 +14,20 @@ package tech.pegasys.teku.statetransition.util; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; import tech.pegasys.teku.infrastructure.unsigned.UInt64; public class FutureItemsTest { private final UInt64 currentSlot = UInt64.valueOf(5); - private final FutureItems futureItems = FutureItems.create(Item::getSlot); + private final SettableLabelledGauge gauge = mock(SettableLabelledGauge.class); + private final FutureItems futureItems = FutureItems.create(Item::getSlot, gauge, "items"); @BeforeEach public void beforeEach() { @@ -80,6 +84,21 @@ public void prune_itemAtSlot() { assertThat(futureItems.size()).isEqualTo(0); } + @Test + public void metrics_shouldIncreaseAndDecrease() { + futureItems.onSlot(currentSlot); + final UInt64 itemSlot = currentSlot.plus(FutureItems.DEFAULT_FUTURE_SLOT_TOLERANCE); + final Item item = new Item(itemSlot); + + futureItems.add(item); + verify(gauge).set(1L, "items"); + + final UInt64 pruneSlot = item.getSlot().plus(UInt64.ONE); + assertThat(futureItems.prune(pruneSlot)).containsExactly(item); + + verify(gauge).set(0L, "items"); + } + @Test public void prune_itemPriorToSlot() { final UInt64 itemSlot = currentSlot.plus(FutureItems.DEFAULT_FUTURE_SLOT_TOLERANCE); diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 03b6848d015..5aa68afd7c4 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -15,6 +15,7 @@ import static tech.pegasys.teku.infrastructure.logging.EventLogger.EVENT_LOG; import static tech.pegasys.teku.infrastructure.logging.StatusLogger.STATUS_LOG; +import static tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory.BEACON; import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ZERO; import com.google.common.base.Throwables; @@ -44,6 +45,7 @@ import tech.pegasys.teku.infrastructure.events.EventChannels; import tech.pegasys.teku.infrastructure.exceptions.InvalidConfigurationException; import tech.pegasys.teku.infrastructure.io.PortAvailability; +import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; import tech.pegasys.teku.infrastructure.time.TimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.infrastructure.version.VersionProvider; @@ -150,6 +152,7 @@ public class BeaconChainController extends Service implements BeaconChainControllerFacade { private static final Logger LOG = LogManager.getLogger(); + private final SettableLabelledGauge futureItemsMetric; protected static final String KEY_VALUE_STORE_SUBDIRECTORY = "kvstore"; protected volatile BeaconChainConfiguration beaconConfig; @@ -201,8 +204,6 @@ public class BeaconChainController extends Service implements BeaconChainControl private TimerService timerService; private PendingPoolFactory pendingPoolFactory; - protected BeaconChainController() {} - public BeaconChainController( final ServiceConfig serviceConfig, final BeaconChainConfiguration beaconConfig) { this.beaconConfig = beaconConfig; @@ -220,6 +221,13 @@ public BeaconChainController( this.pendingPoolFactory = new PendingPoolFactory(this.metricsSystem); this.slotEventsChannelPublisher = eventChannels.getPublisher(SlotEventsChannel.class); this.forkChoiceExecutor = new AsyncRunnerEventThread("forkchoice", asyncRunnerFactory); + this.futureItemsMetric = + SettableLabelledGauge.create( + metricsSystem, + BEACON, + "future_items_size", + "Current number of items held for future slots, labelled by type", + "type"); } @Override @@ -613,7 +621,10 @@ protected void initAttestationManager() { pendingPoolFactory.createForAttestations(spec); final FutureItems futureAttestations = FutureItems.create( - ValidateableAttestation::getEarliestSlotForForkChoiceProcessing, UInt64.valueOf(3)); + ValidateableAttestation::getEarliestSlotForForkChoiceProcessing, + UInt64.valueOf(3), + futureItemsMetric, + "attestations"); AttestationValidator attestationValidator = new AttestationValidator(spec, recentChainData, signatureVerificationService); AggregateAttestationValidator aggregateValidator = @@ -812,7 +823,7 @@ public void initBlockImporter() { public void initBlockManager() { LOG.debug("BeaconChainController.initBlockManager()"); final FutureItems futureBlocks = - FutureItems.create(SignedBeaconBlock::getSlot); + FutureItems.create(SignedBeaconBlock::getSlot, futureItemsMetric, "blocks"); BlockValidator blockValidator = new BlockValidator(spec, recentChainData); if (spec.isMilestoneSupported(SpecMilestone.BELLATRIX)) { blockManager =