Skip to content

Commit

Permalink
+semver:minor - Merge pull request #33 from thomhurst/feature/Paralle…
Browse files Browse the repository at this point in the history
…lAsyncProcessorTests

ParallelAsyncProcessorTests
  • Loading branch information
thomhurst authored Dec 22, 2023
2 parents 0da4f45 + 362e664 commit 4db024e
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 5 deletions.
33 changes: 33 additions & 0 deletions EnumerableAsyncProcessor.UnitTests/ParallelAsyncProcessorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using EnumerableAsyncProcessor.Builders;
using NUnit.Framework;

namespace EnumerableAsyncProcessor.UnitTests;

[FixtureLifeCycle(LifeCycle.InstancePerTestCase)]
[Parallelizable(ParallelScope.All)]
public class ParallelAsyncProcessorTests
{
[Test]
public async Task Test()
{
var stopwatch = Stopwatch.StartNew();

var processor = AsyncProcessorBuilder
.WithExecutionCount(500)
.ForEachAsync( () => Task.Delay(100))
.ProcessInParallel();

await processor;

stopwatch.Stop();

var completedTasks = processor.GetEnumerableTasks().Count(x => x.IsCompleted);

Assert.That(completedTasks, Is.EqualTo(500));
Assert.That(stopwatch.Elapsed.Milliseconds, Is.EqualTo(100).Within(1000));
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
namespace EnumerableAsyncProcessor.RunnableProcessors.ResultProcessors;
using EnumerableAsyncProcessor.RunnableProcessors.ResultProcessors.Abstract;

public class ResultParallelAsyncProcessor<TOutput> : ResultRateLimitedParallelAsyncProcessor<TOutput>
namespace EnumerableAsyncProcessor.RunnableProcessors.ResultProcessors;

public class ResultParallelAsyncProcessor<TOutput> : ResultAbstractAsyncProcessor<TOutput>
{
internal ResultParallelAsyncProcessor(int count, Func<Task<TOutput>> taskSelector, CancellationTokenSource cancellationTokenSource) : base(count, taskSelector, -1, cancellationTokenSource)
internal ResultParallelAsyncProcessor(int count, Func<Task<TOutput>> taskSelector, CancellationTokenSource cancellationTokenSource) : base(count, taskSelector, cancellationTokenSource)
{
}

internal override Task Process()
{
return Task.WhenAll(EnumerableTaskCompletionSources.Select(ProcessItem));
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
using System.Collections.Immutable;
using EnumerableAsyncProcessor.RunnableProcessors.ResultProcessors.Abstract;

namespace EnumerableAsyncProcessor.RunnableProcessors.ResultProcessors;

public class ResultParallelAsyncProcessor<TInput, TOutput> : ResultRateLimitedParallelAsyncProcessor<TInput, TOutput>
public class ResultParallelAsyncProcessor<TInput, TOutput> : ResultAbstractAsyncProcessor<TInput, TOutput>
{
internal ResultParallelAsyncProcessor(ImmutableList<TInput> items, Func<TInput, Task<TOutput>> taskSelector, CancellationTokenSource cancellationTokenSource) : base(items, taskSelector, -1, cancellationTokenSource)
internal ResultParallelAsyncProcessor(ImmutableList<TInput> items, Func<TInput, Task<TOutput>> taskSelector, CancellationTokenSource cancellationTokenSource) : base(items, taskSelector, cancellationTokenSource)
{
}

internal override Task Process()
{
return Task.WhenAll(ItemisedTaskCompletionSourceContainers.Select(ProcessItem));
}
}

0 comments on commit 4db024e

Please sign in to comment.