Skip to content

Commit

Permalink
IO refactor WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
louthy committed Dec 18, 2024
1 parent 6680f5e commit 1c4074b
Show file tree
Hide file tree
Showing 18 changed files with 1,386 additions and 109 deletions.
15 changes: 15 additions & 0 deletions LanguageExt.Core/Effects/IO/DSL/IODsl.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Threading.Tasks;

namespace LanguageExt.DSL;

static class IODsl
{
public static IODsl<A> Lift<A>(Func<EnvIO, A> f) => new IOLiftSync<A>(f);
public static IODsl<A> Lift<A>(Func<EnvIO, Task<A>> f) => new IOLiftAsync<A>(f);
}

abstract record IODsl<A>
{
public abstract IODsl<B> Map<B>(Func<A, B> f);
}
10 changes: 10 additions & 0 deletions LanguageExt.Core/Effects/IO/DSL/IOLiftAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;
using System.Threading.Tasks;

namespace LanguageExt.DSL;

record IOLiftAsync<A>(Func<EnvIO, Task<A>> F) : IODsl<A>
{
public override IODsl<B> Map<B>(Func<A, B> f) =>
new IOLiftAsync<B>(async x => f(await F(x).ConfigureAwait(false)));
}
10 changes: 10 additions & 0 deletions LanguageExt.Core/Effects/IO/DSL/IOLiftSync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;
using System.Threading.Tasks;

namespace LanguageExt.DSL;

record IOLiftSync<A>(Func<EnvIO, A> F) : IODsl<A>
{
public override IODsl<B> Map<B>(Func<A, B> f) =>
new IOLiftSync<B>(x => f(F(x)));
}
25 changes: 25 additions & 0 deletions LanguageExt.Core/Effects/IO/Free/IOBind.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using LanguageExt.DSL;
using LanguageExt.Traits;

namespace LanguageExt;

record IOBind<A>(IODsl<IO<A>> Value) : IO<A>
{
public override IO<B> Map<B>(Func<A, B> f) =>
new IOBind<B>(Value.Map(fa => fa.Map(f)));

public override IO<B> Bind<B>(Func<A, K<IO, B>> f) =>
new IOBind<B>(Value.Map(mx => mx.Bind(f)));

public override IO<B> ApplyBack<B>(K<IO, Func<A, B>> mf) =>
mf switch
{
IOPure<Func<A, B>> (var f) => new IOBind<B>(Value.Map(fa => fa.Map(f))),
IOPureAsync<Func<A, B>> f => new IOBind<B>(Value.Map(fa => fa.ApplyBack(f))),
IOFail<Func<A, B>> (var v) => new IOFail<B>(v),
IOBind<Func<A, B>> (var f) => new IOBind<B>(f.Map(x => x.Apply(this).As())),
IOCatch<Func<A, B>> mc => new IOBind<B>(Value.Map(fa => fa.ApplyBack(mc))),
_ => throw new InvalidOperationException()
};
}
46 changes: 46 additions & 0 deletions LanguageExt.Core/Effects/IO/Free/IOCatch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using LanguageExt.Traits;
using LanguageExt.Common;
using System.Threading.Tasks;

namespace LanguageExt;

abstract record IOCatch<A> : IO<A>
{
public abstract ValueTask<IO<A>> Invoke(EnvIO envIO);
}

record IOCatch<X, A>(
K<IO, X> Operation,
Func<Error, bool> Predicate,
Func<Error, K<IO, X>> Failure,
Func<X, IO<A>> Next) : IOCatch<A>
{
public override IO<B> Map<B>(Func<A, B> f) =>
new IOCatch<X, B>(Operation, Predicate, Failure, x => Next(x).Map(f));

public override IO<B> Bind<B>(Func<A, K<IO, B>> f) =>
new IOCatch<X, B>(Operation, Predicate, Failure, x => Next(x).Bind(f));

public override IO<B> ApplyBack<B>(K<IO, Func<A, B>> f) =>
new IOCatch<X, B>(Operation, Predicate, Failure, x => Next(x).ApplyBack(f));

public override async ValueTask<IO<A>> Invoke(EnvIO envIO)
{
try
{
var x = await Operation.As().RunAsync(envIO);
return Next(x);
}
catch(Exception e)
{
var err = Error.New(e);
if (Predicate(err))
{
var x = await Failure(e).As().RunAsync(envIO);
return Next(x);
}
throw;
}
}
}
26 changes: 26 additions & 0 deletions LanguageExt.Core/Effects/IO/Free/IOFail.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using LanguageExt.Common;
using LanguageExt.DSL;
using LanguageExt.Traits;

