Skip to content

Commit

Permalink
Docs
Browse files Browse the repository at this point in the history
  • Loading branch information
destel committed Mar 18, 2024
1 parent df77820 commit 4ee445c
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 2 deletions.
5 changes: 5 additions & 0 deletions chans/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"time"
)

// Batch groups items from an input channel into batches based on a maximum size and a timeout.
// A batch is emitted when it reaches the maximum size, the timeout expires, or the input channel closes.
// To emit batches only when full, set the timeout to -1. This function never emits empty batches.
// The timeout countdown starts when the first item is added to a new batch.
func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A {
if in == nil {
return nil
Expand Down Expand Up @@ -91,6 +95,7 @@ func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A {
return out
}

// Unbatch is the inverse of Batch. It takes a channel of batches and emits individual items.
func Unbatch[A any](in <-chan []A) <-chan A {
if in == nil {
return nil
Expand Down
21 changes: 19 additions & 2 deletions chans/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,67 @@ package chans

import "github.com/destel/rill/internal/common"

// Map applies a transformation function to each item in an input channel, using n goroutines for concurrency.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
// Use OrderedMap to preserve the input order.
func Map[A, B any](in <-chan A, n int, f func(A) B) <-chan B {
return common.MapOrFilter(in, n, func(a A) (B, bool) {
return f(a), true
})
}

// OrderedMap is similar to Map, but it guarantees that the output order is the same as the input order.
func OrderedMap[A, B any](in <-chan A, n int, f func(A) B) <-chan B {
return common.OrderedMapOrFilter(in, n, func(a A) (B, bool) {
return f(a), true
})
}

// Filter removes items that do not meet a specified condition, using n goroutines for concurrent processing.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
// Use OrderedFilter to preserve the input order.
func Filter[A any](in <-chan A, n int, f func(A) bool) <-chan A {
return common.MapOrFilter(in, n, func(a A) (A, bool) {
return a, f(a)
})
}

// OrderedFilter is similar to Filter, but it guarantees that the output order is the same as the input order.
func OrderedFilter[A any](in <-chan A, n int, f func(A) bool) <-chan A {
return common.OrderedMapOrFilter(in, n, func(a A) (A, bool) {
return a, f(a)
})
}

// FlatMap applies a function to each item in an input channel, where the function returns a channel of items.
// These items are then flattened into a single output channel. Uses n goroutines for concurrency.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
// Use OrderedFlatMap to preserve the input order.
func FlatMap[A, B any](in <-chan A, n int, f func(A) <-chan B) <-chan B {
var zero B
return common.MapOrFlatMap(in, n, func(a A) (b B, bb <-chan B, flat bool) {
return zero, f(a), true
})
}

// OrderedFlatMap is similar to FlatMap, but it guarantees that the output order is the same as the input order.
func OrderedFlatMap[A, B any](in <-chan A, n int, f func(A) <-chan B) <-chan B {
var zero B
return common.OrderedMapOrFlatMap(in, n, func(a A) (b B, bb <-chan B, flat bool) {
return zero, f(a), true
})
}

// blocking
// todo: explain that if false has been returned for item[i] that it's guranteed that function would have been called for all previous items
// ForEach applies a function to each item in an input channel using n goroutines. The function blocks until
// all items are processed or the function returns false. In case of early termination, ForEach ensures
// the input channel is drained to avoid goroutine leaks, making it safe for use in environments where cleanup is crucial.
// While this function does not guarantee the order of item processing due to its concurrent nature,
// using n = 1 results in sequential processing, as in a simple for-range loop.
func ForEach[A any](in <-chan A, n int, f func(A) bool) {
if n == 1 {
for a := range in {
if !f(a) {
DrainNB(in)
break
}
}
Expand Down
2 changes: 2 additions & 0 deletions chans/delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type delayedValue[A any] struct {
SendAt time.Time
}

// Delay postpones the delivery of items from an input channel by a specified duration, maintaining the order.
// Useful for adding delays in processing or simulating latency.
func Delay[A any](in <-chan A, delay time.Duration) <-chan A {
wrapped := make(chan delayedValue[A])
go func() {
Expand Down
8 changes: 8 additions & 0 deletions chans/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func slowMerge[A any](ins []<-chan A) <-chan A {
return out
}

// Merge combines multiple input channels into a single output channel. Items are emitted as soon as they're available,
// so the output order is not defined.
func Merge[A any](ins ...<-chan A) <-chan A {
switch len(ins) {
case 0:
Expand All @@ -90,13 +92,19 @@ func Merge[A any](ins ...<-chan A) <-chan A {
}
}

// Split2 divides the input channel into two output channels based on the discriminator function f, using n goroutines for concurrency.
// The function f takes an item from the input and decides which output channel (out0 or out1) it should go to by returning 0 or 1, respectively.
// Return values other than 0 or 1 lead to the item being discarded.
// The output order is not guaranteed: results are written to the outputs as soon as they're ready.
// Use OrderedSplit2 to preserve the input order.
func Split2[A any](in <-chan A, n int, f func(A) int) (out0 <-chan A, out1 <-chan A) {
outs := common.MapAndSplit(in, 2, n, func(a A) (A, int) {
return a, f(a)
})
return outs[0], outs[1]
}

// OrderedSplit2 is similar to Split2, but it guarantees that the order of the outputs matches the order of the input.
func OrderedSplit2[A any](in <-chan A, n int, f func(A) int) (out0 <-chan A, out1 <-chan A) {
outs := common.OrderedMapAndSplit(in, 2, n, func(a A) (A, int) {
return a, f(a)
Expand Down
6 changes: 6 additions & 0 deletions chans/util.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package chans

// Drain consumes and discards all items from an input channel, blocking until the channel is closed
func Drain[A any](in <-chan A) {
for range in {
}
}

// DrainNB is a non-blocking version of Drain.
func DrainNB[A any](in <-chan A) {
go Drain(in)
}

// Buffer takes a channel of items and returns a buffered channel of exact same items in the same order.
// This is useful when you want to write to the input channel without blocking the writer.
func Buffer[A any](in <-chan A, n int) <-chan A {
// we use n+1 since 1 additional is held on the stack (x variable)
out := make(chan A, n-1)
Expand All @@ -23,6 +27,7 @@ func Buffer[A any](in <-chan A, n int) <-chan A {
return out
}

// FromSlice converts a slice into a channel.
func FromSlice[A any](slice []A) <-chan A {
out := make(chan A, len(slice))
for _, a := range slice {
Expand All @@ -32,6 +37,7 @@ func FromSlice[A any](slice []A) <-chan A {
return out
}

// ToSlice converts a channel into a slice.
func ToSlice[A any](in <-chan A) []A {
var res []A
for x := range in {
Expand Down
5 changes: 5 additions & 0 deletions echans/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ import (
"github.com/destel/rill/chans"
)

// Batch groups items from an input channel into batches based on a maximum size and a timeout.
// A batch is emitted when it reaches the maximum size, the timeout expires, or the input channel closes.
// To emit batches only when full, set the timeout to -1. This function never emits empty batches.
// The timeout countdown starts when the first item is added to a new batch.
func Batch[A any](in <-chan Try[A], n int, timeout time.Duration) <-chan Try[[]A] {
values, errs := Unwrap(in)
batches := chans.Batch(values, n, timeout)
return WrapAsync(batches, errs)
}

// Unbatch is the inverse of Batch. It takes a channel of batches and emits individual items.
func Unbatch[A any](in <-chan Try[[]A]) <-chan Try[A] {
batches, errs := Unwrap(in)
values := chans.Unbatch(batches)
Expand Down
27 changes: 27 additions & 0 deletions echans/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"github.com/destel/rill/internal/common"
)

// Map applies a transformation function to each item in an input channel, using n goroutines for concurrency.
// If an error is encountered, either from the function f itself or from upstream it is forwarded to the output for further handling.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
// Use OrderedMap to preserve the input order.
func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] {
return common.MapOrFilter(in, n, func(a Try[A]) (Try[B], bool) {
if a.Error != nil {
Expand All @@ -22,6 +26,7 @@ func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B]
})
}

// OrderedMap is similar to Map, but it guarantees that the output order is the same as the input order.
func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] {
return common.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[B], bool) {
if a.Error != nil {
Expand All @@ -37,6 +42,10 @@ func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan
})
}

// Filter removes items that do not meet a specified condition, using n goroutines for concurrency.
// If an error is encountered, either from the function f itself or from upstream it is forwarded to the output for further handling.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
// Use OrderedFilter to preserve the input order.
func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] {
return common.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error != nil {
Expand All @@ -52,6 +61,7 @@ func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[
})
}

// OrderedFilter is similar to Filter, but it guarantees that the output order is the same as the input order.
func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] {
return common.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error != nil {
Expand All @@ -67,6 +77,10 @@ func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-ch
})
}

// FlatMap applies a function to each item in an input channel, where the function returns a channel of items.
// These items are then flattened into a single output channel. Uses n goroutines for concurrency.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
// Use OrderedFlatMap to preserve the input order.
func FlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B] {
return common.MapOrFlatMap(in, n, func(a Try[A]) (b Try[B], bb <-chan Try[B], flat bool) {
if a.Error != nil {
Expand All @@ -76,6 +90,7 @@ func FlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan
})
}

// OrderedFlatMap is similar to FlatMap, but it guarantees that the output order is the same as the input order.
func OrderedFlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B] {
return common.OrderedMapOrFlatMap(in, n, func(a Try[A]) (b Try[B], bb <-chan Try[B], flat bool) {
if a.Error != nil {
Expand All @@ -85,6 +100,10 @@ func OrderedFlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B])
})
}

// Catch allows handling errors from the input channel concurrently using n goroutines for concurrency.
// When f returns nil, error is considered handled and filtered out; otherwise it is replaced by the result of f.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
// Use OrderedCatch to preserve the input order.
func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
return common.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error == nil {
Expand All @@ -100,6 +119,7 @@ func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
})
}

// OrderedCatch is similar to Catch, but it guarantees that the output order is the same as the input order.
func OrderedCatch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
return common.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error == nil {
Expand All @@ -115,6 +135,13 @@ func OrderedCatch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Tr
})
}

// ForEach applies a function f to each item in an input channel using n goroutines for parallel processing. The function
// blocks until all items are processed or an error is encountered, either from the function f itself or from upstream.
// In case of an error leading to early termination, ForEach ensures the input channel is drained to avoid goroutine leaks,
// making it safe for use in environments where cleanup is crucial. The function returns the first encountered error, or nil
// if all items were processed successfully.
// While this function does not guarantee the order of item processing due to its concurrent nature,
// using n = 1 results in sequential processing, as in a simple for-range loop.
func ForEach[A any](in <-chan Try[A], n int, f func(A) error) error {
var retErr error
var once sync.Once
Expand Down
2 changes: 2 additions & 0 deletions echans/delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/destel/rill/chans"
)

// Delay postpones the delivery of items from an input channel by a specified duration, maintaining the order.
// Useful for adding delays in processing or simulating latency.
func Delay[A any](in <-chan A, delay time.Duration) <-chan A {
return chans.Delay(in, delay)
}
10 changes: 10 additions & 0 deletions echans/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,19 @@ import (
"github.com/destel/rill/internal/common"
)

// Merge combines multiple input channels into a single output channel. Items are emitted as soon as they're available,
// so the output order is not defined.
func Merge[A any](ins ...<-chan A) <-chan A {
return chans.Merge(ins...)
}

// Split2 divides the input channel into two output channels based on the discriminator function f, using n goroutines for concurrency.
// The function f takes an item from the input and decides which output channel (out0 or out1) it should go to by returning 0 or 1, respectively.
// Return values other than 0 or 1 lead to the item being discarded.
// If an error is encountered, either from the function f itself or from upstream it is intentionally sent
// to one of the output channels in a non-deterministic manner.
// The output order is not guaranteed: results are written to the outputs as soon as they're ready.
// Use OrderedSplit2 to preserve the input order.
func Split2[A any](in <-chan Try[A], n int, f func(A) (int, error)) (out0 <-chan Try[A], out1 <-chan Try[A]) {
outs := common.MapAndSplit(in, 2, n, func(a Try[A]) (Try[A], int) {
if a.Error != nil {
Expand All @@ -28,6 +37,7 @@ func Split2[A any](in <-chan Try[A], n int, f func(A) (int, error)) (out0 <-chan
return outs[0], outs[1]
}

// OrderedSplit2 is similar to Split2, but it guarantees that the order of the outputs matches the order of the input.
func OrderedSplit2[A any](in <-chan Try[A], n int, f func(A) (int, error)) (out0 <-chan Try[A], out1 <-chan Try[A]) {
outs := common.OrderedMapAndSplit(in, 2, n, func(a Try[A]) (Try[A], int) {
if a.Error != nil {
Expand Down
7 changes: 7 additions & 0 deletions echans/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@ import (
"github.com/destel/rill/chans"
)

// Drain consumes and discards all items from an input channel, blocking until the channel is closed
func Drain[A any](in <-chan A) {
chans.Drain(in)
}

// DrainNB is a non-blocking version of Drain.
func DrainNB[A any](in <-chan A) {
chans.DrainNB(in)
}

// Buffer takes a channel of items and returns a buffered channel of exact same items in the same order.
// This is useful when you want to write to the input channel without blocking the writer.
func Buffer[A any](in <-chan A, n int) <-chan A {
return chans.Buffer(in, n)
}

// FromSlice converts a slice into a channel.
func FromSlice[A any](slice []A) <-chan Try[A] {
out := make(chan Try[A], len(slice))
for _, a := range slice {
Expand All @@ -25,6 +30,8 @@ func FromSlice[A any](slice []A) <-chan Try[A] {
return out
}

// ToSlice converts a channel into a slice.
// If an error is encountered, it will be returned and the rest of the channel will be drained.
func ToSlice[A any](in <-chan Try[A]) ([]A, error) {
var res []A

Expand Down
7 changes: 7 additions & 0 deletions echans/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ import (
"github.com/destel/rill/internal/common"
)

// Try is a container for a value or an error
type Try[A any] struct {
V A
Error error
}

// Wrap converts a regular channel of items into a channel of items wrapped in a Try container.
// This function can also take an error, which will also be added to the output channel.
// Either the input channel or the error can be nil, but not both simultaneously.
func Wrap[A any](values <-chan A, err error) <-chan Try[A] {
if values == nil && err == nil {
return nil
Expand All @@ -31,6 +35,8 @@ func Wrap[A any](values <-chan A, err error) <-chan Try[A] {
return out
}

// WrapAsync converts channel of values and channel of errors into a channel of values wrapped in a Try container.
// Either the input channel or the error channel can be nil, but not both simultaneously.
func WrapAsync[A any](values <-chan A, errs <-chan error) <-chan Try[A] {
wrappedValues := chans.Map(values, 1, func(a A) Try[A] {
return Try[A]{V: a}
Expand All @@ -52,6 +58,7 @@ func WrapAsync[A any](values <-chan A, errs <-chan error) <-chan Try[A] {
return chans.Merge(wrappedErrs, wrappedValues)
}

// Unwrap converts a channel of Try containers into a channel of values and a channel of errors.
func Unwrap[A any](in <-chan Try[A]) (<-chan A, <-chan error) {
if in == nil {
return nil, nil
Expand Down

0 comments on commit 4ee445c

Please sign in to comment.