Skip to content

Commit

Permalink
Add option to toggle block performance metrics (#5199)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored Mar 18, 2022
1 parent 5a5b31b commit 91c03c1
Show file tree
Hide file tree
Showing 18 changed files with 167 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public static SyncingNodeManager create(
futureBlocks,
blockValidator,
new SystemTimeProvider(),
EVENT_LOG);
EVENT_LOG,
false);

eventChannels
.subscribe(SlotEventsChannel.class, blockManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ private void applyBlock(
block.getRoot(),
block.getSlot(),
block.getParentRoot());
final SafeFuture<BlockImportResult> result = forkChoice.onBlock(block, executionEngine);
final SafeFuture<BlockImportResult> result =
forkChoice.onBlock(block, Optional.empty(), executionEngine);
assertThat(result).isCompleted();
final BlockImportResult importResult = result.join();
assertThat(importResult)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,52 +15,71 @@

import static tech.pegasys.teku.infrastructure.time.TimeUtilities.secondsToMillis;

import it.unimi.dsi.fastutil.Pair;
import java.util.ArrayList;
import java.util.List;
import tech.pegasys.teku.infrastructure.logging.EventLogger;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.storage.client.RecentChainData;

public class BlockImportPerformance {
private final TimeProvider timeProvider;
private UInt64 blockArrivalTimeStamp;
private UInt64 importCompletedTimeStamp;
private UInt64 arrivalDelay;

private final List<Pair<String, UInt64>> events = new ArrayList<>();
private UInt64 timeWarningLimitTimeStamp;
private UInt64 processingTime;
private UInt64 timeAtSlotStartTimeStamp;

public BlockImportPerformance(final TimeProvider timeProvider) {
this.timeProvider = timeProvider;
}

public void arrival(final RecentChainData recentChainData, final UInt64 slot) {
blockArrivalTimeStamp = timeProvider.getTimeInMillis();
final UInt64 timeAtSlotStartTimeStamp =
secondsToMillis(recentChainData.computeTimeAtSlot(slot));

arrivalDelay = blockArrivalTimeStamp.minusMinZero(timeAtSlotStartTimeStamp);

timeAtSlotStartTimeStamp = secondsToMillis(recentChainData.computeTimeAtSlot(slot));
timeWarningLimitTimeStamp =
timeAtSlotStartTimeStamp.plus(
secondsToMillis(recentChainData.getSpec().getSecondsPerSlot(slot)).dividedBy(3));
addEvent("Received");
}

public void processed() {
importCompletedTimeStamp = timeProvider.getTimeInMillis();
processingTime = timeProvider.getTimeInMillis().minus(blockArrivalTimeStamp);
public void preStateRetrieved() {
addEvent("Pre-state retrieved");
}

public UInt64 getBlockArrivalTimeStamp() {
return blockArrivalTimeStamp;
public void postStateCreated() {
addEvent("Block processed");
}

public UInt64 getArrivalDelay() {
return arrivalDelay;
public void transactionReady() {
addEvent("Transaction prepared");
}

public boolean isSlotTimeWarningPassed() {
return importCompletedTimeStamp.isGreaterThan(timeWarningLimitTimeStamp);
public void transactionCommitted() {
addEvent("Transaction committed");
}

public void processingComplete(final EventLogger eventLogger, final SignedBeaconBlock block) {
final UInt64 importCompletedTimestamp = addEvent("Import complete");

if (importCompletedTimestamp.isGreaterThan(timeWarningLimitTimeStamp)) {
UInt64 previousEventTimestamp = timeAtSlotStartTimeStamp;
final List<String> eventTimings = new ArrayList<>();
for (Pair<String, UInt64> event : events) {
// minusMinZero because sometimes time does actually go backwards so be safe.
final UInt64 stepDuration = event.right().minusMinZero(previousEventTimestamp);
eventTimings.add(
event.left() + (eventTimings.isEmpty() ? " " : " +") + stepDuration + "ms");
previousEventTimestamp = event.right();
}
final String combinedTimings = String.join(", ", eventTimings);
eventLogger.lateBlockImport(block.getRoot(), block.getSlot(), combinedTimings);
}
}

public UInt64 getProcessingTime() {
return processingTime;
private UInt64 addEvent(final String label) {
final UInt64 timestamp = timeProvider.getTimeInMillis();
events.add(Pair.of(label, timestamp));
return timestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,14 @@ public BlockImporter(
}

@CheckReturnValue
public SafeFuture<BlockImportResult> importBlock(SignedBeaconBlock block) {
public SafeFuture<BlockImportResult> importBlock(final SignedBeaconBlock block) {
return importBlock(block, Optional.empty());
}

@CheckReturnValue
public SafeFuture<BlockImportResult> importBlock(
final SignedBeaconBlock block,
final Optional<BlockImportPerformance> blockImportPerformance) {
final Optional<Boolean> knownOptimistic = recentChainData.isBlockOptimistic(block.getRoot());
if (knownOptimistic.isPresent()) {
LOG.trace(
Expand All @@ -92,7 +99,7 @@ public SafeFuture<BlockImportResult> importBlock(SignedBeaconBlock block) {
}

return validateWeakSubjectivityPeriod()
.thenCompose(__ -> forkChoice.onBlock(block, executionEngine))
.thenCompose(__ -> forkChoice.onBlock(block, blockImportPerformance, executionEngine))
.thenApply(
result -> {
if (!result.isSuccessful()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class BlockManager extends Service
private final BlockValidator validator;
private final TimeProvider timeProvider;
private final EventLogger eventLogger;
private final boolean blockImportPerformanceEnabled;

private final FutureItems<SignedBeaconBlock> futureBlocks;
// in the invalidBlockRoots map we are going to store blocks whose import result is invalid
Expand All @@ -66,14 +67,16 @@ public BlockManager(
final FutureItems<SignedBeaconBlock> futureBlocks,
final BlockValidator validator,
final TimeProvider timeProvider,
final EventLogger eventLogger) {
final EventLogger eventLogger,
final boolean blockImportPerformanceEnabled) {
this.recentChainData = recentChainData;
this.blockImporter = blockImporter;
this.pendingBlocks = pendingBlocks;
this.futureBlocks = futureBlocks;
this.validator = validator;
this.timeProvider = timeProvider;
this.eventLogger = eventLogger;
this.blockImportPerformanceEnabled = blockImportPerformanceEnabled;
}

@Override
Expand All @@ -96,9 +99,15 @@ public SafeFuture<BlockImportResult> importBlock(final SignedBeaconBlock block)
public SafeFuture<InternalValidationResult> validateAndImportBlock(
final SignedBeaconBlock block) {

final BlockImportPerformance blockImportPerformance = new BlockImportPerformance(timeProvider);
final Optional<BlockImportPerformance> blockImportPerformance;

blockImportPerformance.arrival(recentChainData, block.getSlot());
if (blockImportPerformanceEnabled) {
final BlockImportPerformance performance = new BlockImportPerformance(timeProvider);
performance.arrival(recentChainData, block.getSlot());
blockImportPerformance = Optional.of(performance);
} else {
blockImportPerformance = Optional.empty();
}

if (propagateInvalidity(block).isPresent()) {
return SafeFuture.completedFuture(
Expand All @@ -110,11 +119,7 @@ public SafeFuture<InternalValidationResult> validateAndImportBlock(
result -> {
if (result.code().equals(ValidationResultCode.ACCEPT)
|| result.code().equals(ValidationResultCode.SAVE_FOR_FUTURE)) {
LOG.trace(
"Preparing to import block: {} arrived at {}",
() -> formatBlock(block.getSlot(), block.getRoot()),
blockImportPerformance::getBlockArrivalTimeStamp);
doImportBlock(block, Optional.of(blockImportPerformance))
doImportBlock(block, blockImportPerformance)
.finish(err -> LOG.error("Failed to process received block.", err));
}
});
Expand Down Expand Up @@ -159,7 +164,7 @@ private SafeFuture<BlockImportResult> doImportBlock(
.or(() -> handleKnownBlock(block))
.orElseGet(
() ->
handleBlockImport(block)
handleBlockImport(block, blockImportPerformance)
.thenPeek(__ -> lateBlockImportCheck(blockImportPerformance, block)))
.thenPeek(
result -> {
Expand Down Expand Up @@ -202,9 +207,11 @@ private Optional<SafeFuture<BlockImportResult>> handleKnownBlock(final SignedBea
SafeFuture.completedFuture(BlockImportResult.knownBlock(block, isOptimistic)));
}

private SafeFuture<BlockImportResult> handleBlockImport(final SignedBeaconBlock block) {
private SafeFuture<BlockImportResult> handleBlockImport(
final SignedBeaconBlock block,
final Optional<BlockImportPerformance> blockImportPerformance) {
return blockImporter
.importBlock(block)
.importBlock(block, blockImportPerformance)
.thenPeek(
result -> {
if (result.isSuccessful()) {
Expand Down Expand Up @@ -268,16 +275,6 @@ private void lateBlockImportCheck(
final Optional<BlockImportPerformance> maybeBlockImportPerformance,
final SignedBeaconBlock block) {
maybeBlockImportPerformance.ifPresent(
blockImportPerformance -> {
blockImportPerformance.processed();
if (blockImportPerformance.isSlotTimeWarningPassed()) {

eventLogger.lateBlockImport(
block.getRoot(),
block.getSlot(),
blockImportPerformance.getArrivalDelay(),
blockImportPerformance.getProcessingTime());
}
});
blockImportPerformance -> blockImportPerformance.processingComplete(eventLogger, block));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,17 @@ public ReexecutingExecutionPayloadBlockManager(
final BlockValidator validator,
final TimeProvider timeProvider,
final EventLogger eventLogger,
final AsyncRunner asyncRunner) {
final AsyncRunner asyncRunner,
final boolean blockImportPerformanceEnabled) {
super(
recentChainData,
blockImporter,
pendingBlocks,
futureBlocks,
validator,
timeProvider,
eventLogger);
eventLogger,
blockImportPerformanceEnabled);
this.asyncRunner = asyncRunner;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult.FailureReason;
import tech.pegasys.teku.spec.logic.common.util.ForkChoiceUtil;
import tech.pegasys.teku.statetransition.block.BlockImportPerformance;
import tech.pegasys.teku.storage.client.RecentChainData;
import tech.pegasys.teku.storage.store.UpdatableStore;
import tech.pegasys.teku.storage.store.UpdatableStore.StoreTransaction;
Expand Down Expand Up @@ -176,10 +177,15 @@ private SafeFuture<Boolean> processHead(Optional<UInt64> nodeSlot) {

/** Import a block to the store. */
public SafeFuture<BlockImportResult> onBlock(
final SignedBeaconBlock block, final ExecutionEngineChannel executionEngine) {
final SignedBeaconBlock block,
final Optional<BlockImportPerformance> blockImportPerformance,
final ExecutionEngineChannel executionEngine) {
return recentChainData
.retrieveStateAtSlot(new SlotAndBlockRoot(block.getSlot(), block.getParentRoot()))
.thenCompose(blockSlotState -> onBlock(block, blockSlotState, executionEngine));
.thenPeek(__ -> blockImportPerformance.ifPresent(BlockImportPerformance::preStateRetrieved))
.thenCompose(
blockSlotState ->
onBlock(block, blockSlotState, blockImportPerformance, executionEngine));
}

/**
Expand All @@ -189,6 +195,7 @@ public SafeFuture<BlockImportResult> onBlock(
private SafeFuture<BlockImportResult> onBlock(
final SignedBeaconBlock block,
final Optional<BeaconState> blockSlotState,
final Optional<BlockImportPerformance> blockImportPerformance,
final ExecutionEngineChannel executionEngine) {
if (blockSlotState.isEmpty()) {
return SafeFuture.completedFuture(BlockImportResult.FAILED_UNKNOWN_PARENT);
Expand Down Expand Up @@ -224,6 +231,7 @@ private SafeFuture<BlockImportResult> onBlock(
reportInvalidBlock(block, result);
return SafeFuture.completedFuture(result);
}
blockImportPerformance.ifPresent(BlockImportPerformance::postStateCreated);

return payloadExecutor
.getExecutionResult()
Expand All @@ -232,6 +240,7 @@ private SafeFuture<BlockImportResult> onBlock(
importBlockAndState(
block,
blockSlotState.get(),
blockImportPerformance,
forkChoiceUtil,
indexedAttestationCache,
postState,
Expand All @@ -242,6 +251,7 @@ private SafeFuture<BlockImportResult> onBlock(
private BlockImportResult importBlockAndState(
final SignedBeaconBlock block,
final BeaconState blockSlotState,
final Optional<BlockImportPerformance> blockImportPerformance,
final ForkChoiceUtil forkChoiceUtil,
final CapturingIndexedAttestationCache indexedAttestationCache,
final BeaconState postState,
Expand Down Expand Up @@ -305,8 +315,10 @@ private BlockImportResult importBlockAndState(
}
}

blockImportPerformance.ifPresent(BlockImportPerformance::transactionReady);
// Note: not using thenRun here because we want to ensure each step is on the event thread
transaction.commit().join();
blockImportPerformance.ifPresent(BlockImportPerformance::transactionCommitted);
forkChoiceStrategy.onExecutionPayloadResult(block.getRoot(), payloadResult);

final UInt64 currentEpoch = spec.computeEpochAtSlot(spec.getCurrentSlot(transaction));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -118,7 +119,8 @@ public class BlockManagerTest {
futureBlocks,
blockValidator,
timeProvider,
eventLogger);
eventLogger,
true);

private UInt64 currentSlot = GENESIS_SLOT;

Expand Down Expand Up @@ -258,7 +260,8 @@ public void onGossipedBlock_retryIfParentWasUnknownButIsNowAvailable() {
futureBlocks,
mock(BlockValidator.class),
timeProvider,
eventLogger);
eventLogger,
false);
forwardBlockImportedNotificationsTo(blockManager);
assertThat(blockManager.start()).isCompleted();

Expand All @@ -270,21 +273,21 @@ public void onGossipedBlock_retryIfParentWasUnknownButIsNowAvailable() {
localChain.chainBuilder().generateBlockAtSlot(nextNextSlot).getBlock();

final SafeFuture<BlockImportResult> blockImportResult = new SafeFuture<>();
when(blockImporter.importBlock(nextNextBlock))
when(blockImporter.importBlock(nextNextBlock, Optional.empty()))
.thenReturn(blockImportResult)
.thenReturn(new SafeFuture<>());

incrementSlot();
incrementSlot();
blockManager.importBlock(nextNextBlock);
ignoreFuture(verify(blockImporter).importBlock(nextNextBlock));
ignoreFuture(verify(blockImporter).importBlock(nextNextBlock, Optional.empty()));

// Before nextNextBlock imports, it's parent becomes available
when(localRecentChainData.containsBlock(nextNextBlock.getParentRoot())).thenReturn(true);

// So when the block import completes, it should be retried
blockImportResult.complete(BlockImportResult.FAILED_UNKNOWN_PARENT);
ignoreFuture(verify(blockImporter, times(2)).importBlock(nextNextBlock));
ignoreFuture(verify(blockImporter, times(2)).importBlock(nextNextBlock, Optional.empty()));

assertThat(pendingBlocks.contains(nextNextBlock)).isFalse();
}
Expand Down Expand Up @@ -567,7 +570,9 @@ void onValidateAndImportBlock_shouldLogSlowImport() {
.isCompletedWithValueMatching(InternalValidationResult::isAccept);
verify(eventLogger)
.lateBlockImport(
block.getRoot(), block.getSlot(), UInt64.valueOf(1_000), UInt64.valueOf(3_000));
block.getRoot(),
block.getSlot(),
"Received 1000ms, Pre-state retrieved +3000ms, Block processed +0ms, Transaction prepared +0ms, Transaction committed +0ms, Import complete +0ms");
}

@Test
Expand Down
Loading

0 comments on commit 91c03c1

Please sign in to comment.