From 2d18be7d4b3f6cb04bff646351a8c7a78d88ac97 Mon Sep 17 00:00:00 2001 From: Rima Falko Date: Thu, 16 Jan 2025 00:34:39 +0100 Subject: [PATCH] remove parallelism-meter form signal-flow and signal-handling-pipelines --- Sources/Falko.Talkie.Signals/Flows/SignalFlow.cs | 14 +++++--------- .../Handling/ManySignalHandlingPipeline.cs | 9 ++++----- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/Sources/Falko.Talkie.Signals/Flows/SignalFlow.cs b/Sources/Falko.Talkie.Signals/Flows/SignalFlow.cs index 5f0c0e5..69326df 100644 --- a/Sources/Falko.Talkie.Signals/Flows/SignalFlow.cs +++ b/Sources/Falko.Talkie.Signals/Flows/SignalFlow.cs @@ -14,14 +14,9 @@ public sealed class SignalFlow : ISignalFlow private readonly RemovableSequence _pipelines = new(); - private readonly ParallelismMeter _pipelinesParallelismMeter = new(); - private bool _disposed; - public SignalFlow() - { - TaskScheduler.UnobservedTaskException += OnUnobservedSignalPublishingException; - } + public SignalFlow() => TaskScheduler.UnobservedTaskException += OnUnobservedSignalPublishingException; public Subscription Subscribe(ISignalHandlingPipeline handlingPipeline) { @@ -50,9 +45,10 @@ public async Task PublishAsync(Signal signal, CancellationToken cancellationToke try { // ReSharper disable once InconsistentlySynchronizedField - await _pipelines.Parallelize(_pipelinesParallelismMeter) - .ForEachAsync((pipeline, scopedCancellationToken) => pipeline.TransferAsync(this, signal, scopedCancellationToken), - cancellationToken: publishCancellationToken); + await _pipelines + .Parallelize() + .ForEachAsync((pipeline, scopedCancellationToken) => pipeline + .TransferAsync(this, signal, scopedCancellationToken), publishCancellationToken); } catch (SignalPublishingException exception) { diff --git a/Sources/Falko.Talkie.Signals/Pipelines/Handling/ManySignalHandlingPipeline.cs b/Sources/Falko.Talkie.Signals/Pipelines/Handling/ManySignalHandlingPipeline.cs index a2a04de..56b7fa9 100644 --- a/Sources/Falko.Talkie.Signals/Pipelines/Handling/ManySignalHandlingPipeline.cs +++ b/Sources/Falko.Talkie.Signals/Pipelines/Handling/ManySignalHandlingPipeline.cs @@ -15,8 +15,6 @@ public sealed class ManySignalHandlingPipeline { private readonly FrozenSequence _handlers = handlers.ToFrozenSequence(); - private readonly ParallelismMeter _handlersParallelismMeter = new(); - public ValueTask TransferAsync(ISignalFlow flow, Signal signal, CancellationToken cancellationToken = default) { if (interceptingPipeline?.TryTransfer(signal, out signal, cancellationToken) is false) @@ -26,9 +24,10 @@ public ValueTask TransferAsync(ISignalFlow flow, Signal signal, CancellationToke var context = new SignalContext(flow, signal); - return _handlers.Parallelize(_handlersParallelismMeter) - .ForEachAsync((handler, scopedCancellationToken) => handler.HandleAsync(context, scopedCancellationToken), - cancellationToken: cancellationToken) + return _handlers + .Parallelize() + .ForEachAsync((handler, scopedCancellationToken) => handler + .HandleAsync(context, scopedCancellationToken), cancellationToken: cancellationToken) .AsValueTask(); } }