diff --git a/src/Nethermind/Nethermind.Init/Steps/EthereumStepsManager.cs b/src/Nethermind/Nethermind.Init/Steps/EthereumStepsManager.cs index 04a1c3fd309..9616912a6e1 100644 --- a/src/Nethermind/Nethermind.Init/Steps/EthereumStepsManager.cs +++ b/src/Nethermind/Nethermind.Init/Steps/EthereumStepsManager.cs @@ -19,10 +19,8 @@ public class EthereumStepsManager { private readonly ILogger _logger; - private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(true); private readonly INethermindApi _api; private readonly List _allSteps; - private readonly Dictionary _allStepsByBaseType; public EthereumStepsManager( IEthereumStepsLoader loader, @@ -36,153 +34,96 @@ public EthereumStepsManager( ?? throw new ArgumentNullException(nameof(logManager)); _allSteps = loader.LoadSteps(_api.GetType()).ToList(); - _allStepsByBaseType = _allSteps.ToDictionary(static s => s.StepBaseType, static s => s); } - private async Task ReviewDependencies(CancellationToken cancellationToken) + public async Task InitializeAll(CancellationToken cancellationToken) { - bool changedAnything; + List allRequiredSteps = CreateAndExecuteSteps(cancellationToken); + if (allRequiredSteps.Count == 0) + return; do { - foreach (StepInfo stepInfo in _allSteps) - { - _logger.Debug($"{stepInfo} is {stepInfo.Stage}"); - } - - await _autoResetEvent.WaitOneAsync(cancellationToken); - - if (_logger.IsDebug) _logger.Debug("Reviewing steps manager dependencies"); - - changedAnything = false; - foreach (StepInfo stepInfo in _allSteps) - { - cancellationToken.ThrowIfCancellationRequested(); - - if (stepInfo.Stage == StepInitializationStage.WaitingForDependencies) - { - bool allDependenciesFinished = true; - foreach (Type dependency in stepInfo.Dependencies) - { - StepInfo dependencyInfo = _allStepsByBaseType[dependency]; - if (dependencyInfo.Stage != StepInitializationStage.Complete) - { - if (_logger.IsDebug) _logger.Debug($"{stepInfo} is waiting for {dependencyInfo}"); - allDependenciesFinished = false; - break; - } - } - - if (allDependenciesFinished) - { - stepInfo.Stage = StepInitializationStage.WaitingForExecution; - changedAnything = true; - if (_logger.IsDebug) _logger.Debug($"{stepInfo} stage changed to {stepInfo.Stage}"); - _autoResetEvent.Set(); - } - } - } - } while (changedAnything); + Task current = await Task.WhenAny(allRequiredSteps); + ReviewFailedAndThrow(current); + allRequiredSteps.Remove(current); + } while (allRequiredSteps.Any(s => !s.IsCompleted)); } - public async Task InitializeAll(CancellationToken cancellationToken) - { - while (_allSteps.Any(static s => s.Stage != StepInitializationStage.Complete)) - { - cancellationToken.ThrowIfCancellationRequested(); - - RunOneRoundOfInitialization(cancellationToken); - await ReviewDependencies(cancellationToken); - ReviewFailedAndThrow(); - } - await Task.WhenAll(_allPending); - } - - private readonly ConcurrentQueue _allPending = new(); - - private void RunOneRoundOfInitialization(CancellationToken cancellationToken) + private List CreateAndExecuteSteps(CancellationToken cancellationToken) { - int startedThisRound = 0; + Dictionary createdSteps = []; + foreach (StepInfo stepInfo in _allSteps) { cancellationToken.ThrowIfCancellationRequested(); - if (stepInfo.Stage != StepInitializationStage.WaitingForExecution) - { - continue; - } - IStep? step = CreateStepInstance(stepInfo); if (step is null) { if (_logger.IsError) _logger.Error($"Unable to create instance of Ethereum runner step {stepInfo}"); continue; } - - if (_logger.IsDebug) _logger.Debug($"Executing step: {stepInfo}"); - - stepInfo.Stage = StepInitializationStage.Executing; - startedThisRound++; - Task task = ExecuteStep(step, stepInfo, cancellationToken); - - if (step.MustInitialize) - { - _allPending.Enqueue(task); - } - else + createdSteps.Add(step.GetType(), new StepWrapper(step)); + } + List allRequiredSteps = new(); + foreach (StepInfo stepInfo in _allSteps) + { + if (!createdSteps.ContainsKey(stepInfo.StepType)) { - stepInfo.Stage = StepInitializationStage.Complete; + throw new StepDependencyException($"A step {stepInfo} could not be created and initialization cannot proceed."); } - } + StepWrapper stepWrapper = createdSteps[stepInfo.StepType]; - if (startedThisRound == 0 && _allPending.All(static t => t.IsCompleted)) - { - Interlocked.Increment(ref _foreverLoop); - if (_foreverLoop > 100) + Task task = ExecuteStep(stepWrapper, stepInfo, createdSteps, cancellationToken); + if (_logger.IsDebug) _logger.Debug($"Executing step: {stepInfo}"); + + if (stepWrapper.Step.MustInitialize) { - if (_logger.IsWarn) _logger.Warn($"Didn't start any initialization steps during initialization round and all previous steps are already completed."); + allRequiredSteps.Add(task); } } + return allRequiredSteps; } - private async Task ExecuteStep(IStep step, StepInfo stepInfo, CancellationToken cancellationToken) + private async Task ExecuteStep(StepWrapper stepWrapper, StepInfo stepInfo, Dictionary steps, CancellationToken cancellationToken) { long startTime = Stopwatch.GetTimestamp(); try { - await step.Execute(cancellationToken); + IEnumerable dependencies = []; + foreach (Type type in stepInfo.Dependencies) + { + if (!steps.ContainsKey(type)) + throw new StepDependencyException($"The dependent step {type.Name} for {stepInfo.StepType.Name} was not created."); + dependencies = stepInfo.Dependencies.Select(t => steps[t]); + } + await stepWrapper.StartExecute(dependencies, cancellationToken); if (_logger.IsDebug) _logger.Debug( - $"Step {step.GetType().Name,-24} executed in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms"); - - stepInfo.Stage = StepInitializationStage.Complete; + $"Step {stepWrapper.GetType().Name,-24} executed in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms"); } - catch (Exception exception) + catch (Exception exception) when (exception is not TaskCanceledException) { - if (step.MustInitialize) + if (stepWrapper.Step.MustInitialize) { if (_logger.IsError) _logger.Error( - $"Step {step.GetType().Name,-24} failed after {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms", + $"Step {stepWrapper.GetType().Name,-24} failed after {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms", exception); - - stepInfo.Stage = StepInitializationStage.Failed; throw; } if (_logger.IsWarn) { _logger.Warn( - $"Step {step.GetType().Name,-24} failed after {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms {exception}"); + $"Step {stepWrapper.GetType().Name,-24} failed after {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms {exception}"); } - stepInfo.Stage = StepInitializationStage.Complete; } finally { - _autoResetEvent.Set(); - - if (_logger.IsDebug) _logger.Debug($"{step.GetType().Name,-24} complete"); + if (_logger.IsDebug) _logger.Debug($"{stepWrapper.GetType().Name,-24} complete"); } } @@ -201,13 +142,36 @@ private async Task ExecuteStep(IStep step, StepInfo stepInfo, CancellationToken return step; } - private int _foreverLoop; + private void ReviewFailedAndThrow(Task task) + { + if (task?.IsFaulted == true && task?.Exception is not null) + ExceptionDispatchInfo.Capture(task.Exception.GetBaseException()).Throw(); + } - private void ReviewFailedAndThrow() + private class StepWrapper(IStep step) { - Task? anyFaulted = _allPending.FirstOrDefault(static t => t.IsFaulted); - if (anyFaulted?.IsFaulted == true && anyFaulted?.Exception is not null) - ExceptionDispatchInfo.Capture(anyFaulted.Exception.GetBaseException()).Throw(); + public IStep Step => step; + public Task StepTask => _taskCompletedSource.Task; + + private TaskCompletionSource _taskCompletedSource = new TaskCompletionSource(); + + public async Task StartExecute(IEnumerable dependentSteps, CancellationToken cancellationToken) + { + cancellationToken.Register(() => _taskCompletedSource.TrySetCanceled()); + + await Task.WhenAll(dependentSteps.Select(s => s.StepTask)); + try + { + await step.Execute(cancellationToken); + _taskCompletedSource.TrySetResult(); + } + catch + { + _taskCompletedSource.TrySetCanceled(); + throw; + } + } } } + } diff --git a/src/Nethermind/Nethermind.Init/Steps/StepInfo.cs b/src/Nethermind/Nethermind.Init/Steps/StepInfo.cs index 14ba018fd8a..4873e2b2099 100644 --- a/src/Nethermind/Nethermind.Init/Steps/StepInfo.cs +++ b/src/Nethermind/Nethermind.Init/Steps/StepInfo.cs @@ -29,11 +29,9 @@ public StepInfo(Type type, Type baseType) public Type[] Dependencies { get; } - public StepInitializationStage Stage { get; set; } - public override string ToString() { - return $"{StepType.Name} : {StepBaseType.Name} ({Stage})"; + return $"{StepType.Name} : {StepBaseType.Name}"; } } } diff --git a/src/Nethermind/Nethermind.Runner.Test/Ethereum/Steps/EthereumStepsManagerTests.cs b/src/Nethermind/Nethermind.Runner.Test/Ethereum/Steps/EthereumStepsManagerTests.cs index b496114ec63..a38fded1cf1 100644 --- a/src/Nethermind/Nethermind.Runner.Test/Ethereum/Steps/EthereumStepsManagerTests.cs +++ b/src/Nethermind/Nethermind.Runner.Test/Ethereum/Steps/EthereumStepsManagerTests.cs @@ -47,7 +47,7 @@ public async Task With_steps_from_here() LimboLogs.Instance); using CancellationTokenSource source = new CancellationTokenSource(TimeSpan.FromSeconds(1)); - + source.Cancel(); try { await stepsManager.InitializeAll(source.Token); @@ -97,7 +97,7 @@ public async Task With_failing_steps() LimboLogs.Instance); using CancellationTokenSource source = new CancellationTokenSource(TimeSpan.FromSeconds(2)); - + source.Cancel(); try { await stepsManager.InitializeAll(source.Token); @@ -153,7 +153,7 @@ public StepA(NethermindApi runnerContext) } } - [RunnerStepDependencies(typeof(StepC))] + [RunnerStepDependencies(typeof(StepCStandard))] public class StepB : IStep { public Task Execute(CancellationToken cancellationToken)