From d76fed1b381fdd98fc677422603fcd29828049a9 Mon Sep 17 00:00:00 2001 From: Amirul Ashraf Date: Mon, 27 Jan 2025 17:30:56 +0800 Subject: [PATCH] Feature/range receipt migration (#8047) Co-authored-by: Ben Adams --- .../Receipts/IReceiptsMigration.cs | 2 +- .../ProgressLoggerTests.cs | 2 +- .../Nethermind.Core/ProgressLogger.cs | 2 +- .../Steps/Migrations/BloomMigration.cs | 9 +- .../Steps/Migrations/ReceiptMigration.cs | 101 +++++++----------- .../Modules/DebugModuleTests.cs | 2 +- .../Modules/DebugModule/DebugBridge.cs | 3 +- .../Modules/DebugModule/DebugRpcModule.cs | 4 +- .../Modules/DebugModule/IDebugBridge.cs | 2 +- .../Modules/DebugModule/IDebugRpcModule.cs | 2 +- .../Steps/Migrations/ReceiptMigrationTests.cs | 11 +- .../ParallelSync/ISyncModeSelector.cs | 14 +++ 12 files changed, 70 insertions(+), 84 deletions(-) diff --git a/src/Nethermind/Nethermind.Blockchain/Receipts/IReceiptsMigration.cs b/src/Nethermind/Nethermind.Blockchain/Receipts/IReceiptsMigration.cs index 5944c305c36..d4e46d012b6 100644 --- a/src/Nethermind/Nethermind.Blockchain/Receipts/IReceiptsMigration.cs +++ b/src/Nethermind/Nethermind.Blockchain/Receipts/IReceiptsMigration.cs @@ -7,6 +7,6 @@ namespace Nethermind.Blockchain.Receipts { public interface IReceiptsMigration { - Task Run(long blockNumber); + Task Run(long from, long to); } } diff --git a/src/Nethermind/Nethermind.Core.Test/ProgressLoggerTests.cs b/src/Nethermind/Nethermind.Core.Test/ProgressLoggerTests.cs index 80508e01489..0280da31aa4 100644 --- a/src/Nethermind/Nethermind.Core.Test/ProgressLoggerTests.cs +++ b/src/Nethermind/Nethermind.Core.Test/ProgressLoggerTests.cs @@ -168,7 +168,7 @@ public void Print_Progress() measuredProgress.LogProgress(); - iLogger.Received(1).Info("Progress 1 / 100 ( 0.99 %) [⡆ ] queue 99 | skipped 90 Blk/s | current 10 Blk/s"); + iLogger.Received(1).Info("Progress 1 / 100 ( 1.00 %) [⡆ ] queue 99 | skipped 90 Blk/s | current 10 Blk/s"); } private ProgressLogger CreateProgress() diff --git a/src/Nethermind/Nethermind.Core/ProgressLogger.cs b/src/Nethermind/Nethermind.Core/ProgressLogger.cs index ae6a0db883a..f674106dab0 100644 --- a/src/Nethermind/Nethermind.Core/ProgressLogger.cs +++ b/src/Nethermind/Nethermind.Core/ProgressLogger.cs @@ -179,7 +179,7 @@ private string DefaultFormatter() private static string GenerateReport(string prefix, long current, long total, long queue, decimal speed, decimal skippedPerSecond) { - float percentage = Math.Clamp(current / (float)(total + 1), 0, 1); + float percentage = Math.Clamp(current / (float)(Math.Max(total, 1)), 0, 1); string queuedStr = (queue >= 0 ? $" queue {queue,QueuePaddingLength:N0} | " : " "); string skippedStr = (skippedPerSecond >= 0 ? $"skipped {skippedPerSecond,SkippedPaddingLength:N0} Blk/s | " : ""); string speedStr = $"current {speed,SpeedPaddingLength:N0} Blk/s"; diff --git a/src/Nethermind/Nethermind.Init/Steps/Migrations/BloomMigration.cs b/src/Nethermind/Nethermind.Init/Steps/Migrations/BloomMigration.cs index dbdbec216ab..a6586ee2c03 100644 --- a/src/Nethermind/Nethermind.Init/Steps/Migrations/BloomMigration.cs +++ b/src/Nethermind/Nethermind.Init/Steps/Migrations/BloomMigration.cs @@ -54,14 +54,7 @@ public async Task Run(CancellationToken cancellationToken) { if (_bloomConfig.Migration) { - if (!CanMigrate(_api.SyncModeSelector.Current)) - { - await Wait.ForEventCondition( - cancellationToken, - (d) => _api.SyncModeSelector.Changed += d, - (d) => _api.SyncModeSelector.Changed -= d, - (arg) => CanMigrate(arg.Current)); - } + await _api.SyncModeSelector.WaitUntilMode(CanMigrate, cancellationToken); _stopwatch = Stopwatch.StartNew(); try diff --git a/src/Nethermind/Nethermind.Init/Steps/Migrations/ReceiptMigration.cs b/src/Nethermind/Nethermind.Init/Steps/Migrations/ReceiptMigration.cs index c77c0297f18..169c2f2daa5 100644 --- a/src/Nethermind/Nethermind.Init/Steps/Migrations/ReceiptMigration.cs +++ b/src/Nethermind/Nethermind.Init/Steps/Migrations/ReceiptMigration.cs @@ -14,7 +14,6 @@ using Nethermind.Blockchain.Receipts; using Nethermind.Core; using Nethermind.Core.Crypto; -using Nethermind.Core.Events; using Nethermind.Core.Extensions; using Nethermind.Db; using Nethermind.Int256; @@ -33,8 +32,6 @@ public class ReceiptMigration : IDatabaseMigration, IReceiptsMigration private readonly ILogger _logger; private CancellationTokenSource? _cancellationTokenSource; internal Task? _migrationTask; - private Stopwatch? _stopwatch; - private long _toBlock; private readonly ProgressLogger _progressLogger; [NotNull] @@ -89,13 +86,24 @@ ILogManager logManager _progressLogger = new ProgressLogger("Receipts migration", logManager); } - public async Task Run(long blockNumber) + // Actually start running it. + public async Task Run(long from, long to) { _cancellationTokenSource?.Cancel(); - await (_migrationTask ?? Task.CompletedTask); + try + { + await (_migrationTask ?? Task.CompletedTask); + } + catch (OperationCanceledException) + { + } + _cancellationTokenSource = new CancellationTokenSource(); - _receiptStorage.MigratedBlockNumber = Math.Min(Math.Max(_receiptStorage.MigratedBlockNumber, blockNumber), (_blockTree.Head?.Number ?? 0) + 1); - _migrationTask = DoRun(_cancellationTokenSource.Token); + _migrationTask = Task.Run(async () => + { + await _syncModeSelector.WaitUntilMode(CanMigrate, _cancellationTokenSource.Token); + RunMigration(from, to, false, _cancellationTokenSource.Token); + }); return _receiptConfig.StoreReceipts && _receiptConfig.ReceiptsMigration; } public async Task Run(CancellationToken cancellationToken) @@ -105,25 +113,9 @@ public async Task Run(CancellationToken cancellationToken) if (_receiptConfig.ReceiptsMigration) { ResetMigrationIndexIfNeeded(); - await DoRun(cancellationToken); - } - } - } - - private async Task DoRun(CancellationToken cancellationToken) - { - if (_receiptConfig.StoreReceipts) - { - if (!CanMigrate(_syncModeSelector.Current)) - { - await Wait.ForEventCondition( - cancellationToken, - (e) => _syncModeSelector.Changed += e, - (e) => _syncModeSelector.Changed -= e, - (arg) => CanMigrate(arg.Current)); + await _syncModeSelector.WaitUntilMode(CanMigrate, cancellationToken); + RunIfNeeded(cancellationToken); } - - RunIfNeeded(cancellationToken); } } @@ -137,21 +129,16 @@ private void RunIfNeeded(CancellationToken cancellationToken) ? _blockTree.Head?.Number ?? 0 : _blockTree.BestKnownNumber : _receiptStorage.MigratedBlockNumber - 1; - _toBlock = migrateToBlockNumber; - - _logger.Warn($"Running migration to {_toBlock}"); - if (_toBlock > 0) + if (migrateToBlockNumber > 0) { - _stopwatch = Stopwatch.StartNew(); try { - RunMigration(cancellationToken); + RunMigration(0, migrateToBlockNumber, true, cancellationToken); } catch (Exception e) { - _stopwatch.Stop(); - _logger.Error(GetLogMessage("failed", $"Error: {e}"), e); + _logger.Error("Error running receipt migration", e); } } else @@ -160,19 +147,20 @@ private void RunIfNeeded(CancellationToken cancellationToken) } } - private void RunMigration(CancellationToken token) + private void RunMigration(long from, long to, bool updateReceiptMigrationPointer, CancellationToken token) { - long synced = 1; + from = Math.Min(from, to); + long synced = 0; - _progressLogger.Reset(synced, _toBlock); + if (_logger.IsWarn) _logger.Warn($"Running migration from {from} to {to}"); - if (_logger.IsInfo) _logger.Info(GetLogMessage("started")); + _progressLogger.Reset(synced, to - from + 1); using Timer timer = new(1000); timer.Enabled = true; timer.Elapsed += (_, _) => { - if (_logger.IsInfo) _logger.Info(GetLogMessage("in progress")); + _progressLogger.LogProgress(); }; try @@ -183,7 +171,8 @@ private void RunMigration(CancellationToken token) parallelism = Environment.ProcessorCount; } - GetBlockBodiesForMigration(token).AsParallel().WithDegreeOfParallelism(parallelism).ForAll((item) => + GetBlockBodiesForMigration(from, to, updateReceiptMigrationPointer, token) + .AsParallel().WithDegreeOfParallelism(parallelism).ForAll((item) => { (long blockNum, Hash256 blockHash) = item; Block? block = _blockTree.FindBlock(blockHash!, BlockTreeLookupOptions.None); @@ -204,30 +193,29 @@ private void RunMigration(CancellationToken token) if (!token.IsCancellationRequested) { - if (_logger.IsInfo) _logger.Info(GetLogMessage("Compacting receipts database")); + if (_logger.IsInfo) _logger.Info("Compacting receipts database"); _receiptsDb.Compact(); - if (_logger.IsInfo) _logger.Info(GetLogMessage("Compacting receipts tx index database")); + if (_logger.IsInfo) _logger.Info("Compacting receipts tx index database"); _txIndexDb.Compact(); - if (_logger.IsInfo) _logger.Info(GetLogMessage("Compacting receipts block database")); + if (_logger.IsInfo) _logger.Info("Compacting receipts block database"); _receiptsBlockDb.Compact(); } } finally { _progressLogger.MarkEnd(); - _stopwatch?.Stop(); timer.Stop(); } if (!token.IsCancellationRequested) { - if (_logger.IsInfo) _logger.Info(GetLogMessage("finished")); + if (_logger.IsInfo) _logger.Info("Receipt migration finished"); } } Block GetMissingBlock(long i, Hash256? blockHash) { - if (_logger.IsDebug) _logger.Debug(GetLogMessage("warning", $"Block {i} not found. Logs will not be searchable for this block.")); + if (_logger.IsDebug) _logger.Debug($"Block {i} not found. Logs will not be searchable for this block."); Block emptyBlock = EmptyBlock.Get(); emptyBlock.Header.Number = i; emptyBlock.Header.Hash = blockHash; @@ -239,7 +227,7 @@ static void ReturnMissingBlock(Block emptyBlock) EmptyBlock.Return(emptyBlock); } - IEnumerable<(long, Hash256)> GetBlockBodiesForMigration(CancellationToken token) + IEnumerable<(long, Hash256)> GetBlockBodiesForMigration(long from, long to, bool updateReceiptMigrationPointer, CancellationToken token) { bool TryGetMainChainBlockHashFromLevel(long number, out Hash256? blockHash) { @@ -266,11 +254,11 @@ bool TryGetMainChainBlockHashFromLevel(long number, out Hash256? blockHash) } } - for (long i = _toBlock - 1; i > 0; i--) + for (long i = to; i >= from; i--) { if (token.IsCancellationRequested) { - if (_logger.IsInfo) _logger.Info(GetLogMessage("cancelled")); + if (_logger.IsInfo) _logger.Info("Receipt migration cancelled"); yield break; } @@ -279,7 +267,7 @@ bool TryGetMainChainBlockHashFromLevel(long number, out Hash256? blockHash) yield return (i, blockHash!); } - if (_receiptStorage.MigratedBlockNumber > i) + if (updateReceiptMigrationPointer && _receiptStorage.MigratedBlockNumber > i) { _receiptStorage.MigratedBlockNumber = i; } @@ -295,11 +283,12 @@ private void MigrateBlock(Block block) if (notNullReceipts.Length == 0) return; + // This should set the new rlp and tx index depending on config. _receiptStorage.Insert(block, notNullReceipts); - // I guess some old schema need this + // It used to be that the tx index is stored in the default column so we are moving it into transactions column { - using IWriteBatch writeBatch = _receiptsDb.StartWriteBatch().GetColumnBatch(ReceiptsColumns.Transactions); + using IWriteBatch writeBatch = _receiptsDb.StartWriteBatch().GetColumnBatch(ReceiptsColumns.Default); for (int i = 0; i < notNullReceipts.Length; i++) { writeBatch[notNullReceipts[i].TxHash!.Bytes] = null; @@ -324,8 +313,7 @@ private void MigrateBlock(Block block) if (notNullReceipts.Length != receipts.Length) { if (_logger.IsWarn) - _logger.Warn(GetLogMessage("warning", - $"Block {block.ToString(Block.Format.FullHashAndNumber)} is missing {receipts.Length - notNullReceipts.Length} of {receipts.Length} receipts!")); + _logger.Warn($"Block {block.ToString(Block.Format.FullHashAndNumber)} is missing {receipts.Length - notNullReceipts.Length} of {receipts.Length} receipts!"); } } @@ -382,13 +370,6 @@ private bool IsMigrationNeeded(long blockNumber, Hash256 blockHash, TxReceipt[] return _receiptConfig.CompactReceiptStore != isCompactEncoding; } - private string GetLogMessage(string status, string? suffix = null) - { - string message = $"ReceiptsDb migration {status} | {_stopwatch?.Elapsed:d\\:hh\\:mm\\:ss} | {_progressLogger.CurrentValue.ToString().PadLeft(_toBlock.ToString().Length)} / {_toBlock} blocks migrated. | current {_progressLogger.CurrentPerSecond:F2} Blk/s | total {_progressLogger.TotalPerSecond:F2} Blk/s. {suffix}"; - _progressLogger.SetMeasuringPoint(); - return message; - } - private class EmptyBlockObjectPolicy : IPooledObjectPolicy { public Block Create() diff --git a/src/Nethermind/Nethermind.JsonRpc.Test/Modules/DebugModuleTests.cs b/src/Nethermind/Nethermind.JsonRpc.Test/Modules/DebugModuleTests.cs index 0cea4e87063..2cf6ecaadbe 100644 --- a/src/Nethermind/Nethermind.JsonRpc.Test/Modules/DebugModuleTests.cs +++ b/src/Nethermind/Nethermind.JsonRpc.Test/Modules/DebugModuleTests.cs @@ -392,7 +392,7 @@ public void Debug_traceCall_test() [Test] public async Task Migrate_receipts() { - debugBridge.MigrateReceipts(Arg.Any()).Returns(true); + debugBridge.MigrateReceipts(Arg.Any(), Arg.Any()).Returns(true); IDebugRpcModule rpcModule = new DebugRpcModule(LimboLogs.Instance, debugBridge, jsonRpcConfig, specProvider); string response = await RpcTest.TestSerializedRequest(rpcModule, "debug_migrateReceipts", 100); Assert.That(response, Is.Not.Null); diff --git a/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/DebugBridge.cs b/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/DebugBridge.cs index df9dea1b87f..2212eef4799 100644 --- a/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/DebugBridge.cs +++ b/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/DebugBridge.cs @@ -91,8 +91,7 @@ public DebugBridge( public void UpdateHeadBlock(Hash256 blockHash) => _blockTree.UpdateHeadBlock(blockHash); - public Task MigrateReceipts(long blockNumber) - => _receiptsMigration.Run(blockNumber + 1); // add 1 to make go from inclusive (better for API) to exclusive (better for internal) + public Task MigrateReceipts(long from, long to) => _receiptsMigration.Run(from, to); public void InsertReceipts(BlockParameter blockParameter, TxReceipt[] txReceipts) { diff --git a/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/DebugRpcModule.cs b/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/DebugRpcModule.cs index d72a55b372d..c308901b87c 100644 --- a/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/DebugRpcModule.cs +++ b/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/DebugRpcModule.cs @@ -145,8 +145,8 @@ public ResultWrapper debug_traceTransactionInBlockByIndex(byte[ return ResultWrapper.Success(transactionTrace); } - public async Task> debug_migrateReceipts(long blockNumber) => - ResultWrapper.Success(await _debugBridge.MigrateReceipts(blockNumber)); + public async Task> debug_migrateReceipts(long from, long to) => + ResultWrapper.Success(await _debugBridge.MigrateReceipts(from, to)); public Task> debug_insertReceipts(BlockParameter blockParameter, ReceiptForRpc[] receiptForRpc) { diff --git a/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/IDebugBridge.cs b/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/IDebugBridge.cs index 9b8db38d2cf..67913a7a933 100644 --- a/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/IDebugBridge.cs +++ b/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/IDebugBridge.cs @@ -31,7 +31,7 @@ public interface IDebugBridge ChainLevelInfo GetLevelInfo(long number); int DeleteChainSlice(long startNumber, bool force = false); void UpdateHeadBlock(Hash256 blockHash); - Task MigrateReceipts(long blockNumber); + Task MigrateReceipts(long from, long to); void InsertReceipts(BlockParameter blockParameter, TxReceipt[] receipts); SyncReportSymmary GetCurrentSyncStage(); IEnumerable TraceBlockToFile(Hash256 blockHash, CancellationToken cancellationToken, GethTraceOptions? gethTraceOptions = null); diff --git a/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/IDebugRpcModule.cs b/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/IDebugRpcModule.cs index f8c1129cddc..dce860a1a30 100644 --- a/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/IDebugRpcModule.cs +++ b/src/Nethermind/Nethermind.JsonRpc/Modules/DebugModule/IDebugRpcModule.cs @@ -85,7 +85,7 @@ public interface IDebugRpcModule : IRpcModule ResultWrapper debug_traceTransactionInBlockByIndex(byte[] blockRlp, int txIndex, GethTraceOptions options = null); [JsonRpcMethod(Description = "Sets the block number up to which receipts will be migrated to (Nethermind specific).")] - Task> debug_migrateReceipts(long blockNumber); + Task> debug_migrateReceipts(long from, long to); [JsonRpcMethod(Description = "Insert receipts for the block after verifying receipts root correctness.")] Task> debug_insertReceipts(BlockParameter blockParameter, ReceiptForRpc[] receiptForRpc); diff --git a/src/Nethermind/Nethermind.Runner.Test/Ethereum/Steps/Migrations/ReceiptMigrationTests.cs b/src/Nethermind/Nethermind.Runner.Test/Ethereum/Steps/Migrations/ReceiptMigrationTests.cs index 040ca87a360..daa8b534994 100644 --- a/src/Nethermind/Nethermind.Runner.Test/Ethereum/Steps/Migrations/ReceiptMigrationTests.cs +++ b/src/Nethermind/Nethermind.Runner.Test/Ethereum/Steps/Migrations/ReceiptMigrationTests.cs @@ -23,7 +23,6 @@ namespace Nethermind.Runner.Test.Ethereum.Steps.Migrations { - [TestFixture] public class ReceiptMigrationTests { [TestCase(null, 0, false, false, false, false)] // No change to migrate @@ -71,6 +70,7 @@ public async Task RunMigration(int? commandStartBlockNumber, long currentMigrate TestMemColumnsDb receiptColumnDb = new(); TestMemDb blocksDb = (TestMemDb)receiptColumnDb.GetColumnDb(ReceiptsColumns.Blocks); TestMemDb txDb = (TestMemDb)receiptColumnDb.GetColumnDb(ReceiptsColumns.Transactions); + TestMemDb defaultDb = (TestMemDb)receiptColumnDb.GetColumnDb(ReceiptsColumns.Default); // Put the last block receipt encoding Block lastBlock = blockTree.FindBlock(chainLength - 1); @@ -93,21 +93,20 @@ public async Task RunMigration(int? commandStartBlockNumber, long currentMigrate if (commandStartBlockNumber.HasValue) { - _ = migration.Run(commandStartBlockNumber.Value); + _ = migration.Run(0, commandStartBlockNumber.Value); await migration._migrationTask!; } else { await migration.Run(CancellationToken.None); + Assert.That(() => outMemoryReceiptStorage.MigratedBlockNumber, Is.InRange(0, 1).After(1000, 10)); } - Assert.That(() => outMemoryReceiptStorage.MigratedBlockNumber, Is.InRange(0, 1).After(1000, 10)); - if (wasMigrated) { - int blockNum = (commandStartBlockNumber ?? chainLength) - 1 - 1; + int blockNum = commandStartBlockNumber ?? (chainLength - 1); int txCount = blockNum * 2; - txDb.KeyWasWritten((item => item.Item2 is null), txCount); + defaultDb.KeyWasWritten((item => item.Item2 is null), txCount); ((TestMemDb)receiptColumnDb.GetColumnDb(ReceiptsColumns.Blocks)).KeyWasRemoved((_ => true), blockNum); outMemoryReceiptStorage.Count.Should().Be(txCount); } diff --git a/src/Nethermind/Nethermind.Synchronization/ParallelSync/ISyncModeSelector.cs b/src/Nethermind/Nethermind.Synchronization/ParallelSync/ISyncModeSelector.cs index 049f7a41077..6931df111e4 100644 --- a/src/Nethermind/Nethermind.Synchronization/ParallelSync/ISyncModeSelector.cs +++ b/src/Nethermind/Nethermind.Synchronization/ParallelSync/ISyncModeSelector.cs @@ -2,6 +2,9 @@ // SPDX-License-Identifier: LGPL-3.0-only using System; +using System.Threading; +using System.Threading.Tasks; +using Nethermind.Core.Events; namespace Nethermind.Synchronization.ParallelSync { @@ -17,5 +20,16 @@ public interface ISyncModeSelector : IDisposable void Stop(); void Update(); + + async Task WaitUntilMode(Func predicate, CancellationToken cancellationToken) + { + if (predicate(Current)) return; + + await Wait.ForEventCondition( + cancellationToken, + (e) => Changed += e, + (e) => Changed -= e, + (arg) => predicate(arg.Current)); + } } }