Skip to content

Commit

Permalink
Feature/range receipt migration (#8047)
Browse files Browse the repository at this point in the history
Co-authored-by: Ben Adams <[email protected]>
  • Loading branch information
asdacap and benaadams authored Jan 27, 2025
1 parent b41fda7 commit d76fed1
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ namespace Nethermind.Blockchain.Receipts
{
public interface IReceiptsMigration
{
Task<bool> Run(long blockNumber);
Task<bool> Run(long from, long to);
}
}
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Core.Test/ProgressLoggerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void Print_Progress()

measuredProgress.LogProgress();

iLogger.Received(1).Info("Progress 1 / 100 ( 0.99 %) [⡆ ] queue 99 | skipped 90 Blk/s | current 10 Blk/s");
iLogger.Received(1).Info("Progress 1 / 100 ( 1.00 %) [⡆ ] queue 99 | skipped 90 Blk/s | current 10 Blk/s");
}

private ProgressLogger CreateProgress()
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Core/ProgressLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private string DefaultFormatter()

private static string GenerateReport(string prefix, long current, long total, long queue, decimal speed, decimal skippedPerSecond)
{
float percentage = Math.Clamp(current / (float)(total + 1), 0, 1);
float percentage = Math.Clamp(current / (float)(Math.Max(total, 1)), 0, 1);
string queuedStr = (queue >= 0 ? $" queue {queue,QueuePaddingLength:N0} | " : " ");
string skippedStr = (skippedPerSecond >= 0 ? $"skipped {skippedPerSecond,SkippedPaddingLength:N0} Blk/s | " : "");
string speedStr = $"current {speed,SpeedPaddingLength:N0} Blk/s";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,7 @@ public async Task Run(CancellationToken cancellationToken)
{
if (_bloomConfig.Migration)
{
if (!CanMigrate(_api.SyncModeSelector.Current))
{
await Wait.ForEventCondition<SyncModeChangedEventArgs>(
cancellationToken,
(d) => _api.SyncModeSelector.Changed += d,
(d) => _api.SyncModeSelector.Changed -= d,
(arg) => CanMigrate(arg.Current));
}
await _api.SyncModeSelector.WaitUntilMode(CanMigrate, cancellationToken);

_stopwatch = Stopwatch.StartNew();
try
Expand Down
101 changes: 41 additions & 60 deletions src/Nethermind/Nethermind.Init/Steps/Migrations/ReceiptMigration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
using Nethermind.Blockchain.Receipts;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Events;
using Nethermind.Core.Extensions;
using Nethermind.Db;
using Nethermind.Int256;
Expand All @@ -33,8 +32,6 @@ public class ReceiptMigration : IDatabaseMigration, IReceiptsMigration
private readonly ILogger _logger;
private CancellationTokenSource? _cancellationTokenSource;
internal Task? _migrationTask;
private Stopwatch? _stopwatch;
private long _toBlock;

private readonly ProgressLogger _progressLogger;
[NotNull]
Expand Down Expand Up @@ -89,13 +86,24 @@ ILogManager logManager
_progressLogger = new ProgressLogger("Receipts migration", logManager);
}

public async Task<bool> Run(long blockNumber)
// Actually start running it.
public async Task<bool> Run(long from, long to)
{
_cancellationTokenSource?.Cancel();
await (_migrationTask ?? Task.CompletedTask);
try
{
await (_migrationTask ?? Task.CompletedTask);
}
catch (OperationCanceledException)
{
}

_cancellationTokenSource = new CancellationTokenSource();
_receiptStorage.MigratedBlockNumber = Math.Min(Math.Max(_receiptStorage.MigratedBlockNumber, blockNumber), (_blockTree.Head?.Number ?? 0) + 1);
_migrationTask = DoRun(_cancellationTokenSource.Token);
_migrationTask = Task.Run(async () =>
{
await _syncModeSelector.WaitUntilMode(CanMigrate, _cancellationTokenSource.Token);
RunMigration(from, to, false, _cancellationTokenSource.Token);
});
return _receiptConfig.StoreReceipts && _receiptConfig.ReceiptsMigration;
}
public async Task Run(CancellationToken cancellationToken)
Expand All @@ -105,25 +113,9 @@ public async Task Run(CancellationToken cancellationToken)
if (_receiptConfig.ReceiptsMigration)
{
ResetMigrationIndexIfNeeded();
await DoRun(cancellationToken);
}
}
}

