Skip to content

Commit

Permalink
More elegant Source stream exmaple
Browse files Browse the repository at this point in the history
  • Loading branch information
louthy committed Oct 22, 2024
1 parent ac4f314 commit 073f266
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 14 deletions.
17 changes: 17 additions & 0 deletions LanguageExt.Core/Effects/IO/IO.Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,23 @@ public static K<M, A> Await<M, A>(this K<M, ForkIO<A>> ma)
public static K<M, ForkIO<A>> ForkIO<M, A>(this K<M, A> ma, Option<TimeSpan> timeout = default)
where M : Monad<M> =>
ma.MapIO(io => io.Fork(timeout));


/// <summary>
/// Queue this IO operation to run on the thread-pool.
/// </summary>
/// <param name="timeout">Maximum time that the forked IO operation can run for. `None` for no timeout.</param>
/// <returns>Returns a `ForkIO` data-structure that contains two IO effects that can be used to either cancel
/// the forked IO operation or to await the result of it.
/// </returns>
[Pure]
[MethodImpl(Opt.Default)]
public static K<M, ForkIO<Option<A>>> ForkIO<M, A>(this StreamT<M, A> ma, Option<TimeSpan> timeout = default)
where M : Monad<M> =>
ma.Run()
.Map(oht => oht.Map(ht => ht.Item1))
.ForkIO(timeout);


/// <summary>
/// Timeout operation if it takes too long
Expand Down
9 changes: 9 additions & 0 deletions LanguageExt.Core/Effects/IO/IO.Prelude.cs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,15 @@ public static K<M, A> timeoutIO<M, A>(TimeSpan timeout, K<M, A> ma)
public static K<M, A> repeatIO<M, A>(K<M, A> ma)
where M : Monad<M> =>
ma.RepeatIO();

/// <summary>
/// Keeps repeating the computation
/// </summary>
/// <param name="ma">Computation to repeat</param>
/// <typeparam name="A">Computation bound value type</typeparam>
/// <returns>The result of the last invocation of `ma`</returns>
public static IO<A> repeat<A>(IO<A> ma) =>
ma.Repeat();

/// <summary>
/// Keeps repeating the computation, until the scheduler expires
Expand Down
43 changes: 42 additions & 1 deletion LanguageExt.Core/Effects/Prelude.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,46 @@
namespace LanguageExt;
using System;
using LanguageExt.Traits;

namespace LanguageExt;

public static partial class Prelude
{
/// <summary>
/// Start a new source
/// </summary>
/// <returns>A source in an IO computation</returns>
public static IO<Source<A>> Source<A>() =>
LanguageExt.Source<A>.Start();

/// <summary>
/// Subscribe to the source and await the values
/// </summary>
/// <remarks>
/// Each subscriber runs on the same thread as the event-distributor. So, if you have multiple
/// subscribers they will be processed serially for each event. If you want the subscribers to
/// run in parallel then you must lift the `IO` monad when calling `Subscribe` and `forkIO` the
/// stream. This gives fine-grained control over when to run events in parallel.
/// </remarks>
/// <typeparam name="M">Monad type lifted into the stream</typeparam>
/// <returns>StreamT monad transformer that will get the values coming downstream</returns>

public static StreamT<M, A> await<M, A>(Source<A> source)
where M : Monad<M> =>
source.Await<M>();

/// <summary>
/// Post a value to flow downstream
/// </summary>
/// <param name="value">Value</param>
/// <returns>IO effect</returns>
public static IO<Unit> post<A>(Source<A> source, A value) =>
source.Post(value);

/// <summary>
/// Post a value to flow downstream (partially applied)
/// </summary>
/// <param name="value">Value</param>
/// <returns>IO effect</returns>
public static Func<A, IO<Unit>> post<A>(Source<A> source) =>
source.Post;
}
11 changes: 11 additions & 0 deletions LanguageExt.Core/Monads/Monadic conditionals/Prelude.guard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ public static K<F, Unit> guard<F>(bool flag)
where F : MonoidK<F>, Applicative<F> =>
flag ? F.Pure(unit) : F.Empty<Unit>();

/// <summary>
/// Guard against continuing an applicative expression
/// </summary>
/// <param name="flag">Flag for continuing</param>
/// <returns>Applicative that yields `()` if `flag` is `true`; otherwise it yields `Applicative.Empty` -
/// shortcutting the expression</returns>
[Pure]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static IO<Unit> guardIO(bool flag) =>
flag ? IO.pure(unit) : IO.empty<Unit>();

/// <summary>
/// Guard against continuing a monadic expression
/// </summary>
Expand Down
28 changes: 15 additions & 13 deletions Samples/Streams/SourceStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,24 @@ namespace Streams;
public class SourceStream
{
public static IO<Unit> run =>
from s in Source<string>.Start()
from f in show(s).Iter().ForkIO()
from w in writeLine("Type something and press enter (empty-line ends the demo)")
from r in interaction(s)
select r;
from s in Source<string>()
from f in forkIO(subscribe(s))
from _ in writeLine("Type something and press enter (empty-line ends the demo)") >>
interaction(s)
select unit;

static IO<Unit> interaction(Source<string> source) =>
from l in readLine
from _ in l == ""
? Pure(unit)
: source.Post(l)
.Bind(_ => interaction(source))
select unit;
repeat(from l in readLine
from _ in deliver(source, l)
select unit)
| @catch(unitIO);

static IO<Unit> deliver(Source<string> source, string line) =>
guardIO(line != "") >>
post(source, line);

static StreamT<IO, Unit> show(Source<string> source) =>
from v in source.Await<IO>()
static StreamT<IO, Unit> subscribe(Source<string> source) =>
from v in await<IO, string>(source)
from _ in writeLine(v)
where false
select unit;
Expand Down

0 comments on commit 073f266

Please sign in to comment.