Skip to content

Commit

Permalink
Limit number of background tasks and add metrics (#8110)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcindsobczak authored Jan 28, 2025
1 parent eca0591 commit 75fb5f6
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 7 deletions.
3 changes: 3 additions & 0 deletions src/Nethermind/Nethermind.Api/IInitConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public interface IInitConfig : IConfig

[ConfigItem(Description = "[TECHNICAL] Specify concurrency limit for background task.", DefaultValue = "1", HiddenFromDocs = true)]
int BackgroundTaskConcurrency { get; set; }

[ConfigItem(Description = "[TECHNICAL] Specify max number of background task.", DefaultValue = "65536", HiddenFromDocs = true)]
int BackgroundTaskMaxNumber { get; set; }
}

public enum DiagnosticMode
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Api/InitConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class InitConfig : IInitConfig
public INodeStorage.KeyScheme StateDbKeyScheme { get; set; } = INodeStorage.KeyScheme.Current;
public long? ExitOnBlockNumber { get; set; } = null;
public int BackgroundTaskConcurrency { get; set; } = 1;
public int BackgroundTaskMaxNumber { get; set; } = 65536;

[Obsolete("Use DiagnosticMode with MemDb instead")]
public bool UseMemDb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void Setup()
public async Task Test_task_will_execute()
{
TaskCompletionSource tcs = new TaskCompletionSource();
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 1, LimboLogs.Instance);
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 1, 65536, LimboLogs.Instance);

scheduler.ScheduleTask(1, (_, token) =>
{
Expand All @@ -43,7 +43,7 @@ public async Task Test_task_will_execute()
[Test]
public async Task Test_task_will_execute_concurrently_when_configured_so()
{
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, LimboLogs.Instance);
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, 65536, LimboLogs.Instance);

int counter = 0;

Expand All @@ -68,7 +68,7 @@ public async Task Test_task_will_execute_concurrently_when_configured_so()
[Test]
public async Task Test_task_will_cancel_on_block_processing()
{
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, LimboLogs.Instance);
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, 65536, LimboLogs.Instance);

bool wasCancelled = false;

Expand All @@ -94,7 +94,7 @@ public async Task Test_task_will_cancel_on_block_processing()
[Test]
public async Task Test_task_that_is_scheduled_during_block_processing_will_continue_after()
{
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, LimboLogs.Instance);
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, 65536, LimboLogs.Instance);
_blockProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null));

int executionCount = 0;
Expand All @@ -117,7 +117,7 @@ public async Task Test_task_that_is_scheduled_during_block_processing_will_conti
[Test]
public async Task Test_task_that_is_scheduled_during_block_processing_but_deadlined_will_get_called_and_cancelled()
{
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, LimboLogs.Instance);
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, 65536, LimboLogs.Instance);
_blockProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null));

bool wasCancelled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ public class BackgroundTaskScheduler : IBackgroundTaskScheduler, IAsyncDisposabl
private readonly ManualResetEvent _restartQueueSignal;
private readonly Task<Task>[] _tasksExecutors;

public BackgroundTaskScheduler(IBlockProcessor blockProcessor, int concurrency, ILogManager logManager)
public BackgroundTaskScheduler(IBlockProcessor blockProcessor, int concurrency, int capacity, ILogManager logManager)
{
if (concurrency < 1) throw new ArgumentException("concurrency must be at least 1");
if (capacity < 1) throw new ArgumentException("capacity must be at least 1");

_mainCancellationTokenSource = new CancellationTokenSource();
_blockProcessorCancellationTokenSource = new CancellationTokenSource();
_taskQueue = Channel.CreateUnbounded<IActivity>();
_taskQueue = Channel.CreateBounded<IActivity>(capacity);
_logger = logManager.GetClassLogger();
_blockProcessor = blockProcessor;
_restartQueueSignal = new ManualResetEvent(true);
Expand Down Expand Up @@ -130,6 +131,8 @@ public void ScheduleTask<TReq>(TReq request, Func<TReq, CancellationToken, Task>
// This should never happen unless something goes very wrong.
throw new InvalidOperationException("Unable to write to background task queue.");
}

Evm.Metrics.NumberOfBackgroundTasksScheduled = _taskQueue.Reader.Count;
}

public async ValueTask DisposeAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ protected override void Load(ContainerBuilder builder)
.AddSingleton<IBackgroundTaskScheduler, MainBlockProcessingContext>((blockProcessingContext) => new BackgroundTaskScheduler(
blockProcessingContext.BlockProcessor,
initConfig.BackgroundTaskConcurrency,
initConfig.BackgroundTaskMaxNumber,
logManager))
.AddSingleton<IFileSystem>(new FileSystem())
.AddSingleton<IDbProvider>(new DbProvider())
Expand Down
4 changes: 4 additions & 0 deletions src/Nethermind/Nethermind.Evm/Metrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ public class Metrics
public static long ThreadLocalContractsAnalysed => _contractsAnalysed.ThreadLocalValue;
public static void IncrementContractsAnalysed() => _contractsAnalysed.Increment();

[GaugeMetric]
[Description("The number of tasks scheduled in the background.")]
public static long NumberOfBackgroundTasksScheduled { get; set; }

internal static long BlockTransactions { get; set; }

private static float _blockAveGasPrice;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ protected virtual Task InitBlockchain()
BackgroundTaskScheduler backgroundTaskScheduler = new BackgroundTaskScheduler(
mainBlockProcessor,
initConfig.BackgroundTaskConcurrency,
initConfig.BackgroundTaskMaxNumber,
_api.LogManager);
setApi.BackgroundTaskScheduler = backgroundTaskScheduler;
_api.DisposeStack.Push(backgroundTaskScheduler);
Expand Down

0 comments on commit 75fb5f6

Please sign in to comment.