Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Tyrrrz committed Nov 25, 2023
1 parent 797b829 commit 051c026
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 118 deletions.
3 changes: 2 additions & 1 deletion CliWrap/EventStream/PushEventStreamCommandExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static IObservable<CommandEvent> Observe(
CancellationToken gracefulCancellationToken
)
{
return Observable.Create<CommandEvent>(observer =>
return Observable.CreateSynchronized<CommandEvent>(observer =>
{
var stdOutPipe = PipeTarget.Merge(
command.StandardOutputPipe,
Expand All @@ -54,6 +54,7 @@ CancellationToken gracefulCancellationToken
forcefulCancellationToken,
gracefulCancellationToken
);

observer.OnNext(new StartedCommandEvent(commandTask.ProcessId));

// Don't pass cancellation token to the continuation because we need it to
Expand Down
23 changes: 13 additions & 10 deletions CliWrap/PipeSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@ public abstract Task CopyToAsync(
);
}

file class AnonymousPipeSource : PipeSource
public partial class PipeSource
{
private readonly Func<Stream, CancellationToken, Task> _copyToAsync;

public AnonymousPipeSource(Func<Stream, CancellationToken, Task> copyToAsync) =>
_copyToAsync = copyToAsync;

public override async Task CopyToAsync(
Stream destination,
CancellationToken cancellationToken = default
) => await _copyToAsync(destination, cancellationToken).ConfigureAwait(false);
private class AnonymousPipeSource : PipeSource
{
private readonly Func<Stream, CancellationToken, Task> _copyToAsync;

public AnonymousPipeSource(Func<Stream, CancellationToken, Task> copyToAsync) =>
_copyToAsync = copyToAsync;

public override async Task CopyToAsync(
Stream destination,
CancellationToken cancellationToken = default
) => await _copyToAsync(destination, cancellationToken).ConfigureAwait(false);
}
}

public partial class PipeSource
Expand Down
140 changes: 72 additions & 68 deletions CliWrap/PipeTarget.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,95 +26,99 @@ public abstract Task CopyFromAsync(
);
}

file class AnonymousPipeTarget : PipeTarget
public partial class PipeTarget
{
private readonly Func<Stream, CancellationToken, Task> _copyFromAsync;

public AnonymousPipeTarget(Func<Stream, CancellationToken, Task> copyFromAsync) =>
_copyFromAsync = copyFromAsync;

public override async Task CopyFromAsync(
Stream origin,
CancellationToken cancellationToken = default
) => await _copyFromAsync(origin, cancellationToken).ConfigureAwait(false);
}
private class AnonymousPipeTarget : PipeTarget
{
private readonly Func<Stream, CancellationToken, Task> _copyFromAsync;

file class AggregatePipeTarget : PipeTarget
{
public IReadOnlyList<PipeTarget> Targets { get; }
public AnonymousPipeTarget(Func<Stream, CancellationToken, Task> copyFromAsync) =>
_copyFromAsync = copyFromAsync;

public AggregatePipeTarget(IReadOnlyList<PipeTarget> targets) => Targets = targets;
public override async Task CopyFromAsync(
Stream origin,
CancellationToken cancellationToken = default
) => await _copyFromAsync(origin, cancellationToken).ConfigureAwait(false);
}

public override async Task CopyFromAsync(
Stream origin,
CancellationToken cancellationToken = default
)
private class AggregatePipeTarget : PipeTarget
{
// Cancellation to abort the pipe if any of the underlying targets fail
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
public IReadOnlyList<PipeTarget> Targets { get; }

// Create a separate sub-stream for each target
var targetSubStreams = new Dictionary<PipeTarget, SimplexStream>();
foreach (var target in Targets)
targetSubStreams[target] = new SimplexStream();
public AggregatePipeTarget(IReadOnlyList<PipeTarget> targets) => Targets = targets;

try
public override async Task CopyFromAsync(
Stream origin,
CancellationToken cancellationToken = default
)
{
// Start piping in the background
var readingTask = Task.WhenAll(
targetSubStreams.Select(async targetSubStream =>
{
var (target, subStream) = targetSubStream;
// Cancellation to abort the pipe if any of the underlying targets fail
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

try
{
// ReSharper disable once AccessToDisposedClosure
await target.CopyFromAsync(subStream, cts.Token).ConfigureAwait(false);
}
catch
{
// Abort the operation if any of the targets fail
// ReSharper disable once AccessToDisposedClosure
cts.Cancel();

throw;
}
})
);
// Create a separate sub-stream for each target
var targetSubStreams = new Dictionary<PipeTarget, SimplexStream>();
foreach (var target in Targets)
targetSubStreams[target] = new SimplexStream();

try
{
// Read from the master stream and replicate the data to each sub-stream
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSizes.Stream);
while (true)
// Start piping in the background
var readingTask = Task.WhenAll(
targetSubStreams.Select(async targetSubStream =>
{
var (target, subStream) = targetSubStream;

try
{
// ReSharper disable once AccessToDisposedClosure
await target.CopyFromAsync(subStream, cts.Token).ConfigureAwait(false);
}
catch
{
// Abort the operation if any of the targets fail
// ReSharper disable once AccessToDisposedClosure
cts.Cancel();

throw;
}
})
);

try
{
var bytesRead = await origin
.ReadAsync(buffer.Memory, cts.Token)
.ConfigureAwait(false);
if (bytesRead <= 0)
break;
// Read from the master stream and replicate the data to each sub-stream
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSizes.Stream);
while (true)
{
var bytesRead = await origin
.ReadAsync(buffer.Memory, cts.Token)
.ConfigureAwait(false);

if (bytesRead <= 0)
break;

foreach (var (_, subStream) in targetSubStreams)
await subStream
.WriteAsync(buffer.Memory[..bytesRead], cts.Token)
.ConfigureAwait(false);
}

// Report that transmission is complete
foreach (var (_, subStream) in targetSubStreams)
await subStream
.WriteAsync(buffer.Memory[..bytesRead], cts.Token)
.ConfigureAwait(false);
await subStream.ReportCompletionAsync(cts.Token).ConfigureAwait(false);
}
finally
{
// Wait for all targets to finish and propagate potential exceptions
await readingTask.ConfigureAwait(false);
}

// Report that transmission is complete
foreach (var (_, subStream) in targetSubStreams)
await subStream.ReportCompletionAsync(cts.Token).ConfigureAwait(false);
}
finally
{
// Wait for all targets to finish and propagate potential exceptions
await readingTask.ConfigureAwait(false);
foreach (var (_, subStream) in targetSubStreams)
await subStream.ToAsyncDisposable().DisposeAsync().ConfigureAwait(false);
}
}
finally
{
foreach (var (_, subStream) in targetSubStreams)
await subStream.ToAsyncDisposable().DisposeAsync().ConfigureAwait(false);
}
}
}

Expand Down
46 changes: 7 additions & 39 deletions CliWrap/Utils/Observable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,22 @@

namespace CliWrap.Utils;

file class SynchronizedObserver<T> : IObserver<T>
{
private readonly IObserver<T> _observer;
private readonly object _syncRoot;

public SynchronizedObserver(IObserver<T> observer, object? syncRoot = null)
{
_observer = observer;
_syncRoot = syncRoot ?? new object();
}

public void OnCompleted()
{
lock (_syncRoot)
{
_observer.OnCompleted();
}
}

public void OnError(Exception error)
{
lock (_syncRoot)
{
_observer.OnError(error);
}
}

public void OnNext(T value)
{
lock (_syncRoot)
{
_observer.OnNext(value);
}
}
}

file class Observable<T> : IObservable<T>
internal class Observable<T> : IObservable<T>
{
private readonly Func<IObserver<T>, IDisposable> _subscribe;

public Observable(Func<IObserver<T>, IDisposable> subscribe) => _subscribe = subscribe;

public IDisposable Subscribe(IObserver<T> observer) =>
_subscribe(new SynchronizedObserver<T>(observer));
public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
}

internal static class Observable
{
public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe) =>
new Observable<T>(subscribe);

public static IObservable<T> CreateSynchronized<T>(
Func<IObserver<T>, IDisposable> subscribe,
object? syncRoot = null
) => Create<T>(observer => subscribe(new SynchronizedObserver<T>(observer, syncRoot)));
}
39 changes: 39 additions & 0 deletions CliWrap/Utils/SynchronizedObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;

namespace CliWrap.Utils;

internal class SynchronizedObserver<T> : IObserver<T>
{
private readonly IObserver<T> _observer;
private readonly object _syncRoot;

public SynchronizedObserver(IObserver<T> observer, object? syncRoot = null)
{
_observer = observer;
_syncRoot = syncRoot ?? new object();
}

public void OnCompleted()
{
lock (_syncRoot)
{
_observer.OnCompleted();
}
}

public void OnError(Exception error)
{
lock (_syncRoot)
{
_observer.OnError(error);
}
}

public void OnNext(T value)
{
lock (_syncRoot)
{
_observer.OnNext(value);
}
}
}

0 comments on commit 051c026

Please sign in to comment.