Readme and documentation fixes
destel committed Mar 20, 2024
1 parent 343b506 commit b87cb78
Showing 8 changed files with 467 additions and 18 deletions.
189 changes: 189 additions & 0 deletions echans/README.md
@@ -0,0 +1,189 @@
# Rill
Rill (noun: a small stream) is a comprehensive Go toolkit for streaming, parallel processing, and pipeline construction.
Designed to reduce boilerplate and simplify usage, it empowers developers to focus on core logic
without getting bogged down by the complexity of concurrency.


## Key features
- **Lightweight**: fast and modular, can be easily integrated into existing projects
- **Easy to use**: the complexity of managing goroutines, wait groups, and error handling is abstracted away
- **Concurrent**: control the level of concurrency for all operations
- **Batching**: provides a simple way to organize and process data in batches
- **Error Handling**: provides a structured way to handle errors in concurrent apps
- **Streaming**: handles real-time data streams or large datasets with a minimal memory footprint
- **Order Preservation**: offers functions that preserve the original order of data, while still allowing for concurrent processing
- **Functional Programming**: based on functional programming concepts, making operations like map, filter, flatMap and others available for channel-based workflows
- **Generic**: all operations are type-safe and can be used with any data type


## Installation
```bash
go get github.com/destel/rill
```

## Example
The function below fetches keys from multiple URLs, retrieves their values from a Redis database, and prints them.
It demonstrates the library's strengths in handling concurrent tasks, error propagation, batching and data streaming,
all while maintaining simplicity and efficiency.
See the full runnable example at examples/redis-read/main.go.

```go
type KV struct {
	Key   string
	Value string
}

func printValues(ctx context.Context, urls []string) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // In case of error or early exit, this ensures all http and redis operations are canceled

	// Convert URLs into a channel
	urlsChan := echans.FromSlice(urls)

	// Fetch and stream keys from each URL concurrently
	keys := echans.FlatMap(urlsChan, 10, func(url string) <-chan echans.Try[string] {
		return streamKeys(ctx, url)
	})

	// Exclude any empty keys from the stream
	keys = echans.Filter(keys, 5, func(key string) (bool, error) {
		return key != "", nil
	})

	// Organize keys into manageable batches of 10 for bulk operations
	keyBatches := echans.Batch(keys, 10, 1*time.Second)

	// Fetch values from Redis for each batch of keys
	resultBatches := echans.Map(keyBatches, 5, func(keys []string) ([]KV, error) {
		values, err := redisMGet(ctx, keys...)
		if err != nil {
			return nil, err
		}

		results := make([]KV, len(keys))
		for i, key := range keys {
			results[i] = KV{Key: key, Value: values[i]}
		}

		return results, nil
	})

	// Convert batches back to individual items for final processing
	results := echans.Unbatch(resultBatches)

	// Exclude any empty values from the stream
	results = echans.Filter(results, 5, func(kv KV) (bool, error) {
		return kv.Value != "<nil>", nil
	})

	// Iterate over each key-value pair and print
	cnt := 0
	err := echans.ForEach(results, 1, func(kv KV) error {
		fmt.Println(kv.Key, "=>", kv.Value)
		cnt++
		return nil
	})
	fmt.Println("Total keys:", cnt)

	return err
}
```


## Design philosophy
At the heart of rill lies a simple yet powerful concept: operating on channels of wrapped values, where each value is wrapped in a **Try** struct that holds either the value or an error.
Such channels can be created manually or through utilities like **FromSlice**, **Wrap**, and **WrapAsync**, and then transformed via operations
such as **Map**, **Filter**, **FlatMap** and others. Finally, when all processing stages are completed, the data can be consumed by
**ForEach**, **ToSlice** or manually by iterating over the resulting channel.
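
To make the idea concrete, here is a minimal, self-contained sketch of such a pipeline. It does not use the library itself: `fromSlice`, `mapTry` and `toSlice` are hypothetical single-goroutine stand-ins written for illustration, and the local `Try` type only mirrors the `V` and `Error` fields used elsewhere in this README. The real functions also take a concurrency level, and what the sketch returns on error (a nil slice) is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Try is a local stand-in for the library's container: either a value or an error.
type Try[A any] struct {
	V     A
	Error error
}

// fromSlice (hypothetical) emits every slice element wrapped in a Try, then closes the channel.
func fromSlice[A any](items []A) <-chan Try[A] {
	out := make(chan Try[A])
	go func() {
		defer close(out)
		for _, it := range items {
			out <- Try[A]{V: it}
		}
	}()
	return out
}

// mapTry (hypothetical) is a single-goroutine Map stage:
// values go through f, upstream errors are forwarded untouched.
func mapTry[A, B any](in <-chan Try[A], f func(A) (B, error)) <-chan Try[B] {
	out := make(chan Try[B])
	go func() {
		defer close(out)
		for t := range in {
			if t.Error != nil {
				out <- Try[B]{Error: t.Error}
				continue
			}
			v, err := f(t.V)
			if err != nil {
				out <- Try[B]{Error: err}
				continue
			}
			out <- Try[B]{V: v}
		}
	}()
	return out
}

// toSlice (hypothetical) collects values until the first error,
// then drains the input so upstream goroutines can exit.
func toSlice[A any](in <-chan Try[A]) ([]A, error) {
	var res []A
	for t := range in {
		if t.Error != nil {
			for range in {
			}
			return nil, t.Error
		}
		res = append(res, t.V)
	}
	return res, nil
}

var errNegative = errors.New("negative input")

func square(x int) (int, error) {
	if x < 0 {
		return 0, errNegative
	}
	return x * x, nil
}

func main() {
	squares, err := toSlice(mapTry(fromSlice([]int{1, 2, 3}), square))
	fmt.Println(squares, err) // [1 4 9] <nil>

	_, err = toSlice(mapTry(fromSlice([]int{1, -2, 3}), square))
	fmt.Println(err) // negative input
}
```

Because errors travel through the same channel as values, each stage only needs one input and one output, and the consumer at the end decides what to do with the first error it sees.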



## Batching
Batching is a common pattern in concurrent processing, especially when dealing with external services or databases.
Rill provides a **Batch** function that organizes a stream of items into batches of a specified size. It's also possible
to specify a timeout, after which the batch is emitted even if it's not full. This is useful for keeping an app reactive
when the input stream is slow or sparse.





## Error handling
In the examples above, errors are handled using **ForEach**, which is good for most use cases.
**ForEach** stops processing on the first error and returns it. If you need to handle errors in the middle of a pipeline
and continue processing, use the **Catch** function.

```go
results := echans.Map(input, 10, func(item int) (int, error) {
	// do some processing
	return item, nil
})

results = echans.Catch(results, 5, func(err error) error {
	if errors.Is(err, sql.ErrNoRows) {
		return nil // ignore this error
	}
	return fmt.Errorf("error processing item: %w", err) // wrap error and continue processing
})

err := echans.ForEach(results, 1, func(item int) error {
	// process results as usual
	return nil
})
```


## Order preservation
There are use cases where it's necessary to preserve the original order of data, while still allowing for concurrent processing.
Below is an example function that fetches temperature measurements for each day in a specified range
and prints temperature movements for each day. The **OrderedMap** function fetches measurements in parallel, but returns them in chronological order.
This allows the next stage of processing to calculate temperature differences between consecutive days.
See the full runnable example at examples/weather/main.go.

```go
type Measurement struct {
	Date     time.Time
	Temp     float64
	Movement float64
}

func printTemperatureMovements(ctx context.Context, city string, startDate, endDate time.Time) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // In case of error or early exit, this ensures all HTTP requests are canceled

	// Make a channel that emits all the days between startDate and endDate
	days := make(chan echans.Try[time.Time])
	go func() {
		defer close(days)
		for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) {
			days <- echans.Try[time.Time]{V: date}
		}
	}()

	// Download the temperature for each day in parallel and in order
	measurements := echans.OrderedMap(days, 10, func(date time.Time) (Measurement, error) {
		temp, err := getTemperature(ctx, city, date)
		return Measurement{Date: date, Temp: temp}, err
	})

	// Calculate the temperature movements. Use a single goroutine,
	// because prev is shared state that must be updated sequentially
	prev := Measurement{Temp: math.NaN()}
	measurements = echans.OrderedMap(measurements, 1, func(m Measurement) (Measurement, error) {
		m.Movement = m.Temp - prev.Temp
		prev = m
		return m, nil
	})

	// Iterate over the measurements and print the movements
	err := echans.ForEach(measurements, 1, func(m Measurement) error {
		fmt.Printf("%s: %.1f°C (movement %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, m.Movement)
		return nil
	})

	return err
}
```
18 changes: 9 additions & 9 deletions echans/core.go
@@ -10,7 +10,7 @@ import (
// Map applies a transformation function to each item in an input channel, using n goroutines for concurrency.
// If an error is encountered, either from the function f itself or from upstream it is forwarded to the output for further handling.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
-// Use OrderedMap to preserve the input order.
+// Use [OrderedMap] to preserve the input order.
func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] {
return common.MapOrFilter(in, n, func(a Try[A]) (Try[B], bool) {
if a.Error != nil {
@@ -26,7 +26,7 @@ func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B]
})
}

-// OrderedMap is similar to Map, but it guarantees that the output order is the same as the input order.
+// OrderedMap is similar to [Map], but it guarantees that the output order is the same as the input order.
func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] {
return common.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[B], bool) {
if a.Error != nil {
@@ -45,7 +45,7 @@ func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan
// Filter removes items that do not meet a specified condition, using n goroutines for concurrency.
// If an error is encountered, either from the function f itself or from upstream it is forwarded to the output for further handling.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
-// Use OrderedFilter to preserve the input order.
+// Use [OrderedFilter] to preserve the input order.
func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] {
return common.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error != nil {
@@ -61,7 +61,7 @@ func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[
})
}

-// OrderedFilter is similar to Filter, but it guarantees that the output order is the same as the input order.
+// OrderedFilter is similar to [Filter], but it guarantees that the output order is the same as the input order.
func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] {
return common.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error != nil {
@@ -80,7 +80,7 @@ func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-ch
// FlatMap applies a function to each item in an input channel, where the function returns a channel of items.
// These items are then flattened into a single output channel. Uses n goroutines for concurrency.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
-// Use OrderedFlatMap to preserve the input order.
+// Use [OrderedFlatMap] to preserve the input order.
func FlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B] {
return common.MapOrFlatMap(in, n, func(a Try[A]) (b Try[B], bb <-chan Try[B], flat bool) {
if a.Error != nil {
@@ -90,7 +90,7 @@ func FlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan
})
}

-// OrderedFlatMap is similar to FlatMap, but it guarantees that the output order is the same as the input order.
+// OrderedFlatMap is similar to [FlatMap], but it guarantees that the output order is the same as the input order.
func OrderedFlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B] {
return common.OrderedMapOrFlatMap(in, n, func(a Try[A]) (b Try[B], bb <-chan Try[B], flat bool) {
if a.Error != nil {
@@ -100,10 +100,10 @@ func OrderedFlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B])
})
}

-// Catch allows handling errors from the input channel concurrently using n goroutines for concurrency.
+// Catch allows handling errors from the input channel using n goroutines for concurrency.
// When f returns nil, error is considered handled and filtered out; otherwise it is replaced by the result of f.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
-// Use OrderedCatch to preserve the input order.
+// Use [OrderedCatch] to preserve the input order.
func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
return common.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error == nil {
@@ -119,7 +119,7 @@ func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
})
}

-// OrderedCatch is similar to Catch, but it guarantees that the output order is the same as the input order.
+// OrderedCatch is similar to [Catch], but it guarantees that the output order is the same as the input order.
func OrderedCatch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
return common.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error == nil {
5 changes: 5 additions & 0 deletions echans/doc.go
@@ -0,0 +1,5 @@
// Package rill is a Go toolkit designed for efficient and straightforward streaming, parallel processing, and pipeline construction.
// It abstracts away the complexities of concurrency management, enabling developers to focus on core logic.
// With features like lightweight integration, batch processing, error handling, and support for functional programming paradigms,
// rill enhances productivity in building concurrent applications. It offers type-safe operations, and minimizes memory usage even for large data sets.
package echans
4 changes: 2 additions & 2 deletions echans/merge.go
@@ -19,7 +19,7 @@ func Merge[A any](ins ...<-chan A) <-chan A {
// If an error is encountered, either from the function f itself or from upstream it is intentionally sent
// to one of the output channels in a non-deterministic manner.
// The output order is not guaranteed: results are written to the outputs as soon as they're ready.
-// Use OrderedSplit2 to preserve the input order.
+// Use [OrderedSplit2] to preserve the input order.
func Split2[A any](in <-chan Try[A], n int, f func(A) (int, error)) (out0 <-chan Try[A], out1 <-chan Try[A]) {
outs := common.MapAndSplit(in, 2, n, func(a Try[A]) (Try[A], int) {
if a.Error != nil {
@@ -37,7 +37,7 @@ func Split2[A any](in <-chan Try[A], n int, f func(A) (int, error)) (out0 <-chan
return outs[0], outs[1]
}

-// OrderedSplit2 is similar to Split2, but it guarantees that the order of the outputs matches the order of the input.
+// OrderedSplit2 is similar to [Split2], but it guarantees that the order of the outputs matches the order of the input.
func OrderedSplit2[A any](in <-chan Try[A], n int, f func(A) (int, error)) (out0 <-chan Try[A], out1 <-chan Try[A]) {
outs := common.OrderedMapAndSplit(in, 2, n, func(a Try[A]) (Try[A], int) {
if a.Error != nil {
10 changes: 8 additions & 2 deletions echans/util.go
@@ -9,13 +9,18 @@ func Drain[A any](in <-chan A) {
chans.Drain(in)
}

-// DrainNB is a non-blocking version of Drain.
+// DrainNB is a non-blocking version of [Drain].
func DrainNB[A any](in <-chan A) {
chans.DrainNB(in)
}

// Buffer takes a channel of items and returns a buffered channel of exact same items in the same order.
// This is useful when you want to write to the input channel without blocking the writer.
+//
+// Typical use case would look like
+//
+//	ids = Buffer(ids, 100)
+//	// Now up to 100 ids can be buffered if subsequent stages of the pipeline are slow
func Buffer[A any](in <-chan A, n int) <-chan A {
return chans.Buffer(in, n)
}
@@ -31,7 +36,8 @@ func FromSlice[A any](slice []A) <-chan Try[A] {
}

// ToSlice converts a channel into a slice.
-// If an error is encountered, it will be returned and the rest of the channel will be drained.
+// Conversion stops at the first error encountered.
+// In case of an error, ToSlice ensures the input channel is drained to avoid goroutine leaks.
func ToSlice[A any](in <-chan Try[A]) ([]A, error) {
var res []A

11 changes: 6 additions & 5 deletions echans/wrap.go
@@ -11,8 +11,8 @@ type Try[A any] struct {
Error error
}

-// Wrap converts a regular channel of items into a channel of items wrapped in a Try container.
-// This function can also take an error, which will also be added to the output channel.
+// Wrap converts a regular channel of items into a channel of items wrapped in a [Try] container.
+// Additionally, this function can also take an error, which will be added to the output channel.
// Either the input channel or the error can be nil, but not both simultaneously.
func Wrap[A any](values <-chan A, err error) <-chan Try[A] {
if values == nil && err == nil {
@@ -35,8 +35,9 @@ func Wrap[A any](values <-chan A, err error) <-chan Try[A] {
return out
}

-// WrapAsync converts channel of values and channel of errors into a channel of values wrapped in a Try container.
-// Either the input channel or the error channel can be nil, but not both simultaneously.
+// WrapAsync converts a regular channel of items into a channel of items wrapped in a [Try] container.
+// Additionally, this function can also take a channel of errors, that will be added to the output channel.
+// Either the input channel or the error channel can be nil, but not both simultaneously.
func WrapAsync[A any](values <-chan A, errs <-chan error) <-chan Try[A] {
wrappedValues := chans.Map(values, 1, func(a A) Try[A] {
return Try[A]{V: a}
@@ -58,7 +59,7 @@ func WrapAsync[A any](values <-chan A, errs <-chan error) <-chan Try[A] {
return chans.Merge(wrappedErrs, wrappedValues)
}

-// Unwrap converts a channel of Try containers into a channel of values and a channel of errors.
+// Unwrap converts a channel of [Try] containers into a channel of values and a channel of errors.
func Unwrap[A any](in <-chan Try[A]) (<-chan A, <-chan error) {
if in == nil {
return nil, nil