Skip to content

Commit

Permalink
Fix payload_attributes streaming event notification (#7861)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov authored Jan 8, 2024
1 parent b00281c commit b2756d2
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import tech.pegasys.teku.api.response.v1.EventType;
import tech.pegasys.teku.beacon.sync.events.SyncState;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.events.EventChannels;
import tech.pegasys.teku.infrastructure.json.JsonUtil;
import tech.pegasys.teku.infrastructure.restapi.endpoints.ListQueryParameterUtils;
Expand All @@ -49,7 +50,9 @@
import tech.pegasys.teku.spec.datastructures.operations.SignedVoluntaryExit;
import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
import tech.pegasys.teku.statetransition.block.NewBlockBuildingSubscriber.NewBlockBuildingNotification;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceUpdatedResult;
import tech.pegasys.teku.spec.executionlayer.PayloadBuildingAttributes;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceUpdatedResultSubscriber.ForkChoiceUpdatedResultNotification;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
import tech.pegasys.teku.storage.api.ChainHeadChannel;
import tech.pegasys.teku.storage.api.FinalizedCheckpointChannel;
Expand Down Expand Up @@ -95,7 +98,7 @@ public EventSubscriptionManager(
nodeDataProvider.subscribeToNewVoluntaryExits(this::onNewVoluntaryExit);
nodeDataProvider.subscribeToSyncCommitteeContributions(this::onSyncCommitteeContribution);
nodeDataProvider.subscribeToNewBlsToExecutionChanges(this::onNewBlsToExecutionChange);
nodeDataProvider.subscribeToNewBlockBuilding(this::onNewPayloadAttributes);
nodeDataProvider.subscribeToForkChoiceUpdatedResult(this::onForkChoiceUpdatedResult);
}

public void registerClient(final SseClient sseClient) {
Expand Down Expand Up @@ -220,13 +223,25 @@ protected void onNewProposerSlashing(
}
}

protected void onNewPayloadAttributes(
final NewBlockBuildingNotification newBlockBuildingNotification) {
final SpecMilestone milestone =
spec.atSlot(newBlockBuildingNotification.payloadAttributes().getProposalSlot())
.getMilestone();
protected void onForkChoiceUpdatedResult(
final ForkChoiceUpdatedResultNotification forkChoiceUpdatedResultNotification) {
final Optional<PayloadBuildingAttributes> maybePayloadAttributes =
forkChoiceUpdatedResultNotification.payloadAttributes();
// no payload attributes
if (maybePayloadAttributes.isEmpty()) {
return;
}
final SafeFuture<Optional<ForkChoiceUpdatedResult>> forkChoiceUpdatedResult =
forkChoiceUpdatedResultNotification.forkChoiceUpdatedResult();
// no fCu has been sent
if (forkChoiceUpdatedResult.isCompletedNormally() && forkChoiceUpdatedResult.join().isEmpty()) {
return;
}
final PayloadBuildingAttributes payloadAttributes = maybePayloadAttributes.get();
final SpecMilestone milestone = spec.atSlot(payloadAttributes.getProposalSlot()).getMilestone();
final PayloadAttributesEvent payloadAttributesEvent =
PayloadAttributesEvent.create(milestone, newBlockBuildingNotification);
PayloadAttributesEvent.create(
milestone, payloadAttributes, forkChoiceUpdatedResultNotification.forkChoiceState());
notifySubscribersOfEvent(EventType.payload_attributes, payloadAttributesEvent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.datastructures.execution.versions.capella.Withdrawal;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceState;
import tech.pegasys.teku.spec.executionlayer.PayloadBuildingAttributes;
import tech.pegasys.teku.statetransition.block.NewBlockBuildingSubscriber.NewBlockBuildingNotification;

public class PayloadAttributesEvent extends Event<PayloadAttributesData> {

Expand Down Expand Up @@ -99,19 +99,22 @@ record PayloadAttributes(
Optional<List<Withdrawal>> withdrawals,
Optional<Bytes32> parentBeaconBlockRoot) {}

/**
* @param forkChoiceState The fork choice state before sending the fCu so can use it to get
* parent_block_number and parent_block_hash
*/
static PayloadAttributesEvent create(
final SpecMilestone milestone,
final NewBlockBuildingNotification newBlockBuildingNotification) {
final PayloadBuildingAttributes payloadAttributes =
newBlockBuildingNotification.payloadAttributes();
final PayloadBuildingAttributes payloadAttributes,
final ForkChoiceState forkChoiceState) {
final PayloadAttributesData data =
new PayloadAttributesData(
milestone,
new PayloadAttributesEvent.Data(
payloadAttributes.getProposalSlot(),
payloadAttributes.getParentBeaconBlockRoot(),
newBlockBuildingNotification.parentExecutionBlockNumber(),
newBlockBuildingNotification.parentExecutionBlockHash(),
forkChoiceState.getHeadExecutionBlockNumber(),
forkChoiceState.getHeadExecutionBlockHash(),
payloadAttributes.getProposerIndex(),
// based on PayloadAttributesV<N> as defined by the execution-apis specification
new PayloadAttributes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import tech.pegasys.teku.beaconrestapi.handlers.v1.events.PayloadAttributesEvent.Data;
import tech.pegasys.teku.beaconrestapi.handlers.v1.events.PayloadAttributesEvent.PayloadAttributes;
import tech.pegasys.teku.beaconrestapi.handlers.v1.events.PayloadAttributesEvent.PayloadAttributesData;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunner;
import tech.pegasys.teku.infrastructure.events.EventChannels;
import tech.pegasys.teku.infrastructure.json.JsonUtil;
Expand All @@ -56,9 +57,12 @@
import tech.pegasys.teku.spec.datastructures.operations.SignedVoluntaryExit;
import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceState;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceUpdatedResult;
import tech.pegasys.teku.spec.executionlayer.PayloadBuildingAttributes;
import tech.pegasys.teku.spec.executionlayer.PayloadStatus;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.statetransition.block.NewBlockBuildingSubscriber.NewBlockBuildingNotification;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceUpdatedResultSubscriber.ForkChoiceUpdatedResultNotification;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
import tech.pegasys.teku.storage.api.ReorgContext;

Expand Down Expand Up @@ -134,6 +138,22 @@ public class EventSubscriptionManagerTest {
samplePayloadAttributes.getFeeRecipient(),
samplePayloadAttributes.getWithdrawals(),
Optional.of(samplePayloadAttributes.getParentBeaconBlockRoot()))));
final ForkChoiceUpdatedResultNotification forkChoiceUpdatedResultNotification =
new ForkChoiceUpdatedResultNotification(
new ForkChoiceState(
data.randomBytes32(),
data.randomSlot(),
samplePayloadAttributesData.data().parentExecutionBlockNumber(),
samplePayloadAttributesData.data().parentExecutionBlockHash(),
data.randomBytes32(),
data.randomBytes32(),
false),
Optional.of(samplePayloadAttributes),
false,
SafeFuture.completedFuture(
Optional.of(
new ForkChoiceUpdatedResult(
PayloadStatus.VALID, Optional.of(data.randomBytes8())))));

private final AsyncContext async = mock(AsyncContext.class);
private final EventChannels channels = mock(EventChannels.class);
Expand Down Expand Up @@ -258,14 +278,12 @@ void shouldPropagatePayloadAttributes() throws IOException {
manager.registerClient(client1);

triggerPayloadAttributesEvent();
final NewBlockBuildingNotification notification =
new NewBlockBuildingNotification(
samplePayloadAttributesData.data().parentExecutionBlockNumber(),
samplePayloadAttributesData.data().parentExecutionBlockHash(),
samplePayloadAttributes);
checkEvent(
"payload_attributes",
PayloadAttributesEvent.create(samplePayloadAttributesData.milestone(), notification));
PayloadAttributesEvent.create(
samplePayloadAttributesData.milestone(),
samplePayloadAttributes,
forkChoiceUpdatedResultNotification.forkChoiceState()));
}

@Test
Expand Down Expand Up @@ -437,11 +455,7 @@ private void triggerBlobSidecarEvent() {
}

private void triggerPayloadAttributesEvent() {
manager.onNewPayloadAttributes(
new NewBlockBuildingNotification(
samplePayloadAttributesData.data().parentExecutionBlockNumber(),
samplePayloadAttributesData.data().parentExecutionBlockHash(),
samplePayloadAttributes));
manager.onForkChoiceUpdatedResult(forkChoiceUpdatedResultNotification);
asyncRunner.executeQueuedActions();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool.NewBlobSidecarSubscriber;
import tech.pegasys.teku.statetransition.block.BlockManager;
import tech.pegasys.teku.statetransition.block.NewBlockBuildingSubscriber;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceUpdatedResultSubscriber;
import tech.pegasys.teku.statetransition.forkchoice.PreparedProposerInfo;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager;
import tech.pegasys.teku.statetransition.forkchoice.RegisteredValidatorInfo;
Expand Down Expand Up @@ -212,8 +212,8 @@ public void subscribeToSyncCommitteeContributions(
syncCommitteeContributionPool.subscribeOperationAdded(listener);
}

public void subscribeToNewBlockBuilding(final NewBlockBuildingSubscriber listener) {
forkChoiceNotifier.subscribeToNewBlockBuilding(listener);
public void subscribeToForkChoiceUpdatedResult(final ForkChoiceUpdatedResultSubscriber listener) {
forkChoiceNotifier.subscribeToForkChoiceUpdatedResult(listener);
}

public SafeFuture<Optional<List<ValidatorLivenessAtEpoch>>> getValidatorLiveness(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public ForkChoice(
public void onForkChoiceUpdatedResult(
final ForkChoiceUpdatedResultNotification forkChoiceUpdatedResultNotification) {
forkChoiceUpdatedResultNotification
.getForkChoiceUpdatedResult()
.forkChoiceUpdatedResult()
.thenAccept(
maybeForkChoiceUpdatedResult ->
maybeForkChoiceUpdatedResult.ifPresent(
Expand All @@ -174,14 +174,12 @@ public void onForkChoiceUpdatedResult(
LOG.error(
"Execution engine considers INVALID recently provided terminal block {}",
forkChoiceUpdatedResultNotification
.getForkChoiceState()
.forkChoiceState()
.getHeadExecutionBlockHash());
return;
}
onExecutionPayloadResult(
forkChoiceUpdatedResultNotification
.getForkChoiceState()
.getHeadBlockRoot(),
forkChoiceUpdatedResultNotification.forkChoiceState().getHeadBlockRoot(),
forkChoiceUpdatedResult.getPayloadStatus());
}))
.finish(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadContext;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceState;
import tech.pegasys.teku.statetransition.block.NewBlockBuildingSubscriber;

public interface ForkChoiceNotifier {
void onForkChoiceUpdated(ForkChoiceState forkChoiceState, Optional<UInt64> proposingSlot);
Expand All @@ -36,6 +35,4 @@ SafeFuture<Optional<ExecutionPayloadContext>> getPayloadId(
boolean validatorIsConnected(UInt64 validatorIndex, UInt64 currentSlot);

void subscribeToForkChoiceUpdatedResult(ForkChoiceUpdatedResultSubscriber subscriber);

void subscribeToNewBlockBuilding(NewBlockBuildingSubscriber subscriber);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceState;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceUpdatedResult;
import tech.pegasys.teku.spec.executionlayer.PayloadBuildingAttributes;
import tech.pegasys.teku.statetransition.block.NewBlockBuildingSubscriber;
import tech.pegasys.teku.statetransition.block.NewBlockBuildingSubscriber.NewBlockBuildingNotification;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceUpdatedResultSubscriber.ForkChoiceUpdatedResultNotification;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager.ProposersDataManagerSubscriber;
import tech.pegasys.teku.storage.client.RecentChainData;
Expand All @@ -48,8 +45,6 @@ public class ForkChoiceNotifierImpl implements ForkChoiceNotifier, ProposersData

private final Subscribers<ForkChoiceUpdatedResultSubscriber> forkChoiceUpdatedSubscribers =
Subscribers.create(true);
private final Subscribers<NewBlockBuildingSubscriber> newBlockBuildingSubscribers =
Subscribers.create(true);

private ForkChoiceUpdateData forkChoiceUpdateData = new ForkChoiceUpdateData();

Expand Down Expand Up @@ -78,11 +73,6 @@ public void subscribeToForkChoiceUpdatedResult(ForkChoiceUpdatedResultSubscriber
forkChoiceUpdatedSubscribers.subscribe(subscriber);
}

@Override
public void subscribeToNewBlockBuilding(final NewBlockBuildingSubscriber subscriber) {
newBlockBuildingSubscribers.subscribe(subscriber);
}

@Override
public void onForkChoiceUpdated(
final ForkChoiceState forkChoiceState, final Optional<UInt64> proposingSlot) {
Expand Down Expand Up @@ -190,9 +180,6 @@ private SafeFuture<Optional<ExecutionPayloadContext>> internalGetPayloadId(
.withPayloadBuildingAttributes(payloadBuildingAttributes);

sendForkChoiceUpdated();
payloadBuildingAttributes.ifPresent(
payloadAttributes ->
notifyNewBlockBuildingSubscribers(forkChoiceState, payloadAttributes));

if (!forkChoiceUpdateData.isPayloadIdSuitable(parentExecutionHash, timestamp)) {
throw new IllegalStateException(
Expand Down Expand Up @@ -258,20 +245,11 @@ private void sendForkChoiceUpdated() {
ForkChoiceUpdatedResultSubscriber::onForkChoiceUpdatedResult,
new ForkChoiceUpdatedResultNotification(
forkChoiceUpdateData.getForkChoiceState(),
forkChoiceUpdateData.getPayloadBuildingAttributes(),
forkChoiceUpdateData.hasTerminalBlockHash(),
forkChoiceUpdatedResult));
}

private void notifyNewBlockBuildingSubscribers(
final ForkChoiceState forkChoiceState, final PayloadBuildingAttributes payloadAttributes) {
newBlockBuildingSubscribers.deliver(
NewBlockBuildingSubscriber::onNewBlockBuilding,
new NewBlockBuildingNotification(
forkChoiceState.getHeadExecutionBlockNumber(),
forkChoiceState.getHeadExecutionBlockHash(),
payloadAttributes));
}

private void updatePayloadAttributes(final UInt64 blockSlot) {
LOG.debug("updatePayloadAttributes blockSlot {}", blockSlot);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ public boolean isPayloadIdSuitable(final Bytes32 parentExecutionHash, final UInt
}
}

public Optional<PayloadBuildingAttributes> getPayloadBuildingAttributes() {
return payloadBuildingAttributes;
}

public SafeFuture<Optional<ExecutionPayloadContext>> getExecutionPayloadContext() {
return executionPayloadContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,17 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceState;
import tech.pegasys.teku.spec.executionlayer.ForkChoiceUpdatedResult;
import tech.pegasys.teku.spec.executionlayer.PayloadBuildingAttributes;

@FunctionalInterface
public interface ForkChoiceUpdatedResultSubscriber {

void onForkChoiceUpdatedResult(
final ForkChoiceUpdatedResultNotification forkChoiceUpdatedResultNotification);

class ForkChoiceUpdatedResultNotification {
private final ForkChoiceState forkChoiceState;
private final boolean isTerminalBlockCall;
private final SafeFuture<Optional<ForkChoiceUpdatedResult>> forkChoiceUpdatedResult;

public ForkChoiceUpdatedResultNotification(
final ForkChoiceState forkChoiceState,
final boolean isTerminalBlockCall,
final SafeFuture<Optional<ForkChoiceUpdatedResult>> forkChoiceUpdatedResult) {
this.forkChoiceState = forkChoiceState;
this.isTerminalBlockCall = isTerminalBlockCall;
this.forkChoiceUpdatedResult = forkChoiceUpdatedResult;
}

public ForkChoiceState getForkChoiceState() {
return forkChoiceState;
}

public boolean isTerminalBlockCall() {
return isTerminalBlockCall;
}

public SafeFuture<Optional<ForkChoiceUpdatedResult>> getForkChoiceUpdatedResult() {
return forkChoiceUpdatedResult;
}
}
record ForkChoiceUpdatedResultNotification(
ForkChoiceState forkChoiceState,
Optional<PayloadBuildingAttributes> payloadAttributes,
boolean isTerminalBlockCall,
SafeFuture<Optional<ForkChoiceUpdatedResult>> forkChoiceUpdatedResult) {}
}
Loading

0 comments on commit b2756d2

Please sign in to comment.