diff --git a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineApiMethod.java b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineApiMethod.java index a47654c47fe..44ec33d7a4c 100644 --- a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineApiMethod.java +++ b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineApiMethod.java @@ -16,8 +16,7 @@ public enum EngineApiMethod { ENGINE_NEW_PAYLOAD("engine_newPayload"), ENGINE_GET_PAYLOAD("engine_getPayload"), - ENGINE_FORK_CHOICE_UPDATED("engine_forkchoiceUpdated"), - ENGINE_GET_BLOBS("engine_getBlobs"); + ENGINE_FORK_CHOICE_UPDATED("engine_forkchoiceUpdated"); private final String name; diff --git a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineGetBlobsV1.java b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineGetBlobsV1.java deleted file mode 100644 index 40426616b87..00000000000 --- a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineGetBlobsV1.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2024 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.ethereum.executionclient.methods; - -import java.util.List; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import tech.pegasys.teku.ethereum.executionclient.ExecutionEngineClient; -import tech.pegasys.teku.ethereum.executionclient.response.ResponseUnwrapper; -import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.infrastructure.unsigned.UInt64; -import tech.pegasys.teku.spec.Spec; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSchema; -import tech.pegasys.teku.spec.datastructures.execution.BlobAndProof; -import tech.pegasys.teku.spec.logic.versions.deneb.types.VersionedHash; -import tech.pegasys.teku.spec.schemas.SchemaDefinitions; -import tech.pegasys.teku.spec.schemas.SchemaDefinitionsDeneb; - -public class EngineGetBlobsV1 extends AbstractEngineJsonRpcMethod> { - - private static final Logger LOG = LogManager.getLogger(); - private final Spec spec; - - public EngineGetBlobsV1(final ExecutionEngineClient executionEngineClient, final Spec spec) { - super(executionEngineClient); - this.spec = spec; - } - - @Override - public String getName() { - return EngineApiMethod.ENGINE_GET_BLOBS.getName(); - } - - @Override - public int getVersion() { - return 1; - } - - @Override - public boolean isOptional() { - return true; - } - - @Override - public SafeFuture> execute(final JsonRpcRequestParams params) { - - final List blobVersionedHashes = - params.getRequiredListParameter(0, VersionedHash.class); - - final UInt64 slot = params.getRequiredParameter(1, UInt64.class); - - LOG.trace( - "Calling {}(blobVersionedHashes={}, slot={})", - getVersionedName(), - blobVersionedHashes, - slot); - - return executionEngineClient - .getBlobsV1(blobVersionedHashes) - .thenApply(ResponseUnwrapper::unwrapExecutionClientResponseOrThrow) - .thenApply( - response -> { - final SchemaDefinitions schemaDefinitions = spec.atSlot(slot).getSchemaDefinitions(); - final BlobSchema blobSchema = - SchemaDefinitionsDeneb.required(schemaDefinitions).getBlobSchema(); - return response.stream() - .map( - blobAndProofV1 -> - blobAndProofV1 == null - ? null - : blobAndProofV1.asInternalBlobsAndProofs(blobSchema)) - .toList(); - }) - .thenPeek( - blobsAndProofs -> - LOG.trace( - "Response {}(blobVersionedHashes={}) -> {}", - getVersionedName(), - blobVersionedHashes, - blobsAndProofs)); - } -} diff --git a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineJsonRpcMethod.java b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineJsonRpcMethod.java index 8c41f711697..ddc4b4f196e 100644 --- a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineJsonRpcMethod.java +++ b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineJsonRpcMethod.java @@ -28,12 +28,6 @@ default boolean isDeprecated() { return false; } - // TODO should be remove once all ELs implement engine_getBlobsV1. It has been added only to - // better handle the use case when the method is missing in the EL side - default boolean isOptional() { - return false; - } - default String getVersionedName() { return getVersion() == 0 ? getName() : getName() + "V" + getVersion(); } diff --git a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/schema/BlobAndProofV1.java b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/schema/BlobAndProofV1.java index 1a73026a865..041fdbbbab6 100644 --- a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/schema/BlobAndProofV1.java +++ b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/schema/BlobAndProofV1.java @@ -52,11 +52,6 @@ public BlobAndProof asInternalBlobsAndProofs(final BlobSchema blobSchema) { return new BlobAndProof(new Blob(blobSchema, blob), new KZGProof(proof)); } - public static BlobAndProofV1 fromInternalBlobsBundle(final BlobAndProof blobAndProof) { - return new BlobAndProofV1( - blobAndProof.blob().getBytes(), blobAndProof.proof().getBytesCompressed()); - } - @Override public boolean equals(final Object o) { if (this == o) { diff --git a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/schema/ConsolidationRequestV1.java b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/schema/ConsolidationRequestV1.java deleted file mode 100644 index 92bb0f96463..00000000000 --- a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/schema/ConsolidationRequestV1.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2024 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.ethereum.executionclient.schema; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import com.google.common.base.MoreObjects; -import java.util.Objects; -import org.apache.tuweni.bytes.Bytes48; -import tech.pegasys.teku.ethereum.executionclient.serialization.Bytes20Deserializer; -import tech.pegasys.teku.ethereum.executionclient.serialization.Bytes20Serializer; -import tech.pegasys.teku.ethereum.executionclient.serialization.Bytes48Deserializer; -import tech.pegasys.teku.ethereum.executionclient.serialization.BytesSerializer; -import tech.pegasys.teku.infrastructure.bytes.Bytes20; - -public class ConsolidationRequestV1 { - @JsonSerialize(using = Bytes20Serializer.class) - @JsonDeserialize(using = Bytes20Deserializer.class) - public final Bytes20 sourceAddress; - - @JsonSerialize(using = BytesSerializer.class) - @JsonDeserialize(using = Bytes48Deserializer.class) - public final Bytes48 sourcePubkey; - - @JsonSerialize(using = BytesSerializer.class) - @JsonDeserialize(using = Bytes48Deserializer.class) - public final Bytes48 targetPubkey; - - public ConsolidationRequestV1( - @JsonProperty("sourceAddress") final Bytes20 sourceAddress, - @JsonProperty("sourcePubkey") final Bytes48 sourcePubkey, - @JsonProperty("targetPubkey") final Bytes48 targetPubkey) { - checkNotNull(sourceAddress, "sourceAddress"); - checkNotNull(sourcePubkey, "sourcePubkey"); - checkNotNull(targetPubkey, "targetPubkey"); - this.sourceAddress = sourceAddress; - this.sourcePubkey = sourcePubkey; - this.targetPubkey = targetPubkey; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final ConsolidationRequestV1 that = (ConsolidationRequestV1) o; - return Objects.equals(sourceAddress, that.sourceAddress) - && Objects.equals(sourcePubkey, that.sourcePubkey) - && Objects.equals(targetPubkey, that.targetPubkey); - } - - @Override - public int hashCode() { - return Objects.hash(sourceAddress, sourceAddress, targetPubkey); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("sourceAddress", sourceAddress) - .add("sourcePubkey", sourcePubkey) - .add("targetPubkey", targetPubkey) - .toString(); - } -} diff --git a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/schema/DepositRequestV1.java b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/schema/DepositRequestV1.java deleted file mode 100644 index 1fe6256e062..00000000000 --- a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/schema/DepositRequestV1.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2024 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.ethereum.executionclient.schema; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import com.google.common.base.MoreObjects; -import java.util.Objects; -import org.apache.tuweni.bytes.Bytes; -import org.apache.tuweni.bytes.Bytes32; -import org.apache.tuweni.bytes.Bytes48; -import tech.pegasys.teku.ethereum.executionclient.serialization.Bytes32Deserializer; -import tech.pegasys.teku.ethereum.executionclient.serialization.Bytes48Deserializer; -import tech.pegasys.teku.ethereum.executionclient.serialization.BytesDeserializer; -import tech.pegasys.teku.ethereum.executionclient.serialization.BytesSerializer; -import tech.pegasys.teku.ethereum.executionclient.serialization.UInt64AsHexDeserializer; -import tech.pegasys.teku.ethereum.executionclient.serialization.UInt64AsHexSerializer; -import tech.pegasys.teku.infrastructure.unsigned.UInt64; - -public class DepositRequestV1 { - @JsonSerialize(using = BytesSerializer.class) - @JsonDeserialize(using = Bytes48Deserializer.class) - public final Bytes48 pubkey; - - @JsonSerialize(using = BytesSerializer.class) - @JsonDeserialize(using = Bytes32Deserializer.class) - public final Bytes32 withdrawalCredentials; - - @JsonSerialize(using = UInt64AsHexSerializer.class) - @JsonDeserialize(using = UInt64AsHexDeserializer.class) - public final UInt64 amount; - - @JsonSerialize(using = BytesSerializer.class) - @JsonDeserialize(using = BytesDeserializer.class) - public final Bytes signature; - - @JsonSerialize(using = UInt64AsHexSerializer.class) - @JsonDeserialize(using = UInt64AsHexDeserializer.class) - public final UInt64 index; - - public DepositRequestV1( - @JsonProperty("pubkey") final Bytes48 pubkey, - @JsonProperty("withdrawalCredentials") final Bytes32 withdrawalCredentials, - @JsonProperty("amount") final UInt64 amount, - @JsonProperty("signature") final Bytes signature, - @JsonProperty("index") final UInt64 index) { - checkNotNull(pubkey, "pubkey"); - checkNotNull(withdrawalCredentials, "withdrawalCredentials"); - checkNotNull(amount, "amount"); - checkNotNull(signature, "signature"); - checkNotNull(index, "index"); - this.pubkey = pubkey; - this.withdrawalCredentials = withdrawalCredentials; - this.amount = amount; - this.signature = signature; - this.index = index; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final DepositRequestV1 that = (DepositRequestV1) o; - return Objects.equals(pubkey, that.pubkey) - && Objects.equals(withdrawalCredentials, that.withdrawalCredentials) - && Objects.equals(amount, that.amount) - && Objects.equals(signature, that.signature) - && Objects.equals(index, that.index); - } - - @Override - public int hashCode() { - return Objects.hash(pubkey, withdrawalCredentials, amount, signature, index); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("pubkey", pubkey) - .add("withdrawalCredentials", withdrawalCredentials) - .add("amount", amount) - .add("signature", signature) - .add("index", index) - .toString(); - } -} diff --git a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/web3j/DefaultExecutionWeb3jClientProvider.java b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/web3j/DefaultExecutionWeb3jClientProvider.java index dd3465735b6..49814585d96 100644 --- a/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/web3j/DefaultExecutionWeb3jClientProvider.java +++ b/ethereum/executionclient/src/main/java/tech/pegasys/teku/ethereum/executionclient/web3j/DefaultExecutionWeb3jClientProvider.java @@ -23,6 +23,11 @@ import tech.pegasys.teku.infrastructure.time.TimeProvider; public class DefaultExecutionWeb3jClientProvider implements ExecutionWeb3jClientProvider { + private static final String[] NON_CRITICAL_METHODS = + new String[] { + "engine_exchangeCapabilities", "engine_getClientVersionV1", "engine_getBlobsV1" + }; + private final String eeEndpoint; private final Duration timeout; private final Optional jwtConfig; @@ -58,8 +63,7 @@ private synchronized void buildClient() { .jwtConfigOpt(jwtConfig) .timeProvider(timeProvider) .executionClientEventsPublisher(executionClientEventsPublisher) - .nonCriticalMethods( - "engine_exchangeCapabilities", "engine_getClientVersionV1", "engine_getBlobsV1") + .nonCriticalMethods(NON_CRITICAL_METHODS) .build(); this.alreadyBuilt = true; } diff --git a/ethereum/executionclient/src/test/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineGetBlobsV1Test.java b/ethereum/executionclient/src/test/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineGetBlobsV1Test.java deleted file mode 100644 index 6501e6a2aaa..00000000000 --- a/ethereum/executionclient/src/test/java/tech/pegasys/teku/ethereum/executionclient/methods/EngineGetBlobsV1Test.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2024 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.ethereum.executionclient.methods; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import tech.pegasys.teku.ethereum.executionclient.ExecutionEngineClient; -import tech.pegasys.teku.ethereum.executionclient.response.InvalidRemoteResponseException; -import tech.pegasys.teku.ethereum.executionclient.schema.BlobAndProofV1; -import tech.pegasys.teku.ethereum.executionclient.schema.Response; -import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.infrastructure.unsigned.UInt64; -import tech.pegasys.teku.spec.Spec; -import tech.pegasys.teku.spec.TestSpecFactory; -import tech.pegasys.teku.spec.config.SpecConfigDeneb; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; -import tech.pegasys.teku.spec.datastructures.execution.BlobAndProof; -import tech.pegasys.teku.spec.logic.versions.deneb.types.VersionedHash; -import tech.pegasys.teku.spec.util.DataStructureUtil; - -public class EngineGetBlobsV1Test { - - private final Spec spec = TestSpecFactory.createMinimalElectra(); - private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); - private final ExecutionEngineClient executionEngineClient = mock(ExecutionEngineClient.class); - private EngineGetBlobsV1 jsonRpcMethod; - - @BeforeEach - public void setUp() { - jsonRpcMethod = new EngineGetBlobsV1(executionEngineClient, spec); - } - - @Test - public void shouldReturnExpectedNameAndVersion() { - assertThat(jsonRpcMethod.getName()).isEqualTo("engine_getBlobs"); - assertThat(jsonRpcMethod.isOptional()).isTrue(); - assertThat(jsonRpcMethod.getVersion()).isEqualTo(1); - assertThat(jsonRpcMethod.getVersionedName()).isEqualTo("engine_getBlobsV1"); - } - - @Test - public void blobVersionedHashesParamIsRequired() { - final JsonRpcRequestParams params = new JsonRpcRequestParams.Builder().build(); - - assertThatThrownBy(() -> jsonRpcMethod.execute(params)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Missing required parameter at index 0"); - - verifyNoInteractions(executionEngineClient); - } - - @Test - public void slotParamIsRequired() { - final List versionedHashes = dataStructureUtil.randomVersionedHashes(4); - - final JsonRpcRequestParams params = - new JsonRpcRequestParams.Builder().add(versionedHashes).build(); - - assertThatThrownBy(() -> jsonRpcMethod.execute(params)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Missing required parameter at index 1"); - - verifyNoInteractions(executionEngineClient); - } - - @Test - public void shouldReturnFailedExecutionWhenEngineClientRequestFails() { - final List versionedHashes = dataStructureUtil.randomVersionedHashes(4); - final String errorResponseFromClient = "error!"; - - when(executionEngineClient.getBlobsV1(any())) - .thenReturn(dummyFailedResponse(errorResponseFromClient)); - - final JsonRpcRequestParams params = - new JsonRpcRequestParams.Builder().add(versionedHashes).add(UInt64.ZERO).build(); - - assertThat(jsonRpcMethod.execute(params)) - .failsWithin(1, TimeUnit.SECONDS) - .withThrowableOfType(ExecutionException.class) - .withRootCauseInstanceOf(InvalidRemoteResponseException.class) - .withMessageContaining( - "Invalid remote response from the execution client: %s", errorResponseFromClient); - } - - @Test - public void shouldCallGetBlobsV1AndParseResponseSuccessfully() { - final List versionedHashes = dataStructureUtil.randomVersionedHashes(4); - final List blobSidecars = - dataStructureUtil.randomBlobSidecars( - SpecConfigDeneb.required(spec.getGenesisSpecConfig()).getMaxBlobsPerBlock()); - - when(executionEngineClient.getBlobsV1(eq(versionedHashes))) - .thenReturn(dummySuccessfulResponse(blobSidecars)); - - final JsonRpcRequestParams params = - new JsonRpcRequestParams.Builder().add(versionedHashes).add(UInt64.ZERO).build(); - - jsonRpcMethod = new EngineGetBlobsV1(executionEngineClient, spec); - - final List expectedResponse = - blobSidecars.stream() - .map(blobSidecar -> new BlobAndProof(blobSidecar.getBlob(), blobSidecar.getKZGProof())) - .toList(); - assertThat(jsonRpcMethod.execute(params)).isCompletedWithValue(expectedResponse); - - verify(executionEngineClient).getBlobsV1(eq(versionedHashes)); - verifyNoMoreInteractions(executionEngineClient); - } - - private SafeFuture>> dummySuccessfulResponse( - final List blobSidecars) { - return SafeFuture.completedFuture( - new Response<>( - blobSidecars.stream() - .map( - blobSidecar -> - new BlobAndProofV1( - blobSidecar.getBlob().getBytes(), - blobSidecar.getKZGProof().getBytesCompressed())) - .toList())); - } - - private SafeFuture>> dummyFailedResponse( - final String errorMessage) { - return SafeFuture.completedFuture(Response.withErrorMessage(errorMessage)); - } -} diff --git a/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/EngineCapabilitiesMonitor.java b/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/EngineCapabilitiesMonitor.java index 8ecc3afbcce..d24fd65a763 100644 --- a/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/EngineCapabilitiesMonitor.java +++ b/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/EngineCapabilitiesMonitor.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import tech.pegasys.teku.ethereum.events.SlotEventsChannel; @@ -41,7 +40,6 @@ public class EngineCapabilitiesMonitor implements SlotEventsChannel { private final Spec spec; private final EventLogger eventLogger; private final Supplier> capabilitiesSupplier; - private final Supplier> optionalCapabilitiesSupplier; private final ExecutionEngineClient executionEngineClient; public EngineCapabilitiesMonitor( @@ -53,8 +51,6 @@ public EngineCapabilitiesMonitor( this.eventLogger = eventLogger; this.capabilitiesSupplier = Suppliers.memoize(() -> new ArrayList<>(engineMethodsResolver.getCapabilities())); - this.optionalCapabilitiesSupplier = - Suppliers.memoize(() -> new ArrayList<>(engineMethodsResolver.getOptionalCapabilities())); this.executionEngineClient = executionEngineClient; } @@ -83,35 +79,20 @@ private boolean slotIsApplicable(final UInt64 slot) { private SafeFuture monitor() { final List capabilities = capabilitiesSupplier.get(); - final List optionalCapabilities = optionalCapabilitiesSupplier.get(); return executionEngineClient - .exchangeCapabilities( - Stream.concat(capabilities.stream(), optionalCapabilities.stream()).toList()) + .exchangeCapabilities(capabilities) .thenApply(ResponseUnwrapper::unwrapExecutionClientResponseOrThrow) .thenAccept( engineCapabilities -> { - LOG.debug("Engine API capabilities response: " + engineCapabilities); + LOG.debug("Engine API capabilities response: {}", engineCapabilities); final List missingEngineCapabilities = capabilities.stream() - .filter( - capability -> - !engineCapabilities.contains(capability) - && !optionalCapabilities.contains(capability)) - .toList(); - - final List missingOptionalCapabilities = - optionalCapabilities.stream() - .filter( - optionalCapability -> !engineCapabilities.contains(optionalCapability)) + .filter(capability -> !engineCapabilities.contains(capability)) .toList(); if (!missingEngineCapabilities.isEmpty()) { - eventLogger.missingEngineApiCapabilities(missingEngineCapabilities, false); - } - - if (!missingOptionalCapabilities.isEmpty()) { - eventLogger.missingEngineApiCapabilities(missingOptionalCapabilities, true); + eventLogger.missingEngineApiCapabilities(missingEngineCapabilities); } }); } diff --git a/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/EngineJsonRpcMethodsResolver.java b/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/EngineJsonRpcMethodsResolver.java index ce182addfea..bcf580c3659 100644 --- a/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/EngineJsonRpcMethodsResolver.java +++ b/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/EngineJsonRpcMethodsResolver.java @@ -13,7 +13,6 @@ package tech.pegasys.teku.ethereum.executionlayer; -import java.util.List; import java.util.Set; import java.util.function.Supplier; import tech.pegasys.teku.ethereum.executionclient.methods.EngineApiMethod; @@ -25,19 +24,10 @@ public interface EngineJsonRpcMethodsResolver { EngineJsonRpcMethod getMethod( EngineApiMethod method, Supplier milestoneSupplier, Class resultType); - EngineJsonRpcMethod> getListMethod( - EngineApiMethod method, Supplier milestoneSupplier, Class resultType); - /** * Get CL capabilities required for the engine_exchangeCapabilities * request */ Set getCapabilities(); - - /** - * TODO this optionality notion should be removed once all ELs implement the engine_getBlobsV1 RPC - * method. It has been added to ensure a softer and better logging when the method is missing only - */ - Set getOptionalCapabilities(); } diff --git a/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/ExecutionClientHandlerImpl.java b/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/ExecutionClientHandlerImpl.java index 581f494cb0a..64f4ffdb089 100644 --- a/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/ExecutionClientHandlerImpl.java +++ b/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/ExecutionClientHandlerImpl.java @@ -24,6 +24,7 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSchema; import tech.pegasys.teku.spec.datastructures.execution.BlobAndProof; import tech.pegasys.teku.spec.datastructures.execution.ClientVersion; import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayload; @@ -36,6 +37,8 @@ import tech.pegasys.teku.spec.executionlayer.PayloadBuildingAttributes; import tech.pegasys.teku.spec.executionlayer.PayloadStatus; import tech.pegasys.teku.spec.logic.versions.deneb.types.VersionedHash; +import tech.pegasys.teku.spec.schemas.SchemaDefinitions; +import tech.pegasys.teku.spec.schemas.SchemaDefinitionsDeneb; public class ExecutionClientHandlerImpl implements ExecutionClientHandler { @@ -130,16 +133,25 @@ public SafeFuture> engineGetClientVersion(final ClientVersio clientVersions.stream().map(ClientVersionV1::asInternalClientVersion).toList()); } + /** Unlikely the {@link BlobSchema} to change with upcoming forks */ @Override public SafeFuture> engineGetBlobs( final List blobVersionedHashes, final UInt64 slot) { - final JsonRpcRequestParams params = - new JsonRpcRequestParams.Builder().add(blobVersionedHashes).add(slot).build(); - return engineMethodsResolver - .getListMethod( - EngineApiMethod.ENGINE_GET_BLOBS, - () -> spec.atSlot(slot).getMilestone(), - BlobAndProof.class) - .execute(params); + return executionEngineClient + .getBlobsV1(blobVersionedHashes) + .thenApply(ResponseUnwrapper::unwrapExecutionClientResponseOrThrow) + .thenApply( + response -> { + final SchemaDefinitions schemaDefinitions = spec.atSlot(slot).getSchemaDefinitions(); + final BlobSchema blobSchema = + SchemaDefinitionsDeneb.required(schemaDefinitions).getBlobSchema(); + return response.stream() + .map( + blobAndProofV1 -> + blobAndProofV1 == null + ? null + : blobAndProofV1.asInternalBlobsAndProofs(blobSchema)) + .toList(); + }); } } diff --git a/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/MilestoneBasedEngineJsonRpcMethodsResolver.java b/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/MilestoneBasedEngineJsonRpcMethodsResolver.java index 0651980f143..992aa0e5095 100644 --- a/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/MilestoneBasedEngineJsonRpcMethodsResolver.java +++ b/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/MilestoneBasedEngineJsonRpcMethodsResolver.java @@ -14,14 +14,12 @@ package tech.pegasys.teku.ethereum.executionlayer; import static tech.pegasys.teku.ethereum.executionclient.methods.EngineApiMethod.ENGINE_FORK_CHOICE_UPDATED; -import static tech.pegasys.teku.ethereum.executionclient.methods.EngineApiMethod.ENGINE_GET_BLOBS; import static tech.pegasys.teku.ethereum.executionclient.methods.EngineApiMethod.ENGINE_GET_PAYLOAD; import static tech.pegasys.teku.ethereum.executionclient.methods.EngineApiMethod.ENGINE_NEW_PAYLOAD; import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Supplier; @@ -31,7 +29,6 @@ import tech.pegasys.teku.ethereum.executionclient.methods.EngineForkChoiceUpdatedV1; import tech.pegasys.teku.ethereum.executionclient.methods.EngineForkChoiceUpdatedV2; import tech.pegasys.teku.ethereum.executionclient.methods.EngineForkChoiceUpdatedV3; -import tech.pegasys.teku.ethereum.executionclient.methods.EngineGetBlobsV1; import tech.pegasys.teku.ethereum.executionclient.methods.EngineGetPayloadV1; import tech.pegasys.teku.ethereum.executionclient.methods.EngineGetPayloadV2; import tech.pegasys.teku.ethereum.executionclient.methods.EngineGetPayloadV3; @@ -109,7 +106,6 @@ private Map> denebSupportedMethods() { methods.put(ENGINE_NEW_PAYLOAD, new EngineNewPayloadV3(executionEngineClient)); methods.put(ENGINE_GET_PAYLOAD, new EngineGetPayloadV3(executionEngineClient, spec)); methods.put(ENGINE_FORK_CHOICE_UPDATED, new EngineForkChoiceUpdatedV3(executionEngineClient)); - methods.put(ENGINE_GET_BLOBS, new EngineGetBlobsV1(executionEngineClient, spec)); return methods; } @@ -120,7 +116,6 @@ private Map> electraSupportedMethods() { methods.put(ENGINE_NEW_PAYLOAD, new EngineNewPayloadV4(executionEngineClient)); methods.put(ENGINE_GET_PAYLOAD, new EngineGetPayloadV4(executionEngineClient, spec)); methods.put(ENGINE_FORK_CHOICE_UPDATED, new EngineForkChoiceUpdatedV3(executionEngineClient)); - methods.put(ENGINE_GET_BLOBS, new EngineGetBlobsV1(executionEngineClient, spec)); return methods; } @@ -143,39 +138,10 @@ public EngineJsonRpcMethod getMethod( return foundMethod; } - @Override - @SuppressWarnings({"unchecked", "unused"}) - public EngineJsonRpcMethod> getListMethod( - final EngineApiMethod method, - final Supplier milestoneSupplier, - final Class resultType) { - final SpecMilestone milestone = milestoneSupplier.get(); - final Map> milestoneMethods = - methodsByMilestone.getOrDefault(milestone, Collections.emptyMap()); - final EngineJsonRpcMethod> foundMethod = - (EngineJsonRpcMethod>) milestoneMethods.get(method); - if (foundMethod == null) { - throw new IllegalArgumentException( - "Can't find method with name " + method.getName() + " for milestone " + milestone); - } - return foundMethod; - } - @Override public Set getCapabilities() { return methodsByMilestone.values().stream() .flatMap(methods -> methods.values().stream()) - .filter(method -> !method.isOptional()) - .filter(method -> !method.isDeprecated()) - .map(EngineJsonRpcMethod::getVersionedName) - .collect(Collectors.toSet()); - } - - @Override - public Set getOptionalCapabilities() { - return methodsByMilestone.values().stream() - .flatMap(methods -> methods.values().stream()) - .filter(EngineJsonRpcMethod::isOptional) .filter(method -> !method.isDeprecated()) .map(EngineJsonRpcMethod::getVersionedName) .collect(Collectors.toSet()); diff --git a/ethereum/executionlayer/src/test/java/tech/pegasys/teku/ethereum/executionlayer/EngineCapabilitiesMonitorTest.java b/ethereum/executionlayer/src/test/java/tech/pegasys/teku/ethereum/executionlayer/EngineCapabilitiesMonitorTest.java index 9849430c6f1..ff4d6c5343c 100644 --- a/ethereum/executionlayer/src/test/java/tech/pegasys/teku/ethereum/executionlayer/EngineCapabilitiesMonitorTest.java +++ b/ethereum/executionlayer/src/test/java/tech/pegasys/teku/ethereum/executionlayer/EngineCapabilitiesMonitorTest.java @@ -25,7 +25,6 @@ import java.util.HashSet; import java.util.List; -import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import tech.pegasys.teku.ethereum.executionclient.ExecutionEngineClient; @@ -44,18 +43,14 @@ public class EngineCapabilitiesMonitorTest { mock(EngineJsonRpcMethodsResolver.class); private final ExecutionEngineClient executionEngineClient = mock(ExecutionEngineClient.class); - private final List engineCapabilities = List.of("method1", "method2", "method3"); private final List capabilities = List.of("method1", "method2"); - private final List optionalCapabilities = List.of("method3"); private EngineCapabilitiesMonitor engineCapabilitiesMonitor; @BeforeEach public void setUp() { when(engineMethodsResolver.getCapabilities()).thenReturn(new HashSet<>(capabilities)); - when(engineMethodsResolver.getOptionalCapabilities()) - .thenReturn(new HashSet<>(optionalCapabilities)); - mockEngineCapabilitiesResponse(engineCapabilities); + mockEngineCapabilitiesResponse(capabilities); engineCapabilitiesMonitor = new EngineCapabilitiesMonitor( spec, eventLogger, engineMethodsResolver, executionEngineClient); @@ -69,18 +64,7 @@ public void logsWarningIfEngineDoesNotSupportCapabilities() { // 3rd slot in epoch engineCapabilitiesMonitor.onSlot(UInt64.valueOf(2)); - verify(eventLogger).missingEngineApiCapabilities(List.of("method2"), false); - } - - @Test - public void logsWarningIfEngineDoesNotSupportOptionalCapabilities() { - // engine only supports one of the methods - mockEngineCapabilitiesResponse(List.of("method1", "method2")); - - // 3rd slot in epoch - engineCapabilitiesMonitor.onSlot(UInt64.valueOf(2)); - - verify(eventLogger).missingEngineApiCapabilities(List.of("method3"), true); + verify(eventLogger).missingEngineApiCapabilities(List.of("method2")); } @Test @@ -145,8 +129,7 @@ public void doesNotRunMonitoringIfNotAtRequiredSlot() { } private void mockEngineCapabilitiesResponse(final List engineCapabilities) { - when(executionEngineClient.exchangeCapabilities( - Stream.concat(capabilities.stream(), optionalCapabilities.stream()).toList())) + when(executionEngineClient.exchangeCapabilities(capabilities)) .thenReturn(SafeFuture.completedFuture(new Response<>(engineCapabilities))); } diff --git a/ethereum/executionlayer/src/test/java/tech/pegasys/teku/ethereum/executionlayer/MilestoneBasedEngineJsonRpcMethodsResolverTest.java b/ethereum/executionlayer/src/test/java/tech/pegasys/teku/ethereum/executionlayer/MilestoneBasedEngineJsonRpcMethodsResolverTest.java index 79d6577e40a..a98ad90fbb8 100644 --- a/ethereum/executionlayer/src/test/java/tech/pegasys/teku/ethereum/executionlayer/MilestoneBasedEngineJsonRpcMethodsResolverTest.java +++ b/ethereum/executionlayer/src/test/java/tech/pegasys/teku/ethereum/executionlayer/MilestoneBasedEngineJsonRpcMethodsResolverTest.java @@ -18,7 +18,6 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.Mockito.mock; import static tech.pegasys.teku.ethereum.executionclient.methods.EngineApiMethod.ENGINE_FORK_CHOICE_UPDATED; -import static tech.pegasys.teku.ethereum.executionclient.methods.EngineApiMethod.ENGINE_GET_BLOBS; import static tech.pegasys.teku.ethereum.executionclient.methods.EngineApiMethod.ENGINE_GET_PAYLOAD; import static tech.pegasys.teku.ethereum.executionclient.methods.EngineApiMethod.ENGINE_NEW_PAYLOAD; @@ -34,7 +33,6 @@ import tech.pegasys.teku.ethereum.executionclient.methods.EngineForkChoiceUpdatedV1; import tech.pegasys.teku.ethereum.executionclient.methods.EngineForkChoiceUpdatedV2; import tech.pegasys.teku.ethereum.executionclient.methods.EngineForkChoiceUpdatedV3; -import tech.pegasys.teku.ethereum.executionclient.methods.EngineGetBlobsV1; import tech.pegasys.teku.ethereum.executionclient.methods.EngineGetPayloadV1; import tech.pegasys.teku.ethereum.executionclient.methods.EngineGetPayloadV2; import tech.pegasys.teku.ethereum.executionclient.methods.EngineGetPayloadV3; @@ -163,8 +161,7 @@ private static Stream denebMethods() { return Stream.of( arguments(ENGINE_NEW_PAYLOAD, EngineNewPayloadV3.class), arguments(ENGINE_GET_PAYLOAD, EngineGetPayloadV3.class), - arguments(ENGINE_FORK_CHOICE_UPDATED, EngineForkChoiceUpdatedV3.class), - arguments(ENGINE_GET_BLOBS, EngineGetBlobsV1.class)); + arguments(ENGINE_FORK_CHOICE_UPDATED, EngineForkChoiceUpdatedV3.class)); } @Test @@ -200,8 +197,7 @@ private static Stream electraMethods() { return Stream.of( arguments(ENGINE_NEW_PAYLOAD, EngineNewPayloadV4.class), arguments(ENGINE_GET_PAYLOAD, EngineGetPayloadV4.class), - arguments(ENGINE_FORK_CHOICE_UPDATED, EngineForkChoiceUpdatedV3.class), - arguments(ENGINE_GET_BLOBS, EngineGetBlobsV1.class)); + arguments(ENGINE_FORK_CHOICE_UPDATED, EngineForkChoiceUpdatedV3.class)); } @Test @@ -229,18 +225,4 @@ void getsCapabilities() { "engine_newPayloadV4", "engine_getPayloadV4"); } - - @Test - void getsOptionalCapabilities() { - final Spec spec = - TestSpecFactory.createMinimalWithCapellaDenebAndElectraForkEpoch( - UInt64.ONE, UInt64.valueOf(2), UInt64.valueOf(3)); - - final MilestoneBasedEngineJsonRpcMethodsResolver engineMethodsResolver = - new MilestoneBasedEngineJsonRpcMethodsResolver(spec, executionEngineClient); - - final Set capabilities = engineMethodsResolver.getOptionalCapabilities(); - - assertThat(capabilities).containsExactlyInAnyOrder("engine_getBlobsV1"); - } } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionlayer/ExecutionLayerChannel.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionlayer/ExecutionLayerChannel.java index f4cfac0dba1..afd05688137 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionlayer/ExecutionLayerChannel.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionlayer/ExecutionLayerChannel.java @@ -120,8 +120,7 @@ SafeFuture engineForkChoiceUpdated( ForkChoiceState forkChoiceState, Optional payloadBuildingAttributes); - SafeFuture engineNewPayload( - NewPayloadRequest newPayloadRequest, final UInt64 slot); + SafeFuture engineNewPayload(NewPayloadRequest newPayloadRequest, UInt64 slot); SafeFuture> engineGetClientVersion(ClientVersion clientVersion); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 652e5fc9607..7d2bcb28d45 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -477,7 +477,6 @@ public synchronized int getTotalBlobSidecarsTrackers() { return blockBlobSidecarsTrackers.size(); } - @SuppressWarnings("FutureReturnValueIgnored") private BlockBlobSidecarsTracker internalOnNewBlock( final SignedBeaconBlock block, final Optional remoteOrigin) { final SlotAndBlockRoot slotAndBlockRoot = block.getSlotAndBlockRoot(); @@ -502,19 +501,21 @@ private BlockBlobSidecarsTracker internalOnNewBlock( if (!existingTracker.isComplete()) { // we missed the opportunity to complete the blob sidecars via local EL and RPC // (since the block is required to be known) Let's try now - asyncRunner.runAsync( - () -> - fetchMissingBlobsFromLocalEL(slotAndBlockRoot) - .handleException(this::logLocalElBlobsLookupFailure) - .thenRun( - () -> { - // only run if RPC block fetch has happened ( no blobs RPC fetch - // has occurred) - if (existingTracker.isRpcBlockFetchTriggered()) { - fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot); - } - }) - .finish(this::logBlockOrBlobsRPCFailure)); + asyncRunner + .runAsync( + () -> + fetchMissingBlobsFromLocalEL(slotAndBlockRoot) + .handleException(this::logLocalElBlobsLookupFailure) + .thenRun( + () -> { + // only run if RPC block fetch has happened + // (no blobs RPC fetch has occurred) + if (existingTracker.isRpcBlockFetchTriggered()) { + fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot); + } + }) + .handleException(this::logBlockOrBlobsRPCFailure)) + .ifExceptionGetsHereRaiseABug(); } }); diff --git a/infrastructure/logging/src/main/java/tech/pegasys/teku/infrastructure/logging/EventLogger.java b/infrastructure/logging/src/main/java/tech/pegasys/teku/infrastructure/logging/EventLogger.java index 651eff10f2a..4893ec3e908 100644 --- a/infrastructure/logging/src/main/java/tech/pegasys/teku/infrastructure/logging/EventLogger.java +++ b/infrastructure/logging/src/main/java/tech/pegasys/teku/infrastructure/logging/EventLogger.java @@ -162,13 +162,11 @@ public void executionClientRecovered() { info("Execution Client is responding to requests again after a previous failure", Color.GREEN); } - // TODO remove the isOptional param when all ELs implement the engine_getBlob - public void missingEngineApiCapabilities( - final List missingCapabilities, final boolean isOptional) { + public void missingEngineApiCapabilities(final List missingCapabilities) { warn( String.format( - "Execution Client does not support %s Engine API methods: %s. Make sure it is upgraded to a compatible version.", - isOptional ? "optional" : "required", missingCapabilities), + "Execution Client does not support required Engine API methods: %s. Make sure it is upgraded to a compatible version.", + missingCapabilities), Color.YELLOW); } diff --git a/infrastructure/time/src/test/java/tech/pegasys/teku/infrastructure/time/ThrottlerTest.java b/infrastructure/time/src/test/java/tech/pegasys/teku/infrastructure/time/ThrottlerTest.java index cec34889f42..80d9c927dc3 100644 --- a/infrastructure/time/src/test/java/tech/pegasys/teku/infrastructure/time/ThrottlerTest.java +++ b/infrastructure/time/src/test/java/tech/pegasys/teku/infrastructure/time/ThrottlerTest.java @@ -22,8 +22,8 @@ public class ThrottlerTest { private final AtomicInteger resource = new AtomicInteger(0); - private final UInt64 throttingPeriod = UInt64.valueOf(10); - private final Throttler throttler = new Throttler<>(resource, throttingPeriod); + private final UInt64 throttlingPeriod = UInt64.valueOf(10); + private final Throttler throttler = new Throttler<>(resource, throttlingPeriod); @Test public void init_mustThrowWhenThrottlingPeriodIsNull() { @@ -59,19 +59,19 @@ public void invoke_shouldThrottle() { assertThat(resource.get()).isEqualTo(1); // Repeatedly invoke at initial time - for (int i = 0; i < throttingPeriod.times(2).intValue(); i++) { + for (int i = 0; i < throttlingPeriod.times(2).intValue(); i++) { throttler.invoke(initialTime, AtomicInteger::incrementAndGet); assertThat(resource.get()).isEqualTo(1); } // Increment time and invoke up to limit - for (int i = 0; i < throttingPeriod.intValue(); i++) { + for (int i = 0; i < throttlingPeriod.intValue(); i++) { throttler.invoke(initialTime.plus(i), AtomicInteger::incrementAndGet); } assertThat(resource.get()).isEqualTo(1); // Invoke at boundary - throttler.invoke(initialTime.plus(throttingPeriod), AtomicInteger::incrementAndGet); + throttler.invoke(initialTime.plus(throttlingPeriod), AtomicInteger::incrementAndGet); assertThat(resource.get()).isEqualTo(2); } @@ -81,11 +81,11 @@ public void invoke_shouldNotThrottleAcrossSparseInvocations() { throttler.invoke(initialTime, AtomicInteger::incrementAndGet); assertThat(resource.get()).isEqualTo(1); - throttler.invoke(initialTime.plus(throttingPeriod.times(2)), AtomicInteger::incrementAndGet); + throttler.invoke(initialTime.plus(throttlingPeriod.times(2)), AtomicInteger::incrementAndGet); assertThat(resource.get()).isEqualTo(2); throttler.invoke( - initialTime.plus(throttingPeriod.times(3)).plus(1), AtomicInteger::incrementAndGet); + initialTime.plus(throttlingPeriod.times(3)).plus(1), AtomicInteger::incrementAndGet); assertThat(resource.get()).isEqualTo(3); } @@ -108,19 +108,21 @@ public void invoke_shouldThrottleBasedOnLastSuccessfulInvocation() { assertThat(resource.get()).isEqualTo(1); // Don't throttle under the next threshold - throttler.invoke(lastInvocation.plus(throttingPeriod).minus(1), AtomicInteger::incrementAndGet); + throttler.invoke( + lastInvocation.plus(throttlingPeriod).minus(1), AtomicInteger::incrementAndGet); assertThat(resource.get()).isEqualTo(1); // Invoke once we pass the current threshold - lastInvocation = lastInvocation.plus(throttingPeriod.times(2)).plus(1); + lastInvocation = lastInvocation.plus(throttlingPeriod.times(2)).plus(1); throttler.invoke(lastInvocation, AtomicInteger::incrementAndGet); assertThat(resource.get()).isEqualTo(2); // Don't throttle under the next threshold - throttler.invoke(lastInvocation.plus(throttingPeriod).minus(1), AtomicInteger::incrementAndGet); + throttler.invoke( + lastInvocation.plus(throttlingPeriod).minus(1), AtomicInteger::incrementAndGet); assertThat(resource.get()).isEqualTo(2); // Invoke at next threshold - throttler.invoke(lastInvocation.plus(throttingPeriod), AtomicInteger::incrementAndGet); + throttler.invoke(lastInvocation.plus(throttlingPeriod), AtomicInteger::incrementAndGet); assertThat(resource.get()).isEqualTo(3); } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java index 2a1bce8e6f3..9d3c3fdcb83 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java @@ -33,7 +33,9 @@ public class Eth2IncomingRequestHandler< TRequest extends RpcRequest & SszData, TResponse extends SszData> implements RpcRequestHandler { + private static final Logger LOG = LogManager.getLogger(); + private static final Duration RECEIVE_INCOMING_REQUEST_TIMEOUT = Duration.ofSeconds(10); private final PeerLookup peerLookup; @@ -119,9 +121,8 @@ private void handleRequest( private void ensureRequestReceivedWithinTimeLimit(final RpcStream stream) { asyncRunner - .getDelayedFuture(RECEIVE_INCOMING_REQUEST_TIMEOUT) - .thenAccept( - (__) -> { + .runAfterDelay( + () -> { if (!requestHandled.get()) { LOG.debug( "Failed to receive incoming request data within {} sec for protocol {}. Close stream.", @@ -129,7 +130,8 @@ private void ensureRequestReceivedWithinTimeLimit(final RpcStream stream) { protocolId); stream.closeAbruptly().ifExceptionGetsHereRaiseABug(); } - }) + }, + RECEIVE_INCOMING_REQUEST_TIMEOUT) .ifExceptionGetsHereRaiseABug(); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java index 320a5a3f943..be3e39cd95b 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java @@ -55,7 +55,7 @@ enum State { private static final Logger LOG = LogManager.getLogger(); @VisibleForTesting static final Duration READ_COMPLETE_TIMEOUT = Duration.ofSeconds(10); - @VisibleForTesting static final Duration RESPONSE_CHUNK_ARRIVAL_TIMEOUT = Duration.ofSeconds(30); + @VisibleForTesting static final Duration RESPONSE_CHUNK_ARRIVAL_TIMEOUT = Duration.ofSeconds(10); private final AsyncRunner asyncRunner; private final int maximumResponseChunks; @@ -116,7 +116,7 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By throw new RpcException.ExtraDataAppendedException(" extra data: " + bufToString(data)); } - List maybeResponses = responseDecoder.decodeNextResponses(data); + final List maybeResponses = responseDecoder.decodeNextResponses(data); final int chunksReceived = currentChunkCount.addAndGet(maybeResponses.size()); if (chunksReceived > maximumResponseChunks) { @@ -161,8 +161,8 @@ private String bufToString(final ByteBuf buf) { final int contentSize = Integer.min(buf.readableBytes(), 1024); String bufContent = ""; if (contentSize > 0) { - ByteBuf bufSlice = buf.slice(0, contentSize); - byte[] bytes = new byte[bufSlice.readableBytes()]; + final ByteBuf bufSlice = buf.slice(0, contentSize); + final byte[] bytes = new byte[bufSlice.readableBytes()]; bufSlice.getBytes(0, bytes); bufContent += Bytes.wrap(bytes); if (contentSize < buf.readableBytes()) { @@ -255,9 +255,8 @@ private void ensureNextResponseChunkArrivesInTime( final int previousResponseCount, final AtomicInteger currentResponseCount) { timeoutRunner - .getDelayedFuture(RESPONSE_CHUNK_ARRIVAL_TIMEOUT) - .thenAccept( - (__) -> { + .runAfterDelay( + () -> { if (previousResponseCount == currentResponseCount.get()) { abortRequest( stream, @@ -265,22 +264,23 @@ private void ensureNextResponseChunkArrivesInTime( "Timed out waiting for response chunk " + previousResponseCount, RESPONSE_CHUNK_ARRIVAL_TIMEOUT)); } - }) + }, + RESPONSE_CHUNK_ARRIVAL_TIMEOUT) .ifExceptionGetsHereRaiseABug(); } private void ensureReadCompleteArrivesInTime(final RpcStream stream) { timeoutRunner - .getDelayedFuture(READ_COMPLETE_TIMEOUT) - .thenAccept( - (__) -> { + .runAfterDelay( + () -> { if (!(state.get() == READ_COMPLETE || state.get() == CLOSED)) { abortRequest( stream, new RpcTimeoutException( "Timed out waiting for read channel close", READ_COMPLETE_TIMEOUT)); } - }) + }, + READ_COMPLETE_TIMEOUT) .ifExceptionGetsHereRaiseABug(); }