namespace LanguageExt;

record IOFail<A>(Error Value) : IO<A>
{
public override IO<B> Map<B>(Func<A, B> f) =>
new IOFail<B>(Value);

public override IO<B> Bind<B>(Func<A, K<IO, B>> f) =>
IO.fail<B>(Value);

public override IO<B> ApplyBack<B>(K<IO, Func<A, B>> mf) =>
mf switch
{
IOPure<Func<A, B>> => new IOFail<B>(Value),
IOPureAsync<Func<A, B>> => new IOFail<B>(Value),
IOFail<Func<A, B>> (var v) => new IOFail<B>(v + Value),
IOBind<Func<A, B>> (var f) => new IOBind<B>(f.Map(x => x.Apply(this).As())),
IOCatch<Func<A, B>> => new IOFail<B>(Value),
_ => throw new InvalidOperationException()
};
}
25 changes: 25 additions & 0 deletions LanguageExt.Core/Effects/IO/Free/IOPure.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using LanguageExt.DSL;
using LanguageExt.Traits;

namespace LanguageExt;

record IOPure<A>(A Value) : IO<A>
{
public override IO<B> Map<B>(Func<A, B> f) =>
new IOPure<B>(f(Value));

public override IO<B> Bind<B>(Func<A, K<IO, B>> f) =>
f(Value).As();

public override IO<B> ApplyBack<B>(K<IO, Func<A, B>> mf) =>
mf switch
{
IOPure<Func<A, B>> (var f) => new IOPure<B>(f(Value)),
IOPureAsync<Func<A, B>> (var tf) => new IOPureAsync<B>(tf.Map(f => f(Value))),
IOFail<Func<A, B>> (var v) => new IOFail<B>(v),
IOBind<Func<A, B>> (var f) => new IOBind<B>(f.Map(x => x.Apply(this).As())),
IOCatch<Func<A, B>> mc => mc.Map(f => f(Value)),
_ => throw new InvalidOperationException()
};
}
38 changes: 38 additions & 0 deletions LanguageExt.Core/Effects/IO/Free/IOPureAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using System.Threading.Tasks;
using LanguageExt.DSL;
using LanguageExt.Traits;

namespace LanguageExt;

record IOPureAsync<A>(Task<A> Value) : IO<A>
{
public override IO<B> Map<B>(Func<A, B> f) =>
new IOPureAsync<B>(Value.Map(f));

public override IO<B> Bind<B>(Func<A, K<IO, B>> f) =>
new IOPureAsync<K<IO, B>>(Value.Map(f)).Bind(Prelude.identity);

public override IO<B> ApplyBack<B>(K<IO, Func<A, B>> mf) =>
mf switch
{
IOPure<Func<A, B>> (var f) => new IOPureAsync<B>(Apply(f, Value)),
IOPureAsync<Func<A, B>> (var tf) => new IOPureAsync<B>(Apply(tf, Value)),
IOFail<Func<A, B>> (var v) => new IOFail<B>(v),
IOBind<Func<A, B>> (var f) => new IOBind<B>(f.Map(f1 => f1.Apply(this).As())),
IOCatch<Func<A, B>> mc => mc.Bind(f => new IOPureAsync<B>(Value.Map(f))),
_ => throw new InvalidOperationException()
};

static async Task<B> Apply<B>(Func<A, B> f, Task<A> ta)
{
var a = await ta;
return f(a);
}

static async Task<B> Apply<B>(Task<Func<A, B>> f, Task<A> a)
{
await Task.WhenAll(f, a);
return f.Result(a.Result);
}
}
9 changes: 9 additions & 0 deletions LanguageExt.Core/Effects/IO/IO.Module.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ namespace LanguageExt;

