Skip to content

Commit

Permalink
Merge branch 'master' into feature-7811-admin_addTrustedPeer-rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
LukaszRozmej authored Feb 6, 2025
2 parents fe2ed13 + c740dab commit e31659e
Show file tree
Hide file tree
Showing 13 changed files with 413 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Nethermind.Consensus.Processing;

public sealed class BlockCachePreWarmer(ReadOnlyTxProcessingEnvFactory envFactory, ISpecProvider specProvider, ILogManager logManager, PreBlockCaches? preBlockCaches = null) : IBlockCachePreWarmer
{
private readonly ObjectPool<IReadOnlyTxProcessorSource> _envPool = new DefaultObjectPool<IReadOnlyTxProcessorSource>(new ReadOnlyTxProcessingEnvPooledObjectPolicy(envFactory), Environment.ProcessorCount * 4);
private readonly ObjectPool<IReadOnlyTxProcessorSource> _envPool = new DefaultObjectPool<IReadOnlyTxProcessorSource>(new ReadOnlyTxProcessingEnvPooledObjectPolicy(envFactory), Environment.ProcessorCount * 2);
private readonly ILogger _logger = logManager.GetClassLogger<BlockCachePreWarmer>();

public Task PreWarmCaches(Block suggestedBlock, Hash256? parentStateRoot, IReleaseSpec spec, CancellationToken cancellationToken = default, params ReadOnlySpan<IHasAccessList> systemAccessLists)
Expand Down Expand Up @@ -135,22 +135,27 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp

try
{
ParallelUnbalancedWork.For<BlockState>(0, block.Transactions.Length, parallelOptions, new(this, block, stateRoot, spec), static (i, state) =>
BlockState blockState = new(this, block, stateRoot, spec);
ParallelUnbalancedWork.For(
0,
block.Transactions.Length,
parallelOptions,
blockState.InitThreadState,
static (i, state) =>
{
IReadOnlyTxProcessorSource env = state.PreWarmer._envPool.Get();
Transaction? tx = null;
try
{
// If the transaction has already been processed or being processed, exit early
if (state.Block.TransactionProcessed > i) return state;

tx = state.Block.Transactions[i];
using IReadOnlyTxProcessingScope scope = env.Build(state.StateRoot);

Address senderAddress = tx.SenderAddress!;
if (!scope.WorldState.AccountExists(senderAddress))
IWorldState worldState = state.Scope.WorldState;
if (!worldState.AccountExists(senderAddress))
{
scope.WorldState.CreateAccountIfNotExists(senderAddress, UInt256.Zero);
worldState.CreateAccountIfNotExists(senderAddress, UInt256.Zero);
}

UInt256 nonceDelta = UInt256.Zero;
Expand All @@ -164,14 +169,14 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp

if (!nonceDelta.IsZero)
{
scope.WorldState.IncrementNonce(senderAddress, nonceDelta);
worldState.IncrementNonce(senderAddress, nonceDelta);
}

if (state.Spec.UseTxAccessLists)
{
scope.WorldState.WarmUp(tx.AccessList); // eip-2930
worldState.WarmUp(tx.AccessList); // eip-2930
}
TransactionResult result = scope.TransactionProcessor.Warmup(tx, new BlockExecutionContext(state.BlockHeader, state.Spec), NullTxTracer.Instance);
TransactionResult result = state.Scope.TransactionProcessor.Warmup(tx, new BlockExecutionContext(state.BlockHeader, state.Spec), NullTxTracer.Instance);
if (state.PreWarmer._logger.IsTrace) state.PreWarmer._logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
}
catch (Exception ex) when (ex is EvmException or OverflowException)
Expand All @@ -182,13 +187,10 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp
{
if (state.PreWarmer._logger.IsDebug) state.PreWarmer._logger.Error($"DEBUG/ERROR Error pre-warming cache {tx?.Hash}", ex);
}
finally
{
state.PreWarmer._envPool.Return(env);
}

return state;
});
},
BlockState.FinallyAction);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -345,13 +347,32 @@ private class ReadOnlyTxProcessingEnvPooledObjectPolicy(ReadOnlyTxProcessingEnvF
public bool Return(IReadOnlyTxProcessorSource obj) => true;
}

private readonly struct BlockState(BlockCachePreWarmer preWarmer, Block block, Hash256 stateRoot, IReleaseSpec spec)
private readonly struct BlockState(BlockCachePreWarmer preWarmer, Block block, Hash256 stateRoot, IReleaseSpec spec, IReadOnlyTxProcessorSource env = null, IReadOnlyTxProcessingScope scope = null)
{
public static Action<BlockState> FinallyAction { get; } = DisposeThreadState;

public readonly BlockCachePreWarmer PreWarmer = preWarmer;
public readonly Block Block = block;
public readonly Hash256 StateRoot = stateRoot;
public readonly IReleaseSpec Spec = spec;
public readonly BlockHeader BlockHeader => Block.Header;
public readonly IReadOnlyTxProcessorSource Env = env;
public readonly IReadOnlyTxProcessingScope Scope = scope;

public BlockState InitThreadState()
{
IReadOnlyTxProcessorSource env = PreWarmer._envPool.Get();
IReadOnlyTxProcessingScope scope = env.Build(StateRoot);
return new(PreWarmer, Block, StateRoot, Spec, env, scope);
}

public void Dispose()
{
Scope.Dispose();
PreWarmer._envPool.Return(Env);
}

private static void DisposeThreadState(BlockState state) => state.Dispose();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public static int GetRequestsByteSize(this IEnumerable<ExecutionRequest> request
public static Hash256 CalculateHashFromFlatEncodedRequests(byte[][]? flatEncodedRequests)
{
// make sure that length is 3 or less elements
if (flatEncodedRequests is null || flatEncodedRequests.Length > MaxRequestsCount)
if (flatEncodedRequests is null)
{
throw new ArgumentException("Flat encoded requests must be an array of 3 or less elements");
throw new ArgumentException("Flat encoded requests must be an array");
}

using SHA256 sha256 = SHA256.Create();
Expand Down
21 changes: 13 additions & 8 deletions src/Nethermind/Nethermind.Evm/BlobGasCalculator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,35 +50,40 @@ public static bool TryCalculateFeePerBlobGas(ulong excessBlobGas, UInt256 blobGa
{
static bool FakeExponentialOverflow(UInt256 factor, UInt256 num, UInt256 denominator, out UInt256 feePerBlobGas)
{
UInt256 output = UInt256.Zero;

if (UInt256.MultiplyOverflow(factor, denominator, out UInt256 numAccum))
UInt256 accumulator;
if (factor == UInt256.One)
{
// Skip expensive 256bit multiplication if factor is 1
accumulator = denominator;
}
else if (UInt256.MultiplyOverflow(factor, denominator, out accumulator))
{
feePerBlobGas = UInt256.MaxValue;
return true;
}

for (UInt256 i = 1; numAccum > 0; i++)
UInt256 output = default;
for (ulong i = 1; !accumulator.IsZero; i++)
{
if (UInt256.AddOverflow(output, numAccum, out output))
if (UInt256.AddOverflow(output, accumulator, out output))
{
feePerBlobGas = UInt256.MaxValue;
return true;
}

if (UInt256.MultiplyOverflow(numAccum, num, out UInt256 updatedNumAccum))
if (UInt256.MultiplyOverflow(accumulator, num, out UInt256 updatedAccumulator))
{
feePerBlobGas = UInt256.MaxValue;
return true;
}

if (UInt256.MultiplyOverflow(i, denominator, out UInt256 multipliedDeniminator))
if (UInt256.MultiplyOverflow(i, denominator, out UInt256 multipliedDenominator))
{
feePerBlobGas = UInt256.MaxValue;
return true;
}

numAccum = updatedNumAccum / multipliedDeniminator;
accumulator = updatedAccumulator / multipliedDenominator;
}

feePerBlobGas = output / denominator;
Expand Down
47 changes: 22 additions & 25 deletions src/Nethermind/Nethermind.JsonRpc/Modules/Eth/EthRpcModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
using Nethermind.Wallet;
using Block = Nethermind.Core.Block;
using BlockHeader = Nethermind.Core.BlockHeader;
using ResultType = Nethermind.Core.ResultType;
using Signature = Nethermind.Core.Crypto.Signature;
using Transaction = Nethermind.Core.Transaction;

Expand Down Expand Up @@ -384,7 +385,7 @@ public ResultWrapper<BlockForRpc> eth_getBlockByNumber(BlockParameter blockParam
: new BlockForRpc(block, returnFullTransactionObjects, _specProvider));
}

public ResultWrapper<TransactionForRpc?> eth_getTransactionByHash(Hash256 transactionHash)
public virtual ResultWrapper<TransactionForRpc?> eth_getTransactionByHash(Hash256 transactionHash)
{
(TxReceipt? receipt, Transaction? transaction, UInt256? baseFee) = _blockchainBridge.GetTransaction(transactionHash, checkTxnPool: true);
if (transaction is null)
Expand All @@ -393,7 +394,7 @@ public ResultWrapper<BlockForRpc> eth_getBlockByNumber(BlockParameter blockParam
}

RecoverTxSenderIfNeeded(transaction);
TransactionForRpc transactionModel = TransactionForRpc.FromTransaction(transaction, receipt?.BlockHash, receipt?.BlockNumber, receipt?.Index, baseFee, specProvider.ChainId);
TransactionForRpc transactionModel = TransactionForRpc.FromTransaction(transaction, receipt?.BlockHash, receipt?.BlockNumber, receipt?.Index, baseFee, _specProvider.ChainId);
if (_logger.IsTrace) _logger.Trace($"eth_getTransactionByHash request {transactionHash}, result: {transactionModel.Hash}");
return ResultWrapper<TransactionForRpc?>.Success(transactionModel);
}
Expand All @@ -420,39 +421,39 @@ public ResultWrapper<TransactionForRpc[]> eth_pendingTransactions()
{
Transaction transaction = transactions[i];
RecoverTxSenderIfNeeded(transaction);
transactionsModels[i] = TransactionForRpc.FromTransaction(transaction, chainId: specProvider.ChainId);
transactionsModels[i] = TransactionForRpc.FromTransaction(transaction, chainId: _specProvider.ChainId);
transactionsModels[i].BlockHash = Keccak.Zero;
}

if (_logger.IsTrace) _logger.Trace($"eth_pendingTransactions request, result: {transactionsModels.Length}");
return ResultWrapper<TransactionForRpc[]>.Success(transactionsModels);
}

public ResultWrapper<TransactionForRpc> eth_getTransactionByBlockHashAndIndex(Hash256 blockHash,
UInt256 positionIndex)
public ResultWrapper<TransactionForRpc> eth_getTransactionByBlockHashAndIndex(Hash256 blockHash, UInt256 positionIndex)
{
SearchResult<Block> searchResult = _blockFinder.SearchForBlock(new BlockParameter(blockHash));
if (searchResult.IsError)
ResultWrapper<TransactionForRpc> result = GetTransactionByBlockAndIndex(new BlockParameter(blockHash), positionIndex);
if (result.Result.ResultType == ResultType.Success)
{
return GetFailureResult<TransactionForRpc, Block>(searchResult, _ethSyncingInfo.SyncMode.HaveNotSyncedBodiesYet());
if (_logger.IsTrace)
_logger.Trace(
$"eth_getTransactionByBlockHashAndIndex request {blockHash}, index: {positionIndex}, result: {result.Data.Hash}");
}
return result;
}

Block block = searchResult.Object;
if (positionIndex < 0 || positionIndex > block!.Transactions.Length - 1)
public ResultWrapper<TransactionForRpc> eth_getTransactionByBlockNumberAndIndex(BlockParameter blockParameter, UInt256 positionIndex)
{
ResultWrapper<TransactionForRpc> result = GetTransactionByBlockAndIndex(blockParameter, positionIndex);
if (result.Result.ResultType == ResultType.Success)
{
return ResultWrapper<TransactionForRpc>.Fail("Position Index is incorrect", ErrorCodes.InvalidParams);
if (_logger.IsTrace)
_logger.Trace(
$"eth_getTransactionByBlockNumberAndIndex request {blockParameter}, index: {positionIndex}, result: {result.Data.Hash}");
}

Transaction transaction = block.Transactions[(int)positionIndex];
RecoverTxSenderIfNeeded(transaction);

TransactionForRpc transactionModel = TransactionForRpc.FromTransaction(transaction, block.Hash, block.Number, (int)positionIndex, block.BaseFeePerGas, specProvider.ChainId);

return ResultWrapper<TransactionForRpc>.Success(transactionModel);
return result;
}

public ResultWrapper<TransactionForRpc> eth_getTransactionByBlockNumberAndIndex(BlockParameter blockParameter,
UInt256 positionIndex)
protected virtual ResultWrapper<TransactionForRpc> GetTransactionByBlockAndIndex(BlockParameter blockParameter, UInt256 positionIndex)
{
SearchResult<Block> searchResult = _blockFinder.SearchForBlock(blockParameter);
if (searchResult.IsError)
Expand All @@ -469,11 +470,7 @@ public ResultWrapper<TransactionForRpc> eth_getTransactionByBlockNumberAndIndex(
Transaction transaction = block.Transactions[(int)positionIndex];
RecoverTxSenderIfNeeded(transaction);

TransactionForRpc transactionModel = TransactionForRpc.FromTransaction(transaction, block.Hash, block.Number, (int)positionIndex, block.BaseFeePerGas, specProvider.ChainId);

if (_logger.IsDebug)
_logger.Debug(
$"eth_getTransactionByBlockNumberAndIndex request {blockParameter}, index: {positionIndex}, result: {transactionModel.Hash}");
TransactionForRpc transactionModel = TransactionForRpc.FromTransaction(transaction, block.Hash, block.Number, (int)positionIndex, block.BaseFeePerGas, _specProvider.ChainId);
return ResultWrapper<TransactionForRpc>.Success(transactionModel);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using System.Linq;
using Nethermind.Core;
Expand Down Expand Up @@ -48,12 +49,6 @@ public ValidationResult ValidateParams(IReleaseSpec spec, int version, out strin
return ValidationResult.Fail;
}

if (ExecutionRequests.Length > ExecutionRequestExtensions.MaxRequestsCount)
{
error = $"Execution requests must have less than {ExecutionRequestExtensions.MaxRequestsCount} items";
return ValidationResult.Fail;
}

// verification of the requests
for (int i = 0; i < ExecutionRequests.Length; i++)
{
Expand Down
Loading

0 comments on commit e31659e

Please sign in to comment.