Add more detailed docs, examples and readme (#23)
destel authored Jun 19, 2024
1 parent 79cc3b9 commit dd91998
Showing 11 changed files with 1,086 additions and 663 deletions.
4 changes: 3 additions & 1 deletion .github/release.yaml
@@ -4,7 +4,9 @@ changelog:
labels: ["new"]
- title: 🐛 Fixes
labels: ["fix", "bug"]
- title: 📝 Dependencies
- title: 📖 Documentation
labels: ["documentation", "docs"]
- title: 🔗 Dependencies
labels: ["dependencies"]
- title: 📦 Other
labels: ["*"]
579 changes: 275 additions & 304 deletions README.md

Large diffs are not rendered by default.

23 changes: 17 additions & 6 deletions batch.go
@@ -6,18 +6,29 @@ import (
"github.com/destel/rill/internal/core"
)

// Batch groups items from an input channel into batches based on a maximum size and a timeout.
// A batch is emitted when it reaches the maximum size, the timeout expires, or the input channel closes.
// To emit batches only when full, set the timeout to -1. This function never emits empty batches.
// The timeout countdown starts when the first item is added to a new batch.
// Zero timeout is not supported and will panic.
// Batch takes a stream of items and returns a stream of batches based on a maximum size and a timeout.
//
// A batch is emitted when one of the following conditions is met:
// - The batch reaches the size of n items
// - The time since the first item was added to the batch exceeds the timeout
// - The input stream is closed
//
// This function never emits empty batches. To disable the timeout and emit batches only based on the size,
// set the timeout to -1. Setting the timeout to zero is not supported and will result in a panic.
//
// This is a non-blocking ordered function that processes items sequentially.
//
// See the package documentation for more information on non-blocking ordered functions and error handling.
func Batch[A any](in <-chan Try[A], n int, timeout time.Duration) <-chan Try[[]A] {
values, errs := ToChans(in)
batches := core.Batch(values, n, timeout)
return FromChans(batches, errs)
}

// Unbatch is the inverse of [Batch]. It takes a channel of batches and emits individual items.
// Unbatch is the inverse of [Batch]. It takes a stream of batches and returns a stream of individual items.
//
// This is a non-blocking ordered function that processes items sequentially.
// See the package documentation for more information on non-blocking ordered functions and error handling.
func Unbatch[A any](in <-chan Try[[]A]) <-chan Try[A] {
batches, errs := ToChans(in)
values := core.Unbatch(batches)
67 changes: 27 additions & 40 deletions consume.go
@@ -7,13 +7,13 @@ import (
"github.com/destel/rill/internal/core"
)

// ForEach applies a function f to each item in an input channel using n goroutines for parallel processing. The function
// blocks until all items are processed or an error is encountered, either from the function f itself or from upstream.
// In case of an error leading to early termination, ForEach ensures the input channel is drained to avoid goroutine leaks,
// making it safe for use in environments where cleanup is crucial. The function returns the first encountered error, or nil
// if all items were processed successfully.
// While this function does not guarantee the order of item processing due to its concurrent nature,
// using n = 1 results in sequential processing, as in a simple for-range loop.
// ForEach applies a function f to each item in an input stream.
//
// This is a blocking unordered function that processes items concurrently using n goroutines.
// The case when n = 1 is optimized: it does not spawn additional goroutines and processes items sequentially,
// making the function ordered and similar to a regular for-range loop.
//
// See the package documentation for more information on blocking unordered functions and error handling.
func ForEach[A any](in <-chan Try[A], n int, f func(A) error) error {
if n == 1 {
for a := range in {
@@ -66,15 +66,10 @@ func onceFunc1[T any](f func(T)) func(T) {
}
}

// Err returns the first error encountered in the input channel.
// This function blocks until:
// - An error is found.
// - The end of the input channel is reached.
// Err returns the first error encountered in the input stream or nil if there were no errors.
//
// If Err terminates early (before the input channel is fully consumed),
// it initiates background draining of the remaining items in the channel. This is done
// to prevent goroutine leaks by ensuring that all goroutines feeding the channel are allowed to complete.
// The input channel should not be used anymore after calling this function.
// This is a blocking ordered function that processes items sequentially.
// See the package documentation for more information on blocking ordered functions and error handling.
func Err[A any](in <-chan Try[A]) error {
defer DrainNB(in)

@@ -87,16 +82,11 @@ func Err[A any](in <-chan Try[A]) error {
return nil
}

// First returns the first value or error encountered in the input channel.
// This function blocks until:
// - A value is found. In this case, the found flag is set to true.
// - The end of the input channel is reached. In this case, the found flag is set to false.
// - An error is encountered in the input channel.
// First returns the first item or error encountered in the input stream, whichever comes first.
// The found return flag is set to false if the stream was empty, otherwise it is set to true.
//
// If First terminates early (before the input channel is fully consumed),
// it initiates background draining of the remaining items in the channel. This is done
// to prevent goroutine leaks by ensuring that all goroutines feeding the channel are allowed to complete.
// The input channel should not be used anymore after calling this function.
// This is a blocking ordered function that processes items sequentially.
// See the package documentation for more information on blocking ordered functions and error handling.
func First[A any](in <-chan Try[A]) (value A, found bool, err error) {
defer DrainNB(in)

@@ -108,16 +98,14 @@ func First[A any](in <-chan Try[A]) (value A, found bool, err error) {
return
}
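The early-termination pattern used by Err and First — return as soon as the answer is known, then drain the rest in the background so upstream goroutines can finish — can be sketched on a plain channel. The name firstItem is hypothetical and this is not rill's implementation; the spawned goroutine plays the role that rill.DrainNB plays in the real code.

```go
package main

import "fmt"

// firstItem returns the first value from the channel, draining the
// remainder in the background, or found=false if the channel was empty.
func firstItem[A any](in <-chan A) (value A, found bool) {
	for v := range in {
		go func() {
			for range in { // background drain, so senders are not blocked
			}
		}()
		return v, true
	}
	return value, false // stream was empty
}

func main() {
	in := make(chan int, 3)
	in <- 10
	in <- 20
	in <- 30
	close(in)
	v, ok := firstItem(in)
	fmt.Println(v, ok)
}
```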

// Any checks if there is an item in the input channel that satisfies the condition f.
// This function uses n goroutines for concurrency. It blocks execution until either:
// - A matching item is found
// - All items have been checked
// - An error is encountered in the condition function f or from the upstream
// Any checks if there is an item in the input stream that satisfies the condition f.
// This function returns true as soon as it finds such an item. Otherwise, it returns false.
//
// In case of early termination, Any ensures the input channel is drained to avoid goroutine leaks,
// making it safe for use in environments where cleanup is crucial. The function returns the first encountered error, or nil
// Any is a blocking unordered function that processes items concurrently using n goroutines.
// The case when n = 1 is optimized: it does not spawn additional goroutines and processes items sequentially,
// making the function ordered.
//
// The function returns true if a match is found, false otherwise, or the first encountered error.
// See the package documentation for more information on blocking unordered functions and error handling.
func Any[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (bool, error) {
errBreak := errors.New("break")
res := false
@@ -145,16 +133,15 @@ func Any[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (bool, error)
return res, err
}

// All checks if all items in the input channel satisfy the condition function f.
// This function uses n goroutines for concurrency and blocks execution until:
// - A non-matching item is found,
// - All items have been checked,
// - An error is encountered in the condition function f or from the upstream.
// All checks if all items in the input stream satisfy the condition f.
// This function returns false as soon as it finds an item that does not satisfy the condition. Otherwise, it returns true,
// including the case when the stream was empty.
//
// In case of early termination, All ensures the input channel is drained to avoid goroutine leaks,
// making it safe for use in environments where cleanup is crucial. The function returns the first encountered error, or nil
// This is a blocking unordered function that processes items concurrently using n goroutines.
// The case when n = 1 is optimized: it does not spawn additional goroutines and processes items sequentially,
// making the function ordered.
//
// Returns true if all items match the condition, false otherwise, or the first encountered error.
// See the package documentation for more information on blocking unordered functions and error handling.
func All[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (bool, error) {
// Idea: x && y && z is the same as !(!x || !y || !z)
// So we can use Any with a negated condition to implement All
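The De Morgan identity in the comment above (x && y && z is the same as !(!x || !y || !z)) is easy to demonstrate on plain slices. The helper names anyMatch and allMatch are hypothetical; this only illustrates the idea, not rill's concurrent implementation.

```go
package main

import "fmt"

// anyMatch reports whether any element satisfies f.
func anyMatch[A any](xs []A, f func(A) bool) bool {
	for _, x := range xs {
		if f(x) {
			return true
		}
	}
	return false
}

// allMatch is anyMatch with a negated condition, negated again:
// "all satisfy f" == "none fails f".
func allMatch[A any](xs []A, f func(A) bool) bool {
	return !anyMatch(xs, func(a A) bool { return !f(a) })
}

func main() {
	even := func(x int) bool { return x%2 == 0 }
	fmt.Println(allMatch([]int{2, 4, 6}, even)) // true
	fmt.Println(allMatch([]int{2, 3, 6}, even)) // false
	fmt.Println(allMatch([]int{}, even))        // true: vacuously satisfied
}
```

Note that the empty-input case falls out naturally: anyMatch returns false on an empty slice, so allMatch returns true, matching the documented behavior of All on an empty stream.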
82 changes: 78 additions & 4 deletions doc.go
@@ -1,5 +1,79 @@
// Package rill is a Go toolkit designed for efficient and straightforward streaming, parallel processing, and pipeline construction.
// It abstracts away the complexities of concurrency management, enabling developers to focus on core logic.
// With features like lightweight integration, batch processing, error handling, and support for functional programming paradigms,
// rill enhances productivity in building concurrent applications. It offers type-safe operations, and minimizes memory usage even for large data sets.
// Package rill is a collection of easy-to-use functions for concurrency, streaming, batching and pipeline construction.
// It abstracts away the complexities of concurrency, removes boilerplate, and provides a structured way to handle errors.
// Rill is modular and can be easily integrated into existing projects: it requires no setup and allows using only the necessary functions.
// At the same time, rill's functions can be composed into complex, concurrent, and reusable pipelines when needed.
//
// # Streams and Try Containers
//
// In this package, a stream refers to a channel of [Try] containers. A Try container is a simple struct that holds a value and an error.
// An "empty stream" means a channel of Try containers that has been closed without ever being written to.
//
// Most functions in this package are concurrent, and the level of concurrency can be controlled by the argument n.
// Some functions share common behaviors and characteristics, which are described below.
//
// # Non-blocking functions
//
// Functions such as [Map], [Filter], and [Batch] take a stream as an input and return a new stream as an output.
// They do not block and return the output stream immediately. All the processing is done in the background by the goroutine pools they spawn.
// These functions forward all errors from the input stream to the output stream.
// Any errors returned by the user-provided functions are also sent to the output stream.
// When such a function reaches the end of the input stream, it closes the output stream, stops processing, and cleans up resources.
//
// Such functions are designed to be composed together to build complex processing pipelines:
//
// stage2 := rill.Map(input, ...)
// stage3 := rill.Batch(stage2, ...)
// stage4 := rill.Map(stage3, ...)
// results := rill.Unbatch(stage4, ...)
// // consume the results and handle errors with some blocking function
//
// # Blocking functions
//
// Functions such as [ForEach], [Reduce] and [MapReduce] are used at the last stage of the pipeline
// to consume the stream and return the final result or error.
//
// Usually, these functions block until one of the following conditions is met:
// - The end of the stream is reached. In this case, the function returns the final result.
// - An error is encountered either in the input stream or in some user-provided function. In this case, the function returns the error.
//
// In case of an early termination (before reaching the end of the input stream), such functions initiate
// background draining of the remaining items. This is done to prevent goroutine
// leaks by ensuring that all goroutines feeding the stream are allowed to complete.
// The input stream should not be used anymore after calling such functions.
//
// It's also possible to consume the pipeline results manually, for example using a for-range loop.
// In this case, add a deferred call to [DrainNB] before the loop to ensure that goroutines are not leaked.
//
// defer rill.DrainNB(results)
//
// for res := range results {
// if res.Error != nil {
// return res.Error
// }
// // process res.Value
// }
//
// # Unordered functions
//
// Functions such as [Map], [Filter] and [FlatMap] write items to the output stream as soon as they become available.
// Due to the concurrent nature of these functions, the order of items in the output stream may not match the order of items in the input stream.
// These functions prioritize performance and concurrency over maintaining the original order.
//
// # Ordered functions
//
// Functions such as [OrderedMap] or [OrderedFilter] preserve the order of items from the input stream.
// These functions are still concurrent, but use special synchronization techniques to ensure that
// items are written to the output stream in the same order as they were read from the input stream.
// This additional synchronization has some overhead, but it is negligible for I/O-bound workloads.
//
// Some other functions, such as [ToSlice], [Batch] or [First] are not concurrent and are ordered by nature.
//
// # Error handling
//
// Error handling can be non-trivial in concurrent applications. Rill simplifies this by providing a structured error handling approach.
// As described above, all errors are automatically propagated down the pipeline to the final stage, where they can be caught.
// This allows the pipeline to terminate after the first error is encountered and return it to the caller.
//
// In cases where more complex error handling logic is required, the [Catch] function can be used.
// It allows catching and handling errors at any point in the pipeline, providing the flexibility to handle not only the first error, but any of them.
package rill