public partial class IO
{
/// <summary>
/// Lift a pure value into an IO computation
/// </summary>
/// <param name="value">value</param>
/// <typeparam name="A">Bound value type</typeparam>
/// <returns>IO in a success state. Always yields the lifted value.</returns>
public static IO<A> pure<A>(A value) =>
IO<A>.Pure(value);

/// <summary>
/// Put the IO into a failure state
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions LanguageExt.Core/Effects/IO/IO.Monad.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/*
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
Expand Down Expand Up @@ -249,3 +250,4 @@ static K<IO, A> Fallible<Error, IO>.Fail<A>(Error error) =>
static K<IO, A> Fallible<Error, IO>.Catch<A>(K<IO, A> fa, Func<Error, bool> Predicate, Func<Error, K<IO, A>> Fail) =>
fa.As().Catch(Predicate, Fail);
}
*/
48 changes: 24 additions & 24 deletions LanguageExt.Core/Effects/IO/IO.Prelude.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static partial class Prelude
/// Request a cancellation of the IO expression
/// </summary>
public static readonly IO<Unit> cancel =
new IOSync<Unit>(
IO<Unit>.Lift(
e =>
{
e.Source.Cancel();
Expand Down Expand Up @@ -221,18 +221,18 @@ public static K<M, Seq<A>> awaitAll<M, A>(Seq<K<M, A>> ms)
/// <returns>Sequence of results</returns>
public static IO<Seq<A>> awaitAll<A>(Seq<K<IO, A>> ms) =>
awaitAll(ms.Map(f => f.As()));

/// <summary>
/// Awaits all operations
/// </summary>
/// <param name="ms">Operations to await</param>
/// <returns>Sequence of results</returns>
public static IO<Seq<A>> awaitAll<A>(Seq<IO<A>> ms) =>
new IOAsync<Seq<A>>(async eio =>
{
var result = await Task.WhenAll(ms.Map(io => io.RunAsync(eio).AsTask()));
return IOResponse.Complete(result.ToSeqUnsafe());
});
IO.liftAsync(async eio =>
{
var result = await Task.WhenAll(ms.Map(io => io.RunAsync(eio).AsTask()));
return result.ToSeqUnsafe();
});

/// <summary>
/// Awaits all forks
Expand All @@ -249,12 +249,12 @@ public static K<M, Seq<A>> awaitAll<M, A>(Seq<K<M, ForkIO<A>>> forks)
/// <param name="ms">IO operations to await</param>
/// <returns>Sequence of results</returns>
public static IO<Seq<A>> awaitAll<A>(Seq<IO<ForkIO<A>>> mfs) =>
new IOAsync<Seq<A>>(async eio =>
{
var forks = mfs.Map(mf => mf.Run(eio));
var result = await Task.WhenAll(forks.Map(f => f.Await.RunAsync(eio).AsTask()));
return IOResponse.Complete(result.ToSeqUnsafe());
});
IO.liftAsync(async eio =>
{
var forks = mfs.Map(mf => mf.Run(eio));
var result = await Task.WhenAll(forks.Map(f => f.Await.RunAsync(eio).AsTask()));
return result.ToSeqUnsafe();
});

/// <summary>
/// Awaits for any forks to complete
Expand Down Expand Up @@ -368,11 +368,11 @@ public static IO<A> awaitAny<A>(Seq<K<IO, A>> ms) =>
/// If we have collected as many errors as we have forks, then we'll return them all.
/// </returns>
public static IO<A> awaitAny<A>(Seq<IO<A>> ms) =>
new IOAsync<A>(async eio =>
{
var result = await await Task.WhenAny(ms.Map(io => io.RunAsync(eio).AsTask()));
return IOResponse.Complete(result);
});
IO.liftAsync(async eio =>
{
var result = await await Task.WhenAny(ms.Map(io => io.RunAsync(eio).AsTask()));
return result;
});

/// <summary>
/// Awaits for any forks to complete
Expand All @@ -384,12 +384,12 @@ public static IO<A> awaitAny<A>(Seq<IO<A>> ms) =>
/// If we have collected as many errors as we have forks, then we'll return them all.
/// </returns>
public static IO<A> awaitAny<A>(Seq<IO<ForkIO<A>>> mfs) =>
new IOAsync<A>(async eio =>
{
var forks = mfs.Map(mf => mf.Run(eio));
var result = await await Task.WhenAny(forks.Map(f => f.Await.RunAsync(eio).AsTask()));
return IOResponse.Complete(result);
});
IO.liftAsync(async eio =>
{
var forks = mfs.Map(mf => mf.Run(eio));
var result = await await Task.WhenAny(forks.Map(f => f.Await.RunAsync(eio).AsTask()));
return result;
});

/// <summary>
/// Timeout operation if it takes too long
Expand Down
Loading

0 comments on commit 1c4074b

Please sign in to comment.