Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/initsteps race condition #8145

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public InitDatabaseSnapshot(INethermindApi api) : base(api)
_logger = _api.LogManager.GetClassLogger();
}

public override async Task Execute(CancellationToken cancellationToken)
protected override async Task Setup(CancellationToken cancellationToken)
{
switch (_api.Config<IInitConfig>().DiagnosticMode)
{
Expand All @@ -38,7 +38,7 @@ public override async Task Execute(CancellationToken cancellationToken)
break;
}

await base.Execute(cancellationToken);
await base.Setup(cancellationToken);
}

private async Task InitDbFromSnapshot(CancellationToken cancellationToken)
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Init/InitializeStateDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
namespace Nethermind.Init;

[RunnerStepDependencies(typeof(InitializePlugins), typeof(InitializeBlockTree), typeof(SetupKeyStore))]
public class InitializeStateDb : IStep
public class InitializeStateDb : InitStep, IStep
{
private readonly INethermindApi _api;
private ILogger _logger;
Expand All @@ -41,7 +41,7 @@ public InitializeStateDb(INethermindApi api)
_api = api;
}

public Task Execute(CancellationToken cancellationToken)
protected override Task Setup(CancellationToken _)
{
InitBlockTraceDumper();

Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Init/Steps/ApplyMemoryHint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
namespace Nethermind.Init.Steps
{
[RunnerStepDependencies(typeof(MigrateConfigs))]
public sealed class ApplyMemoryHint : IStep
public sealed class ApplyMemoryHint : InitStep, IStep
{
private readonly INethermindApi _api;
private readonly IInitConfig _initConfig;
Expand All @@ -32,7 +32,7 @@ public ApplyMemoryHint(INethermindApi api)
_txPoolConfig = api.Config<ITxPoolConfig>();
}

public Task Execute(CancellationToken _)
protected override Task Setup(CancellationToken _)
{
MemoryHintMan memoryHintMan = new(_api.LogManager);
uint cpuCount = (uint)Environment.ProcessorCount;
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Init/Steps/DatabaseMigrations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
namespace Nethermind.Init.Steps
{
[RunnerStepDependencies(typeof(InitTxTypesAndRlp), typeof(InitDatabase), typeof(InitializeBlockchain), typeof(InitializeNetwork))]
public sealed class DatabaseMigrations : IStep
public sealed class DatabaseMigrations : InitStep, IStep
{
private readonly IApiWithNetwork _api;

Expand All @@ -20,7 +20,7 @@ public DatabaseMigrations(INethermindApi api)
_api = api;
}

public async Task Execute(CancellationToken cancellationToken)
protected override async Task Setup(CancellationToken cancellationToken)
{
foreach (IDatabaseMigration migration in CreateMigrations())
{
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Init/Steps/EraStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
namespace Nethermind.Init.Steps;

[RunnerStepDependencies(typeof(InitializeBlockchain), typeof(LoadGenesisBlock))]
public class EraStep : IStep
public class EraStep : InitStep, IStep
{
protected readonly INethermindApi _api;

Expand All @@ -21,7 +21,7 @@ public EraStep(INethermindApi api)
_api = api;
}

public async Task Execute(CancellationToken cancellationToken)
protected override async Task Setup(CancellationToken cancellationToken)
{
IContainer container = _api.ConfigureContainerBuilderFromApiWithBlockchain(new ContainerBuilder())
.AddModule(new EraModule())
Expand Down
131 changes: 32 additions & 99 deletions src/Nethermind/Nethermind.Init/Steps/EthereumStepsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ public class EthereumStepsManager
{
private readonly ILogger _logger;

private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(true);
private readonly INethermindApi _api;
private readonly List<StepInfo> _allSteps;
private readonly Dictionary<Type, StepInfo> _allStepsByBaseType;

public EthereumStepsManager(
IEthereumStepsLoader loader,
Expand All @@ -36,138 +34,79 @@ public EthereumStepsManager(
?? throw new ArgumentNullException(nameof(logManager));

_allSteps = loader.LoadSteps(_api.GetType()).ToList();
_allStepsByBaseType = _allSteps.ToDictionary(static s => s.StepBaseType, static s => s);
}

private async Task ReviewDependencies(CancellationToken cancellationToken)
{
bool changedAnything;
do
{
foreach (StepInfo stepInfo in _allSteps)
{
_logger.Debug($"{stepInfo} is {stepInfo.Stage}");
}

await _autoResetEvent.WaitOneAsync(cancellationToken);

if (_logger.IsDebug) _logger.Debug("Reviewing steps manager dependencies");

changedAnything = false;
foreach (StepInfo stepInfo in _allSteps)
{
cancellationToken.ThrowIfCancellationRequested();

if (stepInfo.Stage == StepInitializationStage.WaitingForDependencies)
{
bool allDependenciesFinished = true;
foreach (Type dependency in stepInfo.Dependencies)
{
StepInfo dependencyInfo = _allStepsByBaseType[dependency];
if (dependencyInfo.Stage != StepInitializationStage.Complete)
{
if (_logger.IsDebug) _logger.Debug($"{stepInfo} is waiting for {dependencyInfo}");
allDependenciesFinished = false;
break;
}
}

if (allDependenciesFinished)
{
stepInfo.Stage = StepInitializationStage.WaitingForExecution;
changedAnything = true;
if (_logger.IsDebug) _logger.Debug($"{stepInfo} stage changed to {stepInfo.Stage}");
_autoResetEvent.Set();
}
}
}
} while (changedAnything);
}

public async Task InitializeAll(CancellationToken cancellationToken)
{
while (_allSteps.Any(static s => s.Stage != StepInitializationStage.Complete))
List<Task> allRequiredSteps = CreateAndExecuteSteps(cancellationToken);
if (allRequiredSteps.Count == 0)
return;
Task current;
ak88 marked this conversation as resolved.
Show resolved Hide resolved
do
{
cancellationToken.ThrowIfCancellationRequested();

RunOneRoundOfInitialization(cancellationToken);
await ReviewDependencies(cancellationToken);
ReviewFailedAndThrow();
}

await Task.WhenAll(_allPending);
current = await Task.WhenAny(allRequiredSteps);
ak88 marked this conversation as resolved.
Show resolved Hide resolved
ReviewFailedAndThrow(current);
allRequiredSteps.Remove(current);
} while (allRequiredSteps.Any(s => !s.IsCompleted));
}

private readonly ConcurrentQueue<Task> _allPending = new();

private void RunOneRoundOfInitialization(CancellationToken cancellationToken)
private List<Task> CreateAndExecuteSteps(CancellationToken cancellationToken)
{
int startedThisRound = 0;
Dictionary<Type, IStep> createdSteps = [];

foreach (StepInfo stepInfo in _allSteps)
{
cancellationToken.ThrowIfCancellationRequested();

if (stepInfo.Stage != StepInitializationStage.WaitingForExecution)
{
continue;
}

IStep? step = CreateStepInstance(stepInfo);
if (step is null)
{
if (_logger.IsError) _logger.Error($"Unable to create instance of Ethereum runner step {stepInfo}");
continue;
}
createdSteps.Add(step.GetType(), step);
}
List<Task> allRequiredSteps = new();
foreach (StepInfo stepInfo in _allSteps)
{
if (!createdSteps.ContainsKey(stepInfo.StepType))
{
throw new StepDependencyException($"A step {stepInfo} could not be created and initialization cannot proceed.");
}
IStep step = createdSteps[stepInfo.StepType];

Task task = ExecuteStep(step, stepInfo, createdSteps, cancellationToken);
if (_logger.IsDebug) _logger.Debug($"Executing step: {stepInfo}");

stepInfo.Stage = StepInitializationStage.Executing;
startedThisRound++;
Task task = ExecuteStep(step, stepInfo, cancellationToken);

if (step.MustInitialize)
{
_allPending.Enqueue(task);
}
else
{
stepInfo.Stage = StepInitializationStage.Complete;
}
}

if (startedThisRound == 0 && _allPending.All(static t => t.IsCompleted))
{
Interlocked.Increment(ref _foreverLoop);
if (_foreverLoop > 100)
{
if (_logger.IsWarn) _logger.Warn($"Didn't start any initialization steps during initialization round and all previous steps are already completed.");
allRequiredSteps.Add(task);
}
}
return allRequiredSteps;
}

private async Task ExecuteStep(IStep step, StepInfo stepInfo, CancellationToken cancellationToken)
private async Task ExecuteStep(IStep step, StepInfo stepInfo, Dictionary<Type, IStep> steps, CancellationToken cancellationToken)
{
long startTime = Stopwatch.GetTimestamp();
try
{
await step.Execute(cancellationToken);
IEnumerable<Task> dependencies = stepInfo.Dependencies.Select(t => steps[t].StepCompleted);
await step.Execute(dependencies, cancellationToken);

if (_logger.IsDebug)
_logger.Debug(
$"Step {step.GetType().Name,-24} executed in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms");

stepInfo.Stage = StepInitializationStage.Complete;
}
catch (Exception exception)
catch (Exception exception) when (exception is not TaskCanceledException)
{
if (step.MustInitialize)
{
if (_logger.IsError)
_logger.Error(
$"Step {step.GetType().Name,-24} failed after {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms",
exception);

stepInfo.Stage = StepInitializationStage.Failed;
throw;
}

Expand All @@ -176,12 +115,9 @@ private async Task ExecuteStep(IStep step, StepInfo stepInfo, CancellationToken
_logger.Warn(
$"Step {step.GetType().Name,-24} failed after {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms {exception}");
}
stepInfo.Stage = StepInitializationStage.Complete;
}
finally
{
_autoResetEvent.Set();

if (_logger.IsDebug) _logger.Debug($"{step.GetType().Name,-24} complete");
}
}
Expand All @@ -201,13 +137,10 @@ private async Task ExecuteStep(IStep step, StepInfo stepInfo, CancellationToken
return step;
}

private int _foreverLoop;

private void ReviewFailedAndThrow()
private void ReviewFailedAndThrow(Task task)
{
Task? anyFaulted = _allPending.FirstOrDefault(static t => t.IsFaulted);
if (anyFaulted?.IsFaulted == true && anyFaulted?.Exception is not null)
ExceptionDispatchInfo.Capture(anyFaulted.Exception.GetBaseException()).Throw();
if (task?.IsFaulted == true && task?.Exception is not null)
ExceptionDispatchInfo.Capture(task.Exception.GetBaseException()).Throw();
}
}
}
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Init/Steps/FilterBootnodes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
namespace Nethermind.Init.Steps
{
[RunnerStepDependencies(typeof(SetupKeyStore))]
public class FilterBootnodes : IStep
public class FilterBootnodes : InitStep, IStep
{
private readonly IApiWithStores _api;

Expand All @@ -18,7 +18,7 @@ public FilterBootnodes(INethermindApi api)
_api = api;
}

public Task Execute(CancellationToken _)
protected override Task Setup(CancellationToken _)
{
if (_api.ChainSpec is null)
{
Expand Down
5 changes: 3 additions & 2 deletions src/Nethermind/Nethermind.Init/Steps/IEthereumRunnerStep.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Nethermind.Init.Steps
{
public interface IStep
{
Task Execute(CancellationToken cancellationToken);

Task StepCompleted { get; }
Task Execute(IEnumerable<Task> dependentSteps, CancellationToken cancellationToken);
public bool MustInitialize => true;
}
}
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Init/Steps/InitCrypto.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace Nethermind.Init.Steps
{
[RunnerStepDependencies(typeof(InitTxTypesAndRlp))]
public class InitCrypto : IStep
public class InitCrypto : InitStep, IStep
{
private readonly IBasicApi _api;

Expand All @@ -20,7 +20,7 @@ public InitCrypto(INethermindApi api)
}

[Todo(Improve.Refactor, "Automatically scan all the references solutions?")]
public virtual Task Execute(CancellationToken _)
protected override Task Setup(CancellationToken _)
{
_api.EthereumEcdsa = new EthereumEcdsa(_api.SpecProvider!.ChainId);
return Task.CompletedTask;
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Init/Steps/InitDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
namespace Nethermind.Init.Steps
{
[RunnerStepDependencies(typeof(ApplyMemoryHint))]
public class InitDatabase : IStep
public class InitDatabase : InitStep, IStep
{
private readonly INethermindApi _api;

Expand All @@ -30,7 +30,7 @@ public InitDatabase(INethermindApi api)
_api = api;
}

public virtual async Task Execute(CancellationToken _)
protected override async Task Setup(CancellationToken cancellationToken)
{
ILogger logger = _api.LogManager.GetClassLogger();

Expand Down
Loading
Loading