private async Task DoRun(CancellationToken cancellationToken)
{
if (_receiptConfig.StoreReceipts)
{
if (!CanMigrate(_syncModeSelector.Current))
{
await Wait.ForEventCondition<SyncModeChangedEventArgs>(
cancellationToken,
(e) => _syncModeSelector.Changed += e,
(e) => _syncModeSelector.Changed -= e,
(arg) => CanMigrate(arg.Current));
await _syncModeSelector.WaitUntilMode(CanMigrate, cancellationToken);
RunIfNeeded(cancellationToken);
}

RunIfNeeded(cancellationToken);
}
}

Expand All @@ -137,21 +129,16 @@ private void RunIfNeeded(CancellationToken cancellationToken)
? _blockTree.Head?.Number ?? 0
: _blockTree.BestKnownNumber
: _receiptStorage.MigratedBlockNumber - 1;
_toBlock = migrateToBlockNumber;

_logger.Warn($"Running migration to {_toBlock}");

if (_toBlock > 0)
if (migrateToBlockNumber > 0)
{
_stopwatch = Stopwatch.StartNew();
try
{
RunMigration(cancellationToken);
RunMigration(0, migrateToBlockNumber, true, cancellationToken);
}
catch (Exception e)
{
_stopwatch.Stop();
_logger.Error(GetLogMessage("failed", $"Error: {e}"), e);
_logger.Error("Error running receipt migration", e);
}
}
else
Expand All @@ -160,19 +147,20 @@ private void RunIfNeeded(CancellationToken cancellationToken)
}
}

private void RunMigration(CancellationToken token)
private void RunMigration(long from, long to, bool updateReceiptMigrationPointer, CancellationToken token)
{
long synced = 1;
from = Math.Min(from, to);
long synced = 0;

_progressLogger.Reset(synced, _toBlock);
if (_logger.IsWarn) _logger.Warn($"Running migration from {from} to {to}");

if (_logger.IsInfo) _logger.Info(GetLogMessage("started"));
_progressLogger.Reset(synced, to - from + 1);

using Timer timer = new(1000);
timer.Enabled = true;
timer.Elapsed += (_, _) =>
{
if (_logger.IsInfo) _logger.Info(GetLogMessage("in progress"));
_progressLogger.LogProgress();
};

try
Expand All @@ -183,7 +171,8 @@ private void RunMigration(CancellationToken token)
parallelism = Environment.ProcessorCount;
}

GetBlockBodiesForMigration(token).AsParallel().WithDegreeOfParallelism(parallelism).ForAll((item) =>
GetBlockBodiesForMigration(from, to, updateReceiptMigrationPointer, token)
.AsParallel().WithDegreeOfParallelism(parallelism).ForAll((item) =>
{
(long blockNum, Hash256 blockHash) = item;
Block? block = _blockTree.FindBlock(blockHash!, BlockTreeLookupOptions.None);
Expand All @@ -204,30 +193,29 @@ private void RunMigration(CancellationToken token)

if (!token.IsCancellationRequested)
{
if (_logger.IsInfo) _logger.Info(GetLogMessage("Compacting receipts database"));
if (_logger.IsInfo) _logger.Info("Compacting receipts database");
_receiptsDb.Compact();
if (_logger.IsInfo) _logger.Info(GetLogMessage("Compacting receipts tx index database"));
if (_logger.IsInfo) _logger.Info("Compacting receipts tx index database");
_txIndexDb.Compact();
if (_logger.IsInfo) _logger.Info(GetLogMessage("Compacting receipts block database"));
if (_logger.IsInfo) _logger.Info("Compacting receipts block database");
_receiptsBlockDb.Compact();
}
}
finally
{
_progressLogger.MarkEnd();
_stopwatch?.Stop();
timer.Stop();
}

if (!token.IsCancellationRequested)
{
if (_logger.IsInfo) _logger.Info(GetLogMessage("finished"));
if (_logger.IsInfo) _logger.Info("Receipt migration finished");
}
}

Block GetMissingBlock(long i, Hash256? blockHash)
{
if (_logger.IsDebug) _logger.Debug(GetLogMessage("warning", $"Block {i} not found. Logs will not be searchable for this block."));
if (_logger.IsDebug) _logger.Debug($"Block {i} not found. Logs will not be searchable for this block.");
Block emptyBlock = EmptyBlock.Get();
emptyBlock.Header.Number = i;
emptyBlock.Header.Hash = blockHash;
Expand All @@ -239,7 +227,7 @@ static void ReturnMissingBlock(Block emptyBlock)
EmptyBlock.Return(emptyBlock);
}

IEnumerable<(long, Hash256)> GetBlockBodiesForMigration(CancellationToken token)
IEnumerable<(long, Hash256)> GetBlockBodiesForMigration(long from, long to, bool updateReceiptMigrationPointer, CancellationToken token)
{
bool TryGetMainChainBlockHashFromLevel(long number, out Hash256? blockHash)
{
Expand All @@ -266,11 +254,11 @@ bool TryGetMainChainBlockHashFromLevel(long number, out Hash256? blockHash)
}
}

for (long i = _toBlock - 1; i > 0; i--)
for (long i = to; i >= from; i--)
{
if (token.IsCancellationRequested)
{
if (_logger.IsInfo) _logger.Info(GetLogMessage("cancelled"));
if (_logger.IsInfo) _logger.Info("Receipt migration cancelled");
yield break;
}

Expand All @@ -279,7 +267,7 @@ bool TryGetMainChainBlockHashFromLevel(long number, out Hash256? blockHash)
yield return (i, blockHash!);
}

if (_receiptStorage.MigratedBlockNumber > i)
if (updateReceiptMigrationPointer && _receiptStorage.MigratedBlockNumber > i)
{
_receiptStorage.MigratedBlockNumber = i;
}
Expand All @@ -295,11 +283,12 @@ private void MigrateBlock(Block block)

if (notNullReceipts.Length == 0) return;

// This should set the new rlp and tx index depending on config.
_receiptStorage.Insert(block, notNullReceipts);

// I guess some old schema need this
// It used to be that the tx index is stored in the default column so we are moving it into transactions column
{
using IWriteBatch writeBatch = _receiptsDb.StartWriteBatch().GetColumnBatch(ReceiptsColumns.Transactions);
using IWriteBatch writeBatch = _receiptsDb.StartWriteBatch().GetColumnBatch(ReceiptsColumns.Default);
for (int i = 0; i < notNullReceipts.Length; i++)
{
writeBatch[notNullReceipts[i].TxHash!.Bytes] = null;
Expand All @@ -324,8 +313,7 @@ private void MigrateBlock(Block block)
if (notNullReceipts.Length != receipts.Length)
{
if (_logger.IsWarn)
_logger.Warn(GetLogMessage("warning",
$"Block {block.ToString(Block.Format.FullHashAndNumber)} is missing {receipts.Length - notNullReceipts.Length} of {receipts.Length} receipts!"));
_logger.Warn($"Block {block.ToString(Block.Format.FullHashAndNumber)} is missing {receipts.Length - notNullReceipts.Length} of {receipts.Length} receipts!");
}
}

Expand Down Expand Up @@ -382,13 +370,6 @@ private bool IsMigrationNeeded(long blockNumber, Hash256 blockHash, TxReceipt[]
return _receiptConfig.CompactReceiptStore != isCompactEncoding;
}

private string GetLogMessage(string status, string? suffix = null)
{
string message = $"ReceiptsDb migration {status} | {_stopwatch?.Elapsed:d\\:hh\\:mm\\:ss} | {_progressLogger.CurrentValue.ToString().PadLeft(_toBlock.ToString().Length)} / {_toBlock} blocks migrated. | current {_progressLogger.CurrentPerSecond:F2} Blk/s | total {_progressLogger.TotalPerSecond:F2} Blk/s. {suffix}";
_progressLogger.SetMeasuringPoint();
return message;
}

private class EmptyBlockObjectPolicy : IPooledObjectPolicy<Block>
{
public Block Create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ public void Debug_traceCall_test()
[Test]
public async Task Migrate_receipts()
{
debugBridge.MigrateReceipts(Arg.Any<long>()).Returns(true);
debugBridge.MigrateReceipts(Arg.Any<long>(), Arg.Any<long>()).Returns(true);
IDebugRpcModule rpcModule = new DebugRpcModule(LimboLogs.Instance, debugBridge, jsonRpcConfig, specProvider);
string response = await RpcTest.TestSerializedRequest(rpcModule, "debug_migrateReceipts", 100);
Assert.That(response, Is.Not.Null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public DebugBridge(

public void UpdateHeadBlock(Hash256 blockHash) => _blockTree.UpdateHeadBlock(blockHash);

public Task<bool> MigrateReceipts(long blockNumber)
=> _receiptsMigration.Run(blockNumber + 1); // add 1 to make go from inclusive (better for API) to exclusive (better for internal)
public Task<bool> MigrateReceipts(long from, long to) => _receiptsMigration.Run(from, to);

public void InsertReceipts(BlockParameter blockParameter, TxReceipt[] txReceipts)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ public ResultWrapper<GethLikeTxTrace> debug_traceTransactionInBlockByIndex(byte[
return ResultWrapper<GethLikeTxTrace>.Success(transactionTrace);
}

public async Task<ResultWrapper<bool>> debug_migrateReceipts(long blockNumber) =>
ResultWrapper<bool>.Success(await _debugBridge.MigrateReceipts(blockNumber));
public async Task<ResultWrapper<bool>> debug_migrateReceipts(long from, long to) =>
ResultWrapper<bool>.Success(await _debugBridge.MigrateReceipts(from, to));

public Task<ResultWrapper<bool>> debug_insertReceipts(BlockParameter blockParameter, ReceiptForRpc[] receiptForRpc)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface IDebugBridge
ChainLevelInfo GetLevelInfo(long number);
int DeleteChainSlice(long startNumber, bool force = false);
void UpdateHeadBlock(Hash256 blockHash);
Task<bool> MigrateReceipts(long blockNumber);
Task<bool> MigrateReceipts(long from, long to);
void InsertReceipts(BlockParameter blockParameter, TxReceipt[] receipts);
SyncReportSymmary GetCurrentSyncStage();
IEnumerable<string> TraceBlockToFile(Hash256 blockHash, CancellationToken cancellationToken, GethTraceOptions? gethTraceOptions = null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public interface IDebugRpcModule : IRpcModule
ResultWrapper<GethLikeTxTrace> debug_traceTransactionInBlockByIndex(byte[] blockRlp, int txIndex, GethTraceOptions options = null);

[JsonRpcMethod(Description = "Sets the block number up to which receipts will be migrated to (Nethermind specific).")]
Task<ResultWrapper<bool>> debug_migrateReceipts(long blockNumber);
Task<ResultWrapper<bool>> debug_migrateReceipts(long from, long to);

[JsonRpcMethod(Description = "Insert receipts for the block after verifying receipts root correctness.")]
Task<ResultWrapper<bool>> debug_insertReceipts(BlockParameter blockParameter, ReceiptForRpc[] receiptForRpc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

namespace Nethermind.Runner.Test.Ethereum.Steps.Migrations
{
[TestFixture]
public class ReceiptMigrationTests
{
[TestCase(null, 0, false, false, false, false)] // No change to migrate
Expand Down Expand Up @@ -71,6 +70,7 @@ public async Task RunMigration(int? commandStartBlockNumber, long currentMigrate
TestMemColumnsDb<ReceiptsColumns> receiptColumnDb = new();
TestMemDb blocksDb = (TestMemDb)receiptColumnDb.GetColumnDb(ReceiptsColumns.Blocks);
TestMemDb txDb = (TestMemDb)receiptColumnDb.GetColumnDb(ReceiptsColumns.Transactions);
TestMemDb defaultDb = (TestMemDb)receiptColumnDb.GetColumnDb(ReceiptsColumns.Default);

// Put the last block receipt encoding
Block lastBlock = blockTree.FindBlock(chainLength - 1);
Expand All @@ -93,21 +93,20 @@ public async Task RunMigration(int? commandStartBlockNumber, long currentMigrate

if (commandStartBlockNumber.HasValue)
{
_ = migration.Run(commandStartBlockNumber.Value);
_ = migration.Run(0, commandStartBlockNumber.Value);
await migration._migrationTask!;
}
else
{
await migration.Run(CancellationToken.None);
Assert.That(() => outMemoryReceiptStorage.MigratedBlockNumber, Is.InRange(0, 1).After(1000, 10));
}

Assert.That(() => outMemoryReceiptStorage.MigratedBlockNumber, Is.InRange(0, 1).After(1000, 10));

if (wasMigrated)
{
int blockNum = (commandStartBlockNumber ?? chainLength) - 1 - 1;
int blockNum = commandStartBlockNumber ?? (chainLength - 1);
int txCount = blockNum * 2;
txDb.KeyWasWritten((item => item.Item2 is null), txCount);
defaultDb.KeyWasWritten((item => item.Item2 is null), txCount);
((TestMemDb)receiptColumnDb.GetColumnDb(ReceiptsColumns.Blocks)).KeyWasRemoved((_ => true), blockNum);
outMemoryReceiptStorage.Count.Should().Be(txCount);
}
Expand Down
Loading

0 comments on commit d76fed1

Please sign in to comment.