From dd919989b6ff89caa16f5afcb97f10983cd1f4cb Mon Sep 17 00:00:00 2001 From: Viktor Nikolaiev Date: Wed, 19 Jun 2024 17:03:43 +0300 Subject: [PATCH] Add more detailed docs, examples and readme (#23) --- .github/release.yaml | 4 +- README.md | 579 +++++++++++++++---------------- batch.go | 23 +- consume.go | 67 ++-- doc.go | 82 ++++- example_test.go | 803 ++++++++++++++++++++++++++++++------------- merge.go | 25 +- reduce.go | 51 ++- transform.go | 56 +-- util.go | 16 +- wrap.go | 43 ++- 11 files changed, 1086 insertions(+), 663 deletions(-) diff --git a/.github/release.yaml b/.github/release.yaml index 165ce26..70616e6 100644 --- a/.github/release.yaml +++ b/.github/release.yaml @@ -4,7 +4,9 @@ changelog: labels: ["new"] - title: 🐛 Fixes labels: ["fix", "bug"] - - title: 📝 Dependencies + - title: 📖 Documentation + labels: ["documentation", "docs"] + - title: 🔗 Dependencies labels: ["dependencies"] - title: 📦 Other labels: ["*"] \ No newline at end of file diff --git a/README.md b/README.md index eb8c78c..b630675 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,23 @@ # Rill [![GoDoc](https://pkg.go.dev/badge/github.com/destel/rill)](https://pkg.go.dev/github.com/destel/rill) [![Go Report Card](https://goreportcard.com/badge/github.com/destel/rill)](https://goreportcard.com/report/github.com/destel/rill) [![codecov](https://codecov.io/gh/destel/rill/graph/badge.svg?token=252K8OQ7E1)](https://codecov.io/gh/destel/rill) -Rill (noun: a small stream) is a comprehensive Go toolkit for streaming, parallel processing, and pipeline construction. -Designed to reduce boilerplate and simplify usage, it empowers developers to focus on core logic -without getting bogged down by the complexity of concurrency. +Rill (noun: a small stream) is a Go toolkit that offers a collection of easy-to-use functions for concurrency, streaming, +batching and pipeline construction. It abstracts away the complexities of concurrency, removes boilerplate, +provides a structured way to handle errors and allows developers to focus on core logic. +Whether you need to perform a basic concurrent ForEach or construct a complex multi-stage processing pipeline, +Rill has got you covered. 
-## Key features -- **Lightweight**: fast and modular, can be easily integrated into existing projects -- **Easy to use**: the complexity of managing goroutines, wait groups, and error handling is abstracted away -- **Concurrent**: control the level of concurrency for all operations -- **Batching**: provides a simple way to organize and process data in batches -- **Error Handling**: provides a structured way to handle errors in concurrent apps +## Key Features +- **Easy to Use**: the complexity of managing goroutines, channels, wait groups, and atomics is abstracted away +- **Easy to Integrate**: seamlessly integrates into existing projects without any setup or configuration +- **Concurrent**: provides control over the level of concurrency for all operations +- **Error Handling**: provides a structured way to handle errors in concurrent applications - **Streaming**: handles real-time data streams or large datasets with a minimal memory footprint -- **Order Preservation**: offers functions that preserve the original order of data, while still allowing for concurrent processing -- **Efficient Resource Use**: the number of goroutines and allocations does not depend on the data size +- **Modular**: allows composing functions to create custom pipelines and higher-order operations +- **Batching**: simplifies organizing and processing data in batches +- **Order Preservation**: provides functions that maintain the original order of data during concurrent processing +- **Efficient Resource Use**: ensures goroutine pool sizes and memory allocations are independent of input size - **Generic**: all operations are type-safe and can be used with any data type -- **Functional Programming**: based on functional programming concepts, making operations like map, filter, flatMap and others available for channel-based workflows - ## Installation @@ -25,108 +26,49 @@ go get github.com/destel/rill ``` +## Example Usage +Consider an example application that loads users from an API concurrently, +updates their status to active and saves them back, +while controlling the level of concurrency for each operation. -## Example usage -A basic example demonstrating how **ForEach** can be used to process a list of items concurrently and handle errors. - -[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-Basic) +[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package) ```go func main() { - items := rill.FromSlice([]string{"item1", "item2", "item3", "item4", "item5", "item6", "item7", "item8", "item9", "item10"}, nil) - - err := rill.ForEach(items, 3, func(item string) error { - randomSleep(1000 * time.Millisecond) // simulate some additional work - res := strings.ToUpper(item) - fmt.Println(res) - return nil - }) - if err != nil { - fmt.Println("Error:", err) - } + // In case of early exit this will cancel the file streaming, + // which in turn will terminate the entire pipeline. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start with a stream of user ids + ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + // Read users from the API. + // Concurrency = 3 + users := rill.Map(ids, 3, func(id int) (*User, error) { + return getUser(ctx, id) + }) + + // Activate users. 
+ // Concurrency = 2 + err := rill.ForEach(users, 2, func(u *User) error { + if u.IsActive { + fmt.Printf("User %d is already active\n", u.ID) + return nil + } + + u.IsActive = true + return saveUser(ctx, u) + }) + + fmt.Println("Error:", err) } ``` -Consider a more advanced example: an application that fetches keys from multiple URLs, retrieves their values from a key-value database in batches, and prints them. -This example demonstrates the library's strengths in handling concurrent tasks, error propagation, batching and data streaming, -all while maintaining simplicity and efficiency. - -[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-Batching) - -```go -func main() { - urls := rill.FromSlice([]string{ - "https://example.com/file1.txt", - "https://example.com/file2.txt", - "https://example.com/file3.txt", - "https://example.com/file4.txt", - }, nil) - - // Fetch keys from each URL and flatten them into a single stream - keys := rill.FlatMap(urls, 3, func(url string) <-chan rill.Try[string] { - return streamFileLines(url) - }) - - // Exclude any empty keys from the stream - keys = rill.Filter(keys, 3, func(key string) (bool, error) { - return key != "", nil - }) - - // Organize keys into manageable batches of 10 for bulk operations - keyBatches := rill.Batch(keys, 10, 1*time.Second) - - // Fetch values from DB for each batch of keys - resultBatches := rill.Map(keyBatches, 3, func(keys []string) ([]KV, error) { - values, err := kvMultiGet(keys...) - if err != nil { - return nil, err - } - - results := make([]KV, len(keys)) - for i, key := range keys { - results[i] = KV{Key: key, Value: values[i]} - } - - return results, nil - }) - - // Convert batches back to a single items for final processing - results := rill.Unbatch(resultBatches) - - // Exclude any empty values from the stream - results = rill.Filter(results, 3, func(kv KV) (bool, error) { - return kv.Value != "", nil - }) - - // Iterate over each key-value pair and print - cnt := 0 - err := rill.ForEach(results, 1, func(kv KV) error { - fmt.Println(kv.Key, "=>", kv.Value) - cnt++ - return nil - }) - if err != nil { - fmt.Println("Error:", err) - } - - fmt.Println("Total keys:", cnt) -} - -// streamFileLines does line-by-line streaming of a file from a URL, -func streamFileLines(url string) <-chan rill.Try[string] { - // ... -} -// kvMultiGet does a batch read from a key-value database, -func kvMultiGet(keys ...string) ([]string, error) { - // ... -} -``` - - -## Testing strategy +## Testing Strategy Rill has a test coverage of over 95%, with testing focused on: - **Correctness**: ensuring that functions produce accurate results at different levels of concurrency - **Concurrency**: confirming that correct number of goroutines is spawned and utilized @@ -137,170 +79,214 @@ Rill has a test coverage of over 95%, with testing focused on: -## Design philosophy -At the heart of rill lies a simple yet powerful concept: operating on channels of wrapped values, encapsulated by the **Try** structure. -Such channels can be created manually or through utilities like **FromSlice** or **FromChan**, and then transformed via operations -such as **Map**, **Filter**, **FlatMap** and others. Finally, when all processing stages are completed, the data can be consumed by -**ForEach**, **ToSlice** or manually by iterating over the resulting channel. +## Design Philosophy +At the heart of rill lies a simple yet powerful concept: operating on channels of wrapped values, encapsulated by the **Try** container. 
+This allows both values and errors to be propagated through the pipeline, ensuring that errors are handled correctly at each stage.
+Such wrapped channels can be created manually or through utilities like **FromSlice** or **FromChan**, and then transformed via non-blocking
+functions like **Map** or **Filter**. Finally, the transformed stream can be consumed by a blocking function such as
+**ForEach**, **Reduce** or **MapReduce**.
+
+One of the key features of Rill is the ability to control the level of concurrency for almost all operations through the **n** parameter.
+This is possible due to the channel and goroutine orchestration that the library does under the hood. Rill's built-in functions manage
+worker pools internally, making the number of goroutines and allocations independent of the input size.
+
+Finally, rill is designed to be modular and extensible. Most functions take streams as input and return transformed streams as output,
+making it easy to create custom, reusable higher-order operations and pipelines by combining existing ones.
 
 
 ## Batching
 Batching is a common pattern in concurrent processing, especially when dealing with external services or databases.
-Rill provides a **Batch** function that organizes a stream of items into batches of a specified size. It's also possible
+Rill provides a **Batch** function that transforms a stream of items into a stream of batches of a specified size. It's also possible
 to specify a timeout, after which the batch is emitted even if it's not full. This is useful for keeping an application reactive
 when input stream is slow or sparse.
 
+Consider a modification of the previous example, where the list of ids is streamed from a remote file,
+and users are fetched from the API in batches.
 
-
-## Fan-In and Fan-Out
-The library offers mechanisms for fanning in and out data streams. Fan-in is done with the **Merge** function,
-which consolidates multiple data streams into a single unified channel.
-Fan-out is done with the **Split2** function, that divides a single input stream into two distinct output channels.
-This division is based on a discriminator function, allowing parallel processing paths based on data characteristics.
-
-
-
-## Error handling
-In the examples above errors are handled using **ForEach**, which is good for most use cases.
-**ForEach** stops processing on the first error and returns it. If you need to handle errors in the middle of a pipeline,
-and/or continue processing after an error, there is a **Catch** function that can be used for that.
+[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-Batching)
 
 ```go
-results := rill.Map(input, 10, func(item int) (int, error) {
-	// do some processing
-})
-
-results = rill.Catch(results, 5, func(err error) {
-	if errors.Is(err, sql.ErrNoRows) {
-		return nil // ignore this error
-	} else {
-		return fmt.Errorf("error processing item: %w", err) // wrap other errors
-	}
-})
-
-err := rill.ForEach(results, 1, func(item int) error {
-	// process results as usual
-})
+func main() {
+	// In case of early exit this will cancel the file streaming,
+	// which in turn will terminate the entire pipeline.
+ ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Stream a file with user ids as an io.Reader + reader, err := downloadFile(ctx, "http://example.com/user_ids1.txt") + if err != nil { + fmt.Println("Error:", err) + return + } + + // Transform the reader into a stream of words + lines := streamLines(reader) + + // Parse lines as integers + // Concurrency = 3 + ids := rill.Map(lines, 3, func(line string) (int, error) { + return strconv.Atoi(line) + }) + + // Group IDs into batches of 5 for bulk processing + idBatches := rill.Batch(ids, 5, 1*time.Second) + + // Fetch users for each batch of IDs + // Concurrency = 3 + userBatches := rill.Map(idBatches, 3, func(ids []int) ([]*User, error) { + return getUsers(ctx, ids...) + }) + + // Transform batches back into a stream of users + users := rill.Unbatch(userBatches) + + // Activate users. + // Concurrency = 2 + err = rill.ForEach(users, 2, func(u *User) error { + if u.IsActive { + fmt.Printf("User %d is already active\n", u.ID) + return nil + } + + u.IsActive = true + return saveUser(ctx, u) + }) + + fmt.Println("Error:", err) +} ``` +## Fan-in and Fan-out +Go channels support both Fan-in and Fan-out patterns, meaning that multiple goroutines can write to a single channel (fan-in) +or read from a single channel (fan-out). On top of that Rill adds a Merge function that can be used to combine multiple streams into a single one. +Consider a basic example application that concurrently sends messages through multiple servers, then collects the results +into a single stream and handles errors. -## Termination and resource leaks -In Go concurrent applications, if there are no readers for a channel, writers can become stuck, -leading to potential goroutine and memory leaks. This issue extends to rill pipelines, which are built on Go channels; -if any stage in a pipeline lacks a consumer, the whole chain of producers upstream may become blocked. -Therefore, it's vital to ensure that pipelines are fully consumed, especially in cases where errors lead to early termination. -The example below demonstrates a situation where the final processing stage exits upon the first encountered error, -risking a blocked pipeline state. +[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-FanIn_FanOut) ```go -func doWork(ctx context.Context) error { - // Initialize the first stage of the pipeline - ids := streamIDs(ctx) - - // Define other pipeline stages... - - // Final stage processing - for value := range results { - // Process value... 
-		if someCondition {
-			return fmt.Errorf("some error") // Early exit on error
-		}
-	}
-	return nil
+func main() {
+	messages := rill.FromSlice([]string{
+		"message1", "message2", "message3", "message4", "message5",
+		"message6", "message7", "message8", "message9", "message10",
+	}, nil)
+
+	// Fan-out the messages to three servers
+	results1 := rill.Map(messages, 2, func(message string) (string, error) {
+		return message, sendMessage(message, "server1")
+	})
+
+	results2 := rill.Map(messages, 2, func(message string) (string, error) {
+		return message, sendMessage(message, "server2")
+	})
+
+	results3 := rill.Map(messages, 2, func(message string) (string, error) {
+		return message, sendMessage(message, "server3")
+	})
+
+	// Fan-in the results from all servers into a single stream
+	results := rill.Merge(results1, results2, results3)
+
+	// Handle errors
+	err := rill.Err(results)
+	fmt.Println("Error:", err)
 }
 ```
 
-To prevent such issues, it's advisable to ensure the results channel is drained in the event of an error.
-A straightforward approach is to use defer to invoke **DrainNB**:
+## Errors, Termination and Contexts
+Error handling can be non-trivial in concurrent applications. Rill simplifies this by providing a structured error handling approach.
+Usually, rill pipelines consist of zero or more non-blocking stages that transform the input stream,
+and one blocking stage that returns the results. The general rule is: any error happening anywhere in the pipeline is
+propagated down to the final stage, where it's caught by some blocking function and returned to the caller.
+
+Rill provides several blocking functions out of the box:
+
+- **ForEach:** Concurrently applies a user function to each item in the stream.
+  [Example](https://pkg.go.dev/github.com/destel/rill#example-ForEach)
+- **ToSlice:** Collects all stream items into a slice.
+  [Example](https://pkg.go.dev/github.com/destel/rill#example-ToSlice)
+- **Reduce:** Concurrently reduces the stream to a single value, using a user provided reducer function.
+  [Example](https://pkg.go.dev/github.com/destel/rill#example-Reduce)
+- **MapReduce:** Performs a concurrent MapReduce operation on the stream, reducing it to a Go map,
+  using user provided mapper and reducer functions.
+  [Example](https://pkg.go.dev/github.com/destel/rill#example-MapReduce)
+- **All:** Concurrently checks if all items in the stream satisfy a user provided condition.
+  [Example](https://pkg.go.dev/github.com/destel/rill#example-All)
+- **Any:** Concurrently checks if at least one item in the stream satisfies a user provided condition.
+  [Example](https://pkg.go.dev/github.com/destel/rill#example-Any)
+- **First:** Returns the first item or error encountered in the stream.
+  [Example](https://pkg.go.dev/github.com/destel/rill#example-First)
+- **Err:** Returns the first error encountered in the stream.
+  [Example](https://pkg.go.dev/github.com/destel/rill#example-Err)
+
+
+All blocking functions share a common behavior. In case of an early termination (before reaching the end of the input stream),
+such functions initiate background draining of the remaining items. This is done to prevent goroutine leaks by ensuring that
+all goroutines feeding the stream are allowed to complete.
+The input stream should not be used anymore after calling a blocking function.
+
+It is also possible to use a for-range loop instead of a blocking function to consume the stream.
+In this case, the caller would be responsible for draining the stream in case of an early termination.
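+
+For illustration, a manual consumption loop with a deferred drain might look roughly like this
+(the same pattern is shown in the package documentation; `results` here stands for any stream returned by a non-blocking stage):
+
+```go
+defer rill.DrainNB(results)
+
+for res := range results {
+	if res.Error != nil {
+		return res.Error
+	}
+	// process res.Value
+}
+```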
+See more details in the package documentation. + +Rill is context-agnostic, meaning that it does not enforce any specific context usage. +However, it's recommended to make user-defined pipeline stages context-aware. +This is especially important for the initial stage, as it allows to finish background draining +process, described above, faster. + +In the example below the printOddSquares function initiates a pipeline that depends on a context. +When an error occurs in one of the pipeline stages, it propagates down the pipeline, causing an early return, +context cancellation (via defer) and resource cleanup. + +[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-Context) ```go -func doWork(ctx context.Context) error { - // Initialize the first stage of the pipeline - ids := streamIDs(ctx) - - // Define other pipeline stages... - - // Ensure pipeline is drained in case of failure - defer rill.DrainNB(results) - - // Final stage processing - for value := range results { - // Process value... - if someCondition { - return fmt.Errorf("some error") // Early exit on error - } - } - return nil +func main() { + ctx := context.Background() + + err := printOddSquares(ctx) + fmt.Println("Error:", err) } -``` -Utilizing functions like **ForEach** or **ToSlice**, which incorporate built-in draining mechanisms, can simplify -the code and enhance readability: +func printOddSquares(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() -```go -func doWork(ctx context.Context) error { - // Initialize the first stage of the pipeline - ids := streamIDs(ctx) - - // Define other pipeline stages... - - // Final stage processing - return rill.ForEach(results, 5, func(value string) error { - // Process value... - if someCondition { - return fmt.Errorf("some error") // Early exit on error, with automatic draining - } - return nil - }) -} -``` + numbers := infiniteNumberStream(ctx) -While these measures are effective in preventing leaks, the pipeline may continue draining values in the background as long -as the initial stage produces values. A best practice is to manage the first stage (and potentially others) with a context, -allowing for a controlled shutdown: + odds := rill.Filter(numbers, 3, func(x int) (bool, error) { + if x == 20 { + return false, fmt.Errorf("early exit") + } + return x%2 == 1, nil + }) -```go -func doWork(ctx context.Context) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() // Ensures first stage is cancelled upon function exit - - // Initialize the first stage of the pipeline - ids := streamIDs(ctx) - - // Define other pipeline stages... - - // Final stage processing - return rill.ForEach(results, 5, func(value string) error { - // Process value - if someCondition { - return fmt.Errorf("some error") // Early exit on error, with automatic draining - } - return nil - }) + return rill.ForEach(odds, 3, func(x int) error { + fmt.Println(x * x) + return nil + }) } ``` +## Order Preservation +In concurrent applications, maintaining the original sequence of processed items is challenging due to the nature of parallel execution. +When values are read from an input stream, concurrently processed through a function **f**, and written to an output stream, their order might not +match the order of the input. To address this, rill provides ordered versions of its core functions, such as **OrderedMap** or **OrderedFilter**. 
+These ensure that if value **x** precedes value **y** in the input channel, then **f(x)** will precede **f(y)** in the output,
+preserving the original order. It's important to note that these ordered functions have a small overhead compared to their unordered counterparts,
+due to more advanced orchestration and synchronization happening under the hood.
-
-## Order preservation
-In concurrent environments, maintaining the original sequence of processed items is challenging due to the nature of parallel execution.
-When values are read from an input channel, processed through a function **f**, and written to an output channel, their order might not
-mirror the input sequence. To address this, rill provides ordered versions of its core functions, such as **OrderedMap**, **OrderedFilter**,
-and others. These ensure that if value **x** precedes value **y** in the input channel, then **f(x)** will precede **f(y)** in the output,
-preserving the original order. It's important to note that these ordered functions incur a small overhead compared to their unordered counterparts,
-due to the additional logic required to maintain order.
-
-Order preservation is vital in scenarios where the sequence of data impacts the outcome. Take, for instance, an application that retrieves
-daily temperature measurements over a specific period and calculates the change in temperature from one day to the next.
-Although fetching the data in parallel boosts efficiency, processing it in the original order is crucial for
-accurate computation of temperature variations.
+Order preservation is vital in scenarios where the sequence of data impacts the outcome, such as time-series data processing.
+Take, for instance, an application that retrieves daily temperature measurements over a specific period and calculates the change
+in temperature from one day to the next. Such an application can benefit from concurrent data fetching, but needs the fetched data
+to be processed in the correct order.
 
 [Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-Ordering)
 
 ```go
 type Measurement struct {
 	Date time.Time
 	Temp float64
 }
 
 func main() {
-	city := "New York"
-	endDate := time.Now()
-	startDate := endDate.AddDate(0, 0, -30)
-
-	// Make a channel that emits all the days between startDate and endDate
-	days := make(chan rill.Try[time.Time])
-	go func() {
-		defer close(days)
-		for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) {
-			days <- rill.Wrap(date, nil)
-		}
-	}()
-
-	// Download the temperature for each day concurrently
-	measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) {
-		temp, err := getTemperature(city, date)
-		return Measurement{Date: date, Temp: temp}, err
-	})
-
-	// Iterate over the measurements, calculate and print changes. Use a single goroutine
-	prev := Measurement{Temp: math.NaN()}
-	err := rill.ForEach(measurements, 1, func(m Measurement) error {
-		change := m.Temp - prev.Temp
-		prev = m
-
-		fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, change)
-		return nil
-	})
-	if err != nil {
-		fmt.Println("Error:", err)
-	}
-}
-
-// getTemperature fetches a temperature reading for a city and date,
-func getTemperature(city string, date time.Time) (float64, error) {
-	// ...
+ city := "New York" + endDate := time.Now() + startDate := endDate.AddDate(0, 0, -30) + + // Create a stream of all days between startDate and endDate + days := make(chan rill.Try[time.Time]) + go func() { + defer close(days) + for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) { + days <- rill.Wrap(date, nil) + } + }() + + // Fetch the temperature for each day from the API + // Concurrency = 10; Ordered + measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) { + temp, err := getTemperature(city, date) + return Measurement{Date: date, Temp: temp}, err + }) + + // Iterate over the measurements, calculate and print changes. + // Concurrency = 1; Ordered + prev := Measurement{Temp: math.NaN()} + err := rill.ForEach(measurements, 1, func(m Measurement) error { + change := m.Temp - prev.Temp + prev = m + + fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, change) + return nil + }) + + fmt.Println("Error:", err) } ``` -## Working with slices -Rill is designed for channel based workflows, but it can also be used with slices, thanks to its ability -to do ordered processing. Example below demonstrates how to create a **mapSLice** generic helper function that -does parallel slice processing. That helper is then used to fetch users from an API concurrently. +## Limitations +While rill provides a powerful and expressive way to build concurrent pipelines, there are certain limitations and +scenarios where alternative approaches might be more suitable. -[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-Slices) +Go channels are a fundamental and convenient feature for handling concurrency and communication between goroutines. +However, it's important to note that channels come with a certain overhead. The impact of this overhead varies depending on +the specific use: -```go -type User struct { - ID int - Username string -} - -// mapSLice is a helper function that does a parallel map operation on a slice of items -func mapSLice[A, B any](in []A, n int, f func(A) (B, error)) ([]B, error) { - inChan := rill.FromSlice(in, nil) - outChan := rill.OrderedMap(inChan, n, f) - return rill.ToSlice(outChan) -} - -func main() { - ids := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} +- **I/O-bound tasks:** Channels are great for handling I/O-bound tasks, such as reading from or writing to files, + network communication, or database operations. The overhead of channels is typically negligible compared to + the time spent waiting for I/O operations to complete +- **Heavy CPU-bound tasks:** For more computationally intensive tasks, such as complex string manipulation, parsing, encryption, + or hash calculation, the overhead of channels becomes less significant compared to the overall processing time. + In these scenarios, using channels and rill can still provide an efficient way to parallelize the workload. See [benchmarks](https://github.com/destel/rill/wiki/Benchmarks) for more details. +- **Light CPU-bound tasks:** When parallelizing a large number of small CPU-bound tasks, such as simple arithmetic operations, + the overhead of channels can become significant. In such cases, using channels and goroutines may not provide + the desired performance benefits. 
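+
+As a rough, illustrative sketch of the channel-free alternative discussed in the next paragraph (this is not part of rill's API;
+the helper name and signature below are made up for this example), a slice can be split into chunks,
+with each chunk processed by its own goroutine:
+
+```go
+// mapChunked applies f to every element of in, using one goroutine per chunk of the slice.
+// It relies only on sync.WaitGroup for coordination, with no channels involved.
+func mapChunked[A, B any](in []A, chunks int, f func(A) B) []B {
+	out := make([]B, len(in))
+	chunkSize := (len(in) + chunks - 1) / chunks
+
+	var wg sync.WaitGroup
+	for start := 0; start < len(in); start += chunkSize {
+		end := start + chunkSize
+		if end > len(in) {
+			end = len(in)
+		}
+
+		wg.Add(1)
+		go func(start, end int) {
+			defer wg.Done()
+			for i := start; i < end; i++ {
+				out[i] = f(in[i])
+			}
+		}(start, end)
+	}
+	wg.Wait()
+
+	return out
+}
+```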
- users, err := mapSLice(ids, 3, getUser) - if err != nil { - fmt.Println("Error:", err) - return - } +If your use case requires high-performance calculations and you want to minimize the overhead of channels, +you can consider alternative approaches or libraries. For example, it's possible to transform a slice without channels and +with almost zero orchestration, just by dividing the slice into n chunks and assigning each chunk to a separate goroutine. - fmt.Printf("%+v\n", users) -} - - - -// getUser fetches a user from an API -func getUser(id int) (User, error) { - // ... -} +Because of the reasons mentioned above and to avoid misleading users, rill does not provide functions that operate directly on slices. +It main focus is channels and streaming. However, slices can still be used with rill by converting them to and from channels, +and leveraging ordered transformations when necessary. -``` \ No newline at end of file +Another limitation of rill is that it does not provide a way to create a global worker pool for the entire pipeline. +Each stage of the pipeline must have at least one alive goroutine to keep the whole pipeline running. +That's why each stage has its own goroutine pool, which is created and managed internally. \ No newline at end of file diff --git a/batch.go b/batch.go index 58ed6c5..455aec0 100644 --- a/batch.go +++ b/batch.go @@ -6,18 +6,29 @@ import ( "github.com/destel/rill/internal/core" ) -// Batch groups items from an input channel into batches based on a maximum size and a timeout. -// A batch is emitted when it reaches the maximum size, the timeout expires, or the input channel closes. -// To emit batches only when full, set the timeout to -1. This function never emits empty batches. -// The timeout countdown starts when the first item is added to a new batch. -// Zero timeout is not supported and will panic. +// Batch take a stream of items and returns a stream of batches based on a maximum size and a timeout. +// +// A batch is emitted when one of the following conditions is met: +// - The batch reaches the size of n items +// - The time since the first item was added to the batch exceeds the timeout +// - The input stream is closed +// +// This function never emits empty batches. To disable the timeout and emit batches only based on the size, +// set the timeout to -1. Setting the timeout to zero is not supported and will result in a panic +// +// This is a non-blocking ordered function that processes items sequentially. +// +// See the package documentation for more information on non-blocking ordered functions and error handling. func Batch[A any](in <-chan Try[A], n int, timeout time.Duration) <-chan Try[[]A] { values, errs := ToChans(in) batches := core.Batch(values, n, timeout) return FromChans(batches, errs) } -// Unbatch is the inverse of [Batch]. It takes a channel of batches and emits individual items. +// Unbatch is the inverse of [Batch]. It takes a stream of batches and returns a stream of individual items. +// +// This is a non-blocking ordered function that processes items sequentially. +// See the package documentation for more information on non-blocking ordered functions and error handling. 
func Unbatch[A any](in <-chan Try[[]A]) <-chan Try[A] { batches, errs := ToChans(in) values := core.Unbatch(batches) diff --git a/consume.go b/consume.go index a915647..26c3d0b 100644 --- a/consume.go +++ b/consume.go @@ -7,13 +7,13 @@ import ( "github.com/destel/rill/internal/core" ) -// ForEach applies a function f to each item in an input channel using n goroutines for parallel processing. The function -// blocks until all items are processed or an error is encountered, either from the function f itself or from upstream. -// In case of an error leading to early termination, ForEach ensures the input channel is drained to avoid goroutine leaks, -// making it safe for use in environments where cleanup is crucial. The function returns the first encountered error, or nil -// if all items were processed successfully. -// While this function does not guarantee the order of item processing due to its concurrent nature, -// using n = 1 results in sequential processing, as in a simple for-range loop. +// ForEach applies a function f to each item in an input stream. +// +// This is a blocking unordered function that processes items concurrently using n goroutines. +// The case when n = 1 is optimized: it does not spawn additional goroutines and processes items sequentially, +// making the function ordered and similar to a regular for-range loop. +// +// See the package documentation for more information on blocking unordered functions and error handling. func ForEach[A any](in <-chan Try[A], n int, f func(A) error) error { if n == 1 { for a := range in { @@ -66,15 +66,10 @@ func onceFunc1[T any](f func(T)) func(T) { } } -// Err returns the first error encountered in the input channel. -// This function blocks until: -// - An error is found. -// - The end of the input channel is reached. +// Err returns the first error encountered in the input stream or nil if there were no errors. // -// If Err terminates early (before the input channel is fully consumed), -// it initiates background draining of the remaining items in the channel. This is done -// to prevent goroutine leaks by ensuring that all goroutines feeding the channel are allowed to complete. -// The input channel should not be used anymore after calling this function. +// This is a blocking ordered function that processes items sequentially. +// See the package documentation for more information on blocking ordered functions and error handling. func Err[A any](in <-chan Try[A]) error { defer DrainNB(in) @@ -87,16 +82,11 @@ func Err[A any](in <-chan Try[A]) error { return nil } -// First returns the first value or error encountered in the input channel. -// This function blocks until: -// - A value is found. In this case, the found flag is set to true. -// - The end of the input channel is reached. In this case, the found flag is set to false. -// - An error is encountered in the input channel. +// First returns the first item or error encountered in the input stream, whichever comes first. +// The found return flag is set to false if the stream was empty, otherwise it is set to true. // -// If First terminates early (before the input channel is fully consumed), -// it initiates background draining of the remaining items in the channel. This is done -// to prevent goroutine leaks by ensuring that all goroutines feeding the channel are allowed to complete. -// The input channel should not be used anymore after calling this function. +// This is a blocking ordered function that processes items sequentially. 
+// See the package documentation for more information on blocking ordered functions and error handling. func First[A any](in <-chan Try[A]) (value A, found bool, err error) { defer DrainNB(in) @@ -108,16 +98,14 @@ func First[A any](in <-chan Try[A]) (value A, found bool, err error) { return } -// Any checks if there is an item in the input channel that satisfies the condition f. -// This function uses n goroutines for concurrency. It blocks execution until either: -// - A matching item is found -// - All items have been checked -// - An error is encountered in the condition function f or from the upstream +// Any checks if there is an item in the input stream that satisfies the condition f. +// This function returns true as soon as it finds such an item. Otherwise, it returns false. // -// In case of early termination, Any ensures the input channel is drained to avoid goroutine leaks, -// making it safe for use in environments where cleanup is crucial. The function returns the first encountered error, or nil +// Any is a blocking unordered function that processes items concurrently using n goroutines. +// The case when n = 1 is optimized: it does not spawn additional goroutines and processes items sequentially, +// making the function ordered. // -// The function returns true if a match is found, false otherwise, or a first encountered error. +// See the package documentation for more information on blocking unordered functions and error handling. func Any[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (bool, error) { errBreak := errors.New("break") res := false @@ -145,16 +133,15 @@ func Any[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (bool, error) return res, err } -// All checks if all items in the input channel satisfy the condition function f. -// This function uses n goroutines for concurrency and blocks execution until: -// - A non-matching item is found, -// - All items have been checked, -// - An error is encountered in the condition function f or from the upstream. +// All checks if all items in the input stream satisfy the condition f. +// This function returns false as soon as it finds an item that does not satisfy the condition. Otherwise, it returns true, +// including the case when the stream was empty. // -// In case of early termination, All ensures the input channel is drained to avoid goroutine leaks, -// making it safe for use in environments where cleanup is crucial. The function returns the first encountered error, or nil +// This is a blocking unordered function that processes items concurrently using n goroutines. +// The case when n = 1 is optimized: it does not spawn additional goroutines and processes items sequentially, +// making the function ordered. // -// Returns true if all items match the condition, false otherwise, or a first encountered error. +// See the package documentation for more information on blocking unordered functions and error handling. func All[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (bool, error) { // Idea: x && y && z is the same as !(!x || !y || !z) // So we can use Any with a negated condition to implement All diff --git a/doc.go b/doc.go index ea2c9d3..d99dd33 100644 --- a/doc.go +++ b/doc.go @@ -1,5 +1,79 @@ -// Package rill is a Go toolkit designed for efficient and straightforward streaming, parallel processing, and pipeline construction. -// It abstracts away the complexities of concurrency management, enabling developers to focus on core logic. 
-// With features like lightweight integration, batch processing, error handling, and support for functional programming paradigms, -// rill enhances productivity in building concurrent applications. It offers type-safe operations, and minimizes memory usage even for large data sets. +// Package rill is a collection of easy-to-use functions for concurrency, streaming, batching and pipeline construction. +// It abstracts away the complexities of concurrency, removes boilerplate, and provides a structured way to handle errors. +// Rill is modular and can be easily integrated into existing projects: it requires no setup and allows using only the necessary functions. +// At the same time, rill's functions can be composed into complex, concurrent, and reusable pipelines when needed. +// +// # Streams and Try Containers +// +// In this package, a stream refers to a channel of [Try] containers. A Try container is a simple struct that holds a value and an error. +// When an "empty stream" is referred to, it means a channel of Try containers that has been closed and was never written to. +// +// Most functions in this package are concurrent, and the level of concurrency can be controlled by the argument n. +// Some functions share common behaviors and characteristics, which are described below. +// +// # Non-blocking functions +// +// Functions such as [Map], [Filter], and [Batch] take a stream as an input and return a new stream as an output. +// They do not block and return the output stream immediately. All the processing is done in the background by the goroutine pools they spawn. +// These functions forward all errors from the input stream to the output stream. +// Any errors returned by the user-provided functions are also sent to the output stream. +// When such function reaches the end of the input stream, it closes the output stream, stops processing and cleans up resources. +// +// Such functions are designed to be composed together to build complex processing pipelines: +// +// stage2 := rill.Map(input, ...) +// stage3 := rill.Batch(stage2, ...) +// stage4 := rill.Map(stage3, ...) +// results := rill.Unbatch(stage4, ...) +// // consume the results and handle errors with some blocking function +// +// # Blocking functions +// +// Functions such as [ForEach], [Reduce] and [MapReduce] are used at the last stage of the pipeline +// to consume the stream and return the final result or error. +// +// Usually, these functions block until one of the following conditions is met: +// - The end of the stream is reached. In this case, the function returns the final result. +// - An error is encountered either in the input stream or in some user-provided function. In this case, the function returns the error. +// +// In case of an early termination (before reaching the end of the input stream), such functions initiate +// background draining of the remaining items. This is done to prevent goroutine +// leaks by ensuring that all goroutines feeding the stream are allowed to complete. +// The input stream should not be used anymore after calling such functions. +// +// It's also possible to consume the pipeline results manually, for example using a for-range loop. +// In this case, add a deferred call to [DrainNB] before the loop to ensure that goroutines are not leaked. 
+// +// defer rill.DrainNB(results) +// +// for res := range results { +// if res.Error != nil { +// return res.Error +// } +// // process res.Value +// } +// +// # Unordered functions +// +// Functions such as [Map], [Filter] and [FlatMap] write items to the output stream as soon as they become available. +// Due to the concurrent nature of these functions, the order of items in the output stream may not match the order of items in the input stream. +// These functions prioritize performance and concurrency over maintaining the original order. +// +// # Ordered functions +// +// Functions such as [OrderedMap] or [OrderedFilter] preserve the order of items from the input stream. +// These functions are still concurrent, but use special synchronization techniques to ensure that +// items are written to the output stream in the same order as they were read from the input stream. +// This additional synchronization has some overhead, but it is negligible for i/o bound workloads. +// +// Some other functions, such as [ToSlice], [Batch] or [First] are not concurrent and are ordered by nature. +// +// # Error handling +// +// Error handling can be non-trivial in concurrent applications. Rill simplifies this by providing a structured error handling approach. +// As described above, all errors are automatically propagated down the pipeline to the final stage, where they can be caught. +// This allows the pipeline to terminate after the first error is encountered and return it to the caller. +// +// In cases where more complex error handling logic is required, the [Catch] function can be used. +// It allows to catch and handle errors at any point in the pipeline, providing the flexibility to handle not only the first error, but any of them. package rill diff --git a/example_test.go b/example_test.go index 7c4aeb1..8ac8671 100644 --- a/example_test.go +++ b/example_test.go @@ -2,22 +2,21 @@ package rill_test import ( "bufio" + "context" + "errors" "fmt" + "hash/fnv" "io" "math" "math/rand" - "path/filepath" + "regexp" + "strconv" "strings" "time" "github.com/destel/rill" ) -type KV struct { - Key string - Value string -} - type Measurement struct { Date time.Time Temp float64 @@ -26,128 +25,133 @@ type Measurement struct { type User struct { ID int Username string + IsActive bool } -type Company struct { - ID int - Name string -} +// --- Package examples --- -// A basic example demonstrating how [ForEach] can be used to process a list of items concurrently. -func Example_basic() { - items := rill.FromSlice([]string{"item1", "item2", "item3", "item4", "item5", "item6", "item7", "item8", "item9", "item10"}, nil) +// This example demonstrates a Rill pipeline that fetches users from an API, +// and updates their status to active and saves them back. Both operations are done concurrently. +func Example() { + // In case of early exit this will cancel the file streaming, + // which in turn will terminate the entire pipeline. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - err := rill.ForEach(items, 3, func(item string) error { - randomSleep(1000 * time.Millisecond) // simulate some additional work - res := strings.ToUpper(item) - fmt.Println(res) - return nil + // Start with a stream of user ids + ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + // Read users from the API. 
+ // Concurrency = 3 + users := rill.Map(ids, 3, func(id int) (*User, error) { + return getUser(ctx, id) }) - if err != nil { - fmt.Println("Error:", err) - } -} -// Rill is designed for channel based workflows, but it can also be used with slices, thanks to its ability -// to do ordered processing. Example below demonstrates how you can create a **mapSLice** generic helper function that -// does parallel slice processing. That helper is then used to fetch users from an API concurrently. -func Example_slices() { - startedAt := time.Now() - defer func() { fmt.Println("Elapsed:", time.Since(startedAt)) }() + // Activate users. + // Concurrency = 2 + err := rill.ForEach(users, 2, func(u *User) error { + if u.IsActive { + fmt.Printf("User %d is already active\n", u.ID) + return nil + } - ids := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + u.IsActive = true + return saveUser(ctx, u) + }) - users, err := mapSLice(ids, 3, getUser) + fmt.Println("Error:", err) +} + +// This example showcases the use of Rill for building a multi-stage data processing pipeline, +// with a focus on batch processing. It streams user ids from a remote file, then fetches users from an API in batches, +// updates their status to active, and saves them back. All operations are done concurrently. +func Example_batching() { + // In case of early exit this will cancel the file streaming, + // which in turn will terminate the entire pipeline. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Stream a file with user ids as an io.Reader + reader, err := downloadFile(ctx, "http://example.com/user_ids1.txt") if err != nil { fmt.Println("Error:", err) return } - fmt.Printf("%+v\n", users) -} - -// mapSLice is a helper function that does a parallel map operation on a slice of items -func mapSLice[A, B any](in []A, n int, f func(A) (B, error)) ([]B, error) { - inChan := rill.FromSlice(in, nil) - outChan := rill.OrderedMap(inChan, n, f) - return rill.ToSlice(outChan) -} + // Transform the reader into a stream of words + lines := streamLines(reader) -// This example fetches keys from a list of URLs, retrieves their values from a key-value database, and prints them. -// The pipeline leverages concurrency for fetching and processing and uses batching to reduce the number of database calls. -func Example_batching() { - startedAt := time.Now() - defer func() { fmt.Println("Elapsed:", time.Since(startedAt)) }() - - urls := rill.FromSlice([]string{ - "https://example.com/file1.txt", - "https://example.com/file2.txt", - "https://example.com/file3.txt", - "https://example.com/file4.txt", - }, nil) - - // Fetch keys from each URL and flatten them into a single stream - keys := rill.FlatMap(urls, 3, func(url string) <-chan rill.Try[string] { - return streamFileLines(url) + // Parse lines as integers + // Concurrency = 3 + ids := rill.Map(lines, 3, func(line string) (int, error) { + return strconv.Atoi(line) }) - // Exclude any empty keys from the stream - keys = rill.Filter(keys, 3, func(key string) (bool, error) { - return key != "", nil - }) + // Group IDs into batches of 5 for bulk processing + idBatches := rill.Batch(ids, 5, 1*time.Second) - // Organize keys into manageable batches of 10 for bulk operations - keyBatches := rill.Batch(keys, 10, 1*time.Second) + // Fetch users for each batch of IDs + // Concurrency = 3 + userBatches := rill.Map(idBatches, 3, func(ids []int) ([]*User, error) { + return getUsers(ctx, ids...) 
+ }) - // Fetch values from DB for each batch of keys - resultBatches := rill.Map(keyBatches, 3, func(keys []string) ([]KV, error) { - values, err := kvMultiGet(keys...) - if err != nil { - return nil, err - } + // Transform batches back into a stream of users + users := rill.Unbatch(userBatches) - results := make([]KV, len(keys)) - for i, key := range keys { - results[i] = KV{Key: key, Value: values[i]} + // Activate users. + // Concurrency = 2 + err = rill.ForEach(users, 2, func(u *User) error { + if u.IsActive { + fmt.Printf("User %d is already active\n", u.ID) + return nil } - return results, nil + u.IsActive = true + return saveUser(ctx, u) }) - // Convert batches back to a single items for final processing - results := rill.Unbatch(resultBatches) + fmt.Println("Error:", err) +} - // Exclude any empty values from the stream - results = rill.Filter(results, 3, func(kv KV) (bool, error) { - return kv.Value != "", nil +// This example demonstrates how to use the Fan-in and Fan-out patterns +// to send messages through multiple servers concurrently. +func Example_fanIn_FanOut() { + messages := rill.FromSlice([]string{ + "message1", "message2", "message3", "message4", "message5", + "message6", "message7", "message8", "message9", "message10", + }, nil) + + // Fan-out the messages to three servers + results1 := rill.Map(messages, 2, func(message string) (string, error) { + return message, sendMessage(message, "server1") }) - // Iterate over each key-value pair and print - cnt := 0 - err := rill.ForEach(results, 1, func(kv KV) error { - fmt.Println(kv.Key, "=>", kv.Value) - cnt++ - return nil + results2 := rill.Map(messages, 2, func(message string) (string, error) { + return message, sendMessage(message, "server2") }) - if err != nil { - fmt.Println("Error:", err) - } - fmt.Println("Total keys:", cnt) + results3 := rill.Map(messages, 2, func(message string) (string, error) { + return message, sendMessage(message, "server3") + }) + + // Fan-in the results from all servers into a single stream + results := rill.Merge(results1, results2, results3) + + // Handle errors + err := rill.Err(results) + fmt.Println("Error:", err) } // This example demonstrates how [OrderedMap] can be used to enforce ordering of processing results. // Pipeline below fetches temperature measurements for a city and calculates daily temperature changes. // Measurements are fetched concurrently, but ordered processing is used to calculate the changes. func Example_ordering() { - startedAt := time.Now() - defer func() { fmt.Println("Elapsed:", time.Since(startedAt)) }() - city := "New York" endDate := time.Now() startDate := endDate.AddDate(0, 0, -30) - // Make a channel that emits all the days between startDate and endDate + // Create a stream of all days between startDate and endDate days := make(chan rill.Try[time.Time]) go func() { defer close(days) @@ -156,13 +160,15 @@ func Example_ordering() { } }() - // Download the temperature for each day concurrently + // Fetch the temperature for each day from the API + // Concurrency = 10; Ordered measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) { temp, err := getTemperature(city, date) return Measurement{Date: date, Temp: temp}, err }) - // Iterate over the measurements, calculate and print changes. Use a single goroutine + // Iterate over the measurements, calculate and print changes. 
+ // Concurrency = 1; Ordered prev := Measurement{Temp: math.NaN()} err := rill.ForEach(measurements, 1, func(m Measurement) error { change := m.Temp - prev.Temp @@ -171,36 +177,188 @@ func Example_ordering() { fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, change) return nil }) - if err != nil { - fmt.Println("Error:", err) - } + + fmt.Println("Error:", err) } -// This example demonstrates how [Reduce] can be used to calculate a sum of numbers from a channel. -func ExampleReduce() { - // Create a channel with some values +// This example demonstrates a concurrent [MapReduce] performed on a set of remote files. +// It downloads them and calculates how many times each word appears in all the files. +func Example_mapReduce() { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + + defer cancel() + + // Start with a stream of file URLs + urls := rill.FromSlice([]string{ + "http://example.com/text1.txt", + "http://example.com/text2.txt", + "http://example.com/text3.txt", + }, nil) + + // Download files concurrently, and get a stream of all words from all files + // Concurrency = 2 + words := rill.FlatMap(urls, 2, func(url string) <-chan rill.Try[string] { + reader, err := downloadFile(ctx, url) + if err != nil { + return rill.FromSlice[string](nil, err) // Wrap the error in a stream + } + + return streamWords(reader) + }) + + // Count the number of occurrences of each word + counts, err := rill.MapReduce(words, + // Map phase: Use the word as key and "1" as value + // Concurrency = 3 + 3, func(word string) (string, int, error) { + return strings.ToLower(word), 1, nil + }, + // Reduce phase: Sum all "1" values for the same key + // Concurrency = 2 + 2, func(x, y int) (int, error) { + return x + y, nil + }, + ) + + fmt.Println("Result:", counts) + fmt.Println("Error:", err) +} + +// This example demonstrates how to use context cancellation to terminate a Rill pipeline in case of an early exit. +// The printOddSquares function initiates a pipeline that prints squares of odd numbers. +// The infiniteNumberStream function is the initial stage of the pipeline. It generates numbers indefinitely until the context is canceled. +// When an error occurs in one of the pipeline stages: +// - The error is propagated down the pipeline and reaches the ForEach stage. +// - The ForEach function returns the error. +// - The printOddSquares function returns, and the context is canceled using defer. +// - The infiniteNumberStream function terminates due to context cancellation. +// - The entire pipeline is cleaned up gracefully. +func Example_context() { + ctx := context.Background() + + err := printOddSquares(ctx) + fmt.Println("Error:", err) + + // Wait one more second to see "infiniteNumberStream terminated" printed + time.Sleep(1 * time.Second) +} + +func printOddSquares(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + numbers := infiniteNumberStream(ctx) + + odds := rill.Filter(numbers, 3, func(x int) (bool, error) { + if x == 20 { + return false, fmt.Errorf("early exit") + } + return x%2 == 1, nil + }) + + return rill.ForEach(odds, 3, func(x int) error { + fmt.Println(x * x) + return nil + }) +} + +// --- Function examples --- + +func ExampleAll() { numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Reduce the channel to a single value by summing all numbers - sum, ok, err := rill.Reduce(numbers, 3, func(a, b int) (int, error) { - return a + b, nil + // Are all numbers even? 
+ // Concurrency = 3 + ok, err := rill.All(numbers, 3, func(x int) (bool, error) { + return x%2 == 0, nil + }) + + fmt.Println("Result:", ok) + fmt.Println("Error:", err) +} + +func ExampleAny() { + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + // Is there at least one even number? + // Concurrency = 3 + ok, err := rill.Any(numbers, 3, func(x int) (bool, error) { + return x%2 == 0, nil }) - if err != nil { - fmt.Println("Error:", err) - return - } - fmt.Println("Sum:", sum) - fmt.Println("OK:", ok) + fmt.Println("Result: ", ok) + fmt.Println("Error: ", err) +} + +// Also check out the package level examples to see Batch in action +func ExampleBatch() { + // New number is emitted every 50ms + numbers := make(chan rill.Try[int]) + go func() { + defer close(numbers) + for i := 0; i < 50; i++ { + numbers <- rill.Wrap(i, nil) + time.Sleep(50 * time.Millisecond) + } + }() + + // Group numbers into batches of up to 5 + batches := rill.Batch(numbers, 5, 1*time.Second) + + printStream(batches) +} + +func ExampleCatch() { + strs := rill.FromSlice([]string{"1", "2", "3", "4", "5", "not a number 6", "7", "8", "9", "10"}, nil) + + // Convert strings to ints + // Concurrency = 3; Unordered + ids := rill.Map(strs, 3, func(s string) (int, error) { + randomSleep(1000 * time.Millisecond) // simulate some additional work + return strconv.Atoi(s) + }) + + // Catch and ignore number parsing errors + // Concurrency = 2; Unordered + ids = rill.Catch(ids, 2, func(err error) error { + if errors.Is(err, strconv.ErrSyntax) { + return nil // Ignore this error + } + return err + }) + + // No error will be printed + printStream(ids) +} + +// The same example as for the [Catch], but using ordered versions of functions. +func ExampleOrderedCatch() { + strs := rill.FromSlice([]string{"1", "2", "3", "4", "5", "not a number 6", "7", "8", "9", "10"}, nil) + + // Convert strings to ints + // Concurrency = 3; Unordered + ids := rill.OrderedMap(strs, 3, func(s string) (int, error) { + randomSleep(1000 * time.Millisecond) // simulate some additional work + return strconv.Atoi(s) + }) + + // Catch and ignore number parsing errors + // Concurrency = 2; Unordered + ids = rill.OrderedCatch(ids, 2, func(err error) error { + if errors.Is(err, strconv.ErrSyntax) { + return nil // Ignore this error + } + return err + }) + + // No error will be printed + printStream(ids) } -// This example demonstrates how to use the combination of [Map], [Merge] and Err to catch errors from multiple concurrent -// tasks of different types, such as saving users and companies. -// [ForEach] can't be used here because there are two input channels. -// Those channels can't be merged directly because their types do not match. -// A practical solution is to perform side-effect-only processing using Map, -// then merge the results and use Err to catch the first error that occurs. func ExampleErr() { + ctx := context.Background() + users := rill.FromSlice([]*User{ {ID: 1, Username: "foo"}, {ID: 2, Username: "bar"}, @@ -210,184 +368,312 @@ func ExampleErr() { {ID: 6, Username: "quux"}, }, nil) - companies := rill.FromSlice([]*Company{ - {ID: 1, Name: "Company 1"}, - {ID: 2, Name: "Company 2"}, - }, nil) - // Save users. Use struct{} as a result type - userResults := rill.Map(users, 2, func(user *User) (struct{}, error) { - return struct{}{}, saveUser(user) + // Concurrency = 2; Unordered + results := rill.Map(users, 2, func(user *User) (struct{}, error) { + return struct{}{}, saveUser(ctx, user) }) - // Save users. 
Use struct{} as a result type - companyResults := rill.Map(companies, 3, func(company *Company) (struct{}, error) { - return struct{}{}, saveCompany(company) + // We're interested only in side effects and errors from + // the pipeline above + err := rill.Err(results) + fmt.Println("Error:", err) +} + +func ExampleFilter() { + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + // Keep only even numbers + // Concurrency = 3; Unordered + evens := rill.Filter(numbers, 3, func(x int) (bool, error) { + randomSleep(1000 * time.Millisecond) // simulate some additional work + return x%2 == 0, nil }) - // Merge results - allResults := rill.Merge(userResults, companyResults) + printStream(evens) +} - // Use Err to wait until everything is saved and get the first error - err := rill.Err(allResults) - fmt.Println("Error:", err) +// The same example as for the [Filter], but using ordered versions of functions. +func ExampleOrderedFilter() { + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + // Keep only even numbers + // Concurrency = 3; Ordered + evens := rill.OrderedFilter(numbers, 3, func(x int) (bool, error) { + randomSleep(1000 * time.Millisecond) // simulate some additional work + return x%2 == 0, nil + }) + + printStream(evens) } -// This example demonstrates how to use the combination of First and [OrderedFilter] functions -// to find the first number divisible by 4 in a channel of numbers. func ExampleFirst() { numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + // Keep only the numbers divisible by 4 + // Concurrency = 3; Ordered dvisibleBy4 := rill.OrderedFilter(numbers, 3, func(x int) (bool, error) { return x%4 == 0, nil }) + // Get the first number divisible by 4 first, ok, err := rill.First(dvisibleBy4) - if err != nil { - fmt.Println("Error:", err) - return - } fmt.Println("Result:", first, ok) + fmt.Println("Error:", err) } -// This example demonstrates using the Any function to check if there is an even number in a channel. -// The function exits immediately after the first even number is found -func ExampleAny() { +func ExampleFlatMap() { + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil) + + // Replace each number with three strings + // Concurrency = 3; Unordered + result := rill.FlatMap(numbers, 3, func(x int) <-chan rill.Try[string] { + randomSleep(1000 * time.Millisecond) // simulate some additional work + + return rill.FromSlice([]string{ + fmt.Sprintf("foo%d", x), + fmt.Sprintf("bar%d", x), + fmt.Sprintf("baz%d", x), + }, nil) + }) + + printStream(result) +} + +// The same example as for the [FlatMap], but using ordered versions of functions. 
+func ExampleOrderedFlatMap() { + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil) + + // Replace each number with three strings + // Concurrency = 3; Ordered + result := rill.OrderedFlatMap(numbers, 3, func(x int) <-chan rill.Try[string] { + randomSleep(1000 * time.Millisecond) // simulate some additional work + + return rill.FromSlice([]string{ + fmt.Sprintf("foo%d", x), + fmt.Sprintf("bar%d", x), + fmt.Sprintf("baz%d", x), + }, nil) + }) + + printStream(result) +} + +func ExampleForEach() { numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - ok, err := rill.Any(numbers, 3, func(x int) (bool, error) { - if x == 10 { - return true, fmt.Errorf("some error") - } + // Square and print each number + // Concurrency = 3; Unordered + err := rill.ForEach(numbers, 3, func(x int) error { + randomSleep(1000 * time.Millisecond) // simulate some additional work - return x%2 == 0, nil + y := x * x + fmt.Println(y) + return nil }) - if err != nil { - fmt.Println("Error:", err) - return - } + fmt.Println("Error:", err) +} - fmt.Printf("Has even number: %t\n", ok) +// There is no ordered version of the ForEach function. To achieve ordered processing, use concurrency set to 1. +// If you need a concurrent and ordered ForEach, then do all processing with the [OrderedMap], +// and then use ForEach with concurrency set to 1 at the final stage. +func ExampleForEach_ordered() { + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + // Square each number. + // Concurrency = 3; Ordered + squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) { + randomSleep(1000 * time.Millisecond) // simulate some additional work + return x * x, nil + }) + + // Print results. + // Concurrency = 1; Ordered + err := rill.ForEach(squares, 1, func(y int) error { + fmt.Println(y) + return nil + }) + fmt.Println("Error:", err) } -// This example demonstrates using the All function to check if all numbers in a channel are even. -// The function exits immediately after the first odd number is found -func ExampleAll() { +func ExampleMap() { numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - ok, err := rill.All(numbers, 3, func(x int) (bool, error) { - return x%2 == 0, nil + // Square each number. + // Concurrency = 3; Unordered + squares := rill.Map(numbers, 3, func(x int) (int, error) { + randomSleep(1000 * time.Millisecond) // simulate some additional work + return x * x, nil }) - if err != nil { - fmt.Println("Error:", err) - return - } + printStream(squares) +} - fmt.Printf("All numbers are even: %t\n", ok) +// The same example as for the [Map], but using ordered versions of functions. +func ExampleOrderedMap() { + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + // Square each number. + // Concurrency = 3; Ordered + squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) { + randomSleep(1000 * time.Millisecond) // simulate some additional work + return x * x, nil + }) + + printStream(squares) } -// This example demonstrates how MapReduce can be used to count how many times each word appears in a stream. -// Mappers emit a count of '1' for each word, and reducers sum these counts to calculate the total occurrences of each word. func ExampleMapReduce() { - stream := strings.NewReader(`Early morning brings early birds to the early market. Birds sing, the market buzzes, and the morning shines.`) + var re = regexp.MustCompile(`\w+`) + text := "Early morning brings early birds to the early market. 
Birds sing, the market buzzes, and the morning shines." - words := streamWords(stream) + // Start with a stream of words + words := rill.FromSlice(re.FindAllString(text, -1), nil) + // Count the number of occurrences of each word mr, err := rill.MapReduce(words, + // Map phase: Use the word as key and "1" as value + // Concurrency = 3 3, func(word string) (string, int, error) { return strings.ToLower(word), 1, nil }, + // Reduce phase: Sum all "1" values for the same key + // Concurrency = 2 2, func(x, y int) (int, error) { return x + y, nil }, ) - if err != nil { - fmt.Println("Error:", err) - return - } fmt.Println("Result:", mr) + fmt.Println("Error:", err) } -// streamFileLines simulates line-by-line streaming of a file from a URL, -// introducing a randomized delay to simulate network latency. -// It's a simplified placeholder for actual network-based file streaming. -func streamFileLines(url string) <-chan rill.Try[string] { +func ExampleMerge() { + numbers1 := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil) + numbers2 := rill.FromSlice([]int{6, 7, 8, 9, 10}, nil) + numbers3 := rill.FromSlice([]int{11, 12}, nil) + + numbers := rill.Merge(numbers1, numbers2, numbers3) + + printStream(numbers) +} + +func ExampleReduce() { + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + // Sum all numbers + sum, ok, err := rill.Reduce(numbers, 3, func(a, b int) (int, error) { + return a + b, nil + }) + + fmt.Println("Result:", sum, ok) + fmt.Println("Error:", err) +} + +func ExampleToSlice() { + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + // Square each number + // Concurrency = 3; Ordered + squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) { + return x * x, nil + }) + + squaresSlice, err := rill.ToSlice(squares) + + fmt.Println("Result:", squaresSlice) + fmt.Println("Error:", err) +} + +func ExampleUnbatch() { + batches := rill.FromSlice([][]int{ + {1, 2, 3}, + {4, 5}, + {6, 7, 8, 9}, + {10}, + }, nil) + + numbers := rill.Unbatch(batches) + + printStream(numbers) +} + +// --- Helpers --- + +// streamLines converts an io.Reader into a stream of lines +func streamLines(r io.ReadCloser) <-chan rill.Try[string] { out := make(chan rill.Try[string]) go func() { + defer r.Close() defer close(out) - base := filepath.Base(url) - base = strings.TrimSuffix(base, filepath.Ext(base)) - - for i := 0; i < 10; i++ { - randomSleep(20 * time.Millisecond) // Simulate a network delay - out <- rill.Wrap(fmt.Sprintf("%s:key:%d", base, i), nil) + scanner := bufio.NewScanner(r) + for scanner.Scan() { + out <- rill.Wrap(scanner.Text(), nil) + } + if err := scanner.Err(); err != nil { + out <- rill.Wrap("", err) } }() return out } -// streamWords is helper function that reads words from a reader and streams them as strings. -func streamWords(r io.Reader) <-chan rill.Try[string] { - raw := make(chan rill.Try[string], 1) +// streamWords is helper function that converts an io.Reader into a stream of words. +func streamWords(r io.ReadCloser) <-chan rill.Try[string] { + words := make(chan rill.Try[string], 1) go func() { - defer close(raw) + defer r.Close() + defer close(words) scanner := bufio.NewScanner(r) scanner.Split(bufio.ScanWords) for scanner.Scan() { word := scanner.Text() - word = strings.Trim(word, ".,;:!?&()") // it's basic and just for demonstration + word = strings.Trim(word, ".,;:!?&()") // strip all punctuation. 
it's basic and just for demonstration if len(word) > 0 { - raw <- rill.Wrap(word, nil) + words <- rill.Wrap(word, nil) } } if err := scanner.Err(); err != nil { - raw <- rill.Wrap("", err) + words <- rill.Wrap("", err) } }() - return raw + return words } -// kvGet simulates fetching a value form a key-value database, -// introducing a randomized delay to simulate network latency. -// It's a simplified placeholder for actual database operation. -func kvGet(key string) (string, error) { - randomSleep(1000 * time.Millisecond) // Simulate a network delay +var ErrFileNotFound = errors.New("file not found") - // Simulates that some keys are missing - if strings.HasSuffix(key, "2") || strings.HasSuffix(key, "3") { - return "", nil - } - - return strings.Replace(key, "key:", "val:", 1), nil +var files = map[string]string{ + "http://example.com/user_ids1.txt": strings.ReplaceAll("1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20", " ", "\n"), + "http://example.com/user_ids2.txt": strings.ReplaceAll("21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40", " ", "\n"), + "http://example.com/user_ids3.txt": strings.ReplaceAll("41 42 43 44 45", " ", "\n"), + "http://example.com/text1.txt": "Early morning brings early birds to the early market. Birds sing, the market buzzes, and the morning shines.", + "http://example.com/text2.txt": "The birds often sing at the market", + "http://example.com/text3.txt": "The market closes, the birds rest, and the night brings peace to the town.", } -// kvMultiGet simulates a batch read from a key-value database, -// introducing a randomized delay to simulate network latency. -// It's a simplified placeholder for actual database operation. -func kvMultiGet(keys ...string) ([]string, error) { - randomSleep(1000 * time.Millisecond) // Simulate a network delay - - values := make([]string, len(keys)) - for i, key := range keys { - // Simulates that some keys are missing - if strings.HasSuffix(key, "2") || strings.HasSuffix(key, "3") { - values[i] = "" - continue - } - - values[i] = strings.Replace(key, "key:", "val:", 1) +// downloadFile simulates downloading a file from a URL. +// Returns a reader for the file content. +func downloadFile(ctx context.Context, url string) (io.ReadCloser, error) { + content, ok := files[url] + if !ok { + return nil, ErrFileNotFound } - return values, nil + // In a real-world scenario, this would be an HTTP request depending on the ctx. 
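+	// A minimal sketch of that real-world variant might look like this
+	// (plain net/http; shown here only for illustration, not used by the example):
+	//
+	//	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	//	if err != nil {
+	//		return nil, err
+	//	}
+	//	resp, err := http.DefaultClient.Do(req)
+	//	if err != nil {
+	//		return nil, err
+	//	}
+	//	return resp.Body, nil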
+ return io.NopCloser(strings.NewReader(content)), nil +} + +// Helper function that simulates sending a message through a server +func sendMessage(message string, server string) error { + randomSleep(1000 * time.Millisecond) // simulate some additional work + fmt.Printf("Sent through %s: %s\n", server, message) + return nil } // getTemperature simulates fetching a temperature reading for a city and date, @@ -395,36 +681,79 @@ func getTemperature(city string, date time.Time) (float64, error) { randomSleep(1000 * time.Millisecond) // Simulate a network delay // Basic city hash, to make measurements unique for each city - var h float64 - for _, c := range city { - h += float64(c) - } + cityHash := float64(hash(city)) // Simulate a temperature reading, by retuning a pseudo-random, but deterministic value - temp := 15 - 10*math.Sin(h+float64(date.Unix())) + temp := 15 - 10*math.Sin(cityHash+float64(date.Unix())) return temp, nil } -func randomSleep(max time.Duration) { - time.Sleep(time.Duration(rand.Intn(int(max)))) +func infiniteNumberStream(ctx context.Context) <-chan rill.Try[int] { + out := make(chan rill.Try[int]) + go func() { + defer fmt.Println("infiniteNumberStream terminated") + defer close(out) + for i := 1; ; i++ { + if err := ctx.Err(); err != nil { + return // This can be rewritten as select, but it's not necessary + } + out <- rill.Wrap(i, nil) + time.Sleep(100 * time.Millisecond) + } + }() + return out } -// getUser simulates fetching a user from an API, introducing a randomized delay to simulate network latency. -func getUser(id int) (User, error) { +var adjs = []string{"big", "small", "fast", "slow", "smart", "happy", "sad", "funny", "serious", "angry"} +var nouns = []string{"dog", "cat", "bird", "fish", "mouse", "elephant", "lion", "tiger", "bear", "wolf"} + +// getUsers simulates fetching multiple users from an API. +// User fields are pseudo-random, but deterministic based on the user ID. +func getUsers(ctx context.Context, ids ...int) ([]*User, error) { randomSleep(1000 * time.Millisecond) // Simulate a network delay - // generate random name adj+noun - adj := []string{"big", "small", "fast", "slow", "smart", "happy", "sad", "funny", "serious"} - noun := []string{"dog", "cat", "bird", "fish", "mouse", "elephant", "lion", "tiger", "bear", "wolf"} - username := fmt.Sprintf("%s_%s", adj[rand.Intn(len(adj))], noun[rand.Intn(len(noun))]) + users := make([]*User, 0, len(ids)) + for _, id := range ids { + if err := ctx.Err(); err != nil { + return nil, err + } + + user := User{ + ID: id, + Username: adjs[hash(id, "adj")%len(adjs)] + "_" + nouns[hash(id, "noun")%len(nouns)], // adj + noun + IsActive: hash(id, "active")%100 < 60, // 60% + } - return User{ID: id, Username: username}, nil + users = append(users, &user) + } + return users, nil } -func saveUser(user *User) error { +var ErrUserNotFound = errors.New("user not found") + +// getUser simulates fetching a user from an API. +func getUser(ctx context.Context, id int) (*User, error) { + users, err := getUsers(ctx, id) + if err != nil { + return nil, err + } + + if len(users) == 0 { + return nil, ErrUserNotFound + } + + return users[0], nil +} + +// saveUser simulates saving a user through an API. 
+func saveUser(ctx context.Context, user *User) error { randomSleep(1000 * time.Millisecond) // Simulate a network delay + if err := ctx.Err(); err != nil { + return err + } + if user.Username == "" { return fmt.Errorf("empty username") } @@ -433,13 +762,23 @@ func saveUser(user *User) error { return nil } -func saveCompany(comp *Company) error { - randomSleep(1000 * time.Millisecond) // Simulate a network delay +// printStream prints all items from a stream (one per line) and an error if any. +func printStream[A any](stream <-chan rill.Try[A]) { + fmt.Println("Result:") + err := rill.ForEach(stream, 1, func(x A) error { + fmt.Printf("%+v\n", x) + return nil + }) + fmt.Println("Error:", err) +} - if comp.Name == "" { - return fmt.Errorf("empty name") - } +func randomSleep(max time.Duration) { + time.Sleep(time.Duration(rand.Intn(int(max)))) +} - fmt.Printf("Company saved: %+v\n", comp) - return nil +// hash is a simple hash function that returns an integer hash for a given input. +func hash(input ...any) int { + hasher := fnv.New32() + fmt.Fprintln(hasher, input...) + return int(hasher.Sum32()) } diff --git a/merge.go b/merge.go index 1ae7035..c886715 100644 --- a/merge.go +++ b/merge.go @@ -6,18 +6,25 @@ import ( "github.com/destel/rill/internal/core" ) -// Merge combines multiple input channels into a single output channel. Items are emitted as soon as they're available, -// so the output order is not defined. +// Merge performs a fan-in operation on the list of input channels, returning a single output channel. +// The resulting channel will contain all items from all inputs, +// and will be closed when all inputs are fully consumed. +// +// This is a non-blocking function that processes items from each input sequentially. +// +// See the package documentation for more information on non-blocking functions and error handling. func Merge[A any](ins ...<-chan A) <-chan A { return core.Merge(ins...) } -// Split2 divides the input channel into two output channels based on the discriminator function f, using n goroutines for concurrency. -// The function f takes an item from the input and decides which output channel (outTrue or outFalse) it should go to by returning a boolean. -// If an error is encountered, either from the function f itself or from upstream it is intentionally sent -// to one of the output channels in a non-deterministic manner. -// The output order is not guaranteed: results are written to the outputs as soon as they're ready. -// Use [OrderedSplit2] to preserve the input order. +// Split2 divides the input stream into two output streams based on the predicate function f: +// The splitting behavior is determined by the boolean return value of f. When f returns true, the item is sent to the outTrue stream, +// otherwise it is sent to the outFalse stream. In case of any error, the item is sent to one of the output streams in a non-deterministic way. +// +// This is a non-blocking unordered function that processes items concurrently using n goroutines. +// An ordered version of this function, [OrderedSplit2], is also available. +// +// See the package documentation for more information on non-blocking unordered functions and error handling. 
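+//
+// For illustration, a minimal usage sketch; numbers is assumed to be a <-chan Try[int] and is not part of the package:
+//
+//	evens, odds := rill.Split2(numbers, 5, func(x int) (bool, error) {
+//		return x%2 == 0, nil
+//	})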
func Split2[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (outTrue <-chan Try[A], outFalse <-chan Try[A]) { outs := core.MapAndSplit(in, 2, n, func(a Try[A]) (Try[A], int) { if a.Error != nil { @@ -38,7 +45,7 @@ func Split2[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (outTrue <- return outs[0], outs[1] } -// OrderedSplit2 is similar to [Split2], but it guarantees that the order of the outputs matches the order of the input. +// OrderedSplit2 is the ordered version of [Split2]. func OrderedSplit2[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (outTrue <-chan Try[A], outFalse <-chan Try[A]) { outs := core.OrderedMapAndSplit(in, 2, n, func(a Try[A]) (Try[A], int) { if a.Error != nil { diff --git a/reduce.go b/reduce.go index f55691e..1c6cd16 100644 --- a/reduce.go +++ b/reduce.go @@ -6,21 +6,21 @@ import ( "github.com/destel/rill/internal/core" ) -// Reduce combines all elements from the input channel into a single value -// using a binary function f. The function f must be commutative, meaning -// f(x,y) == f(y,x). It is applied to pairs of elements, using n -// goroutines, progressively reducing the channel's contents until only one value remains. -// The order in which the function f is applied is not guaranteed due to concurrent processing. +// Reduce combines all items from the input stream into a single value using a binary function f. +// The function f is called for pairs of items, progressively reducing the stream contents until only one value remains. // -// Reduce blocks until all items are processed or an error is encountered, -// either from the function f itself or from the upstream. In case of an error -// leading to early termination, Reduce ensures the input channel is drained to -// avoid goroutine leaks, making it safe for use in environments where cleanup -// is crucial. +// As an unordered function, Reduce can apply f to any pair of items in any order, which requires f to be: +// - Associative: f(a, f(b, c)) == f(f(a, b), c) +// - Commutative: f(a, b) == f(b, a) // -// The function returns the first encountered error, if any, or the reduction result. -// The second return value is false if the input channel is empty, and true otherwise. -func Reduce[A any](in <-chan Try[A], n int, f func(A, A) (A, error)) (A, bool, error) { +// The hasResult return flag is set to false if the stream was empty, otherwise it is set to true. +// +// Reduce is a blocking unordered function that processes items concurrently using n goroutines. +// The case when n = 1 is optimized: it does not spawn additional goroutines and processes items sequentially, +// making the function ordered. This also removes the need for the function f to be commutative. +// +// See the package documentation for more information on blocking unordered functions and error handling. +func Reduce[A any](in <-chan Try[A], n int, f func(A, A) (A, error)) (result A, hasResult bool, err error) { in, earlyExit := core.Breakable(in) res, ok := core.Reduce(in, n, func(a1, a2 Try[A]) Try[A] { @@ -46,24 +46,19 @@ func Reduce[A any](in <-chan Try[A], n int, f func(A, A) (A, error)) (A, bool, e return res.Value, ok, res.Error } -// MapReduce reduces the input channel to a map using a mapper and a reducer functions. -// Reduction is done in two phases, both occurring concurrently. In the first phase, -// the mapper function transforms each input item into a key-value pair. 
-// As a result of this phase, we can get multiple values for the same key, so -// in the second phase, the reducer function reduces values for the same key into a single value. -// The order in which the reducer is applied is not guaranteed due to concurrent processing. -// See [Reduce] documentation for more details on reduction phase semantics. +// MapReduce transforms the input stream into a Go map using a mapper and a reducer functions. +// The transformation is performed in two concurrent phases. // -// The number of concurrent mappers and reducers can be controlled using nm and nr parameters respectively. +// - The mapper function transforms each input item into a key-value pair. +// - The reducer function reduces values for the same key into a single value. +// This phase has the same semantics as the [Reduce] function, in particular +// the reducer function must be commutative and associative. // -// MapReduce blocks until all items are processed or an error is encountered, -// either from the mapper, reducer, or the upstream. In case of an error -// leading to early termination, MapReduce ensures the input channel is drained -// to avoid goroutine leaks, making it safe for use in environments where -// cleanup is crucial. +// MapReduce is a blocking unordered function that processes items concurrently using nm and nr goroutines +// for the mapper and reducer functions respectively. Setting nr = 1 will make the reduce phase sequential and ordered, +// see [Reduce] for more information. // -// The function returns the first encountered error, if any, or a map where -// each key is associated with a single reduced value +// See the package documentation for more information on blocking unordered functions and error handling. func MapReduce[A any, K comparable, V any](in <-chan Try[A], nm int, mapper func(A) (K, V, error), nr int, reducer func(V, V) (V, error)) (map[K]V, error) { var zeroKey K var zeroVal V diff --git a/transform.go b/transform.go index 907f82a..a06984a 100644 --- a/transform.go +++ b/transform.go @@ -4,10 +4,13 @@ import ( "github.com/destel/rill/internal/core" ) -// Map applies a transformation function to each item in an input channel, using n goroutines for concurrency. -// If an error is encountered, either from the function f itself or from upstream it is forwarded to the output for further handling. -// The output order is not guaranteed: results are written to the output as soon as they're ready. -// Use [OrderedMap] to preserve the input order. +// Map takes a stream of items of type A and transforms them into items of type B using a function f. +// Returns a new stream of transformed items. +// +// This is a non-blocking unordered function that processes items concurrently using n goroutines. +// An ordered version of this function, [OrderedMap], is also available. +// +// See the package documentation for more information on non-blocking unordered functions and error handling. func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] { return core.MapOrFilter(in, n, func(a Try[A]) (Try[B], bool) { if a.Error != nil { @@ -23,7 +26,7 @@ func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] }) } -// OrderedMap is similar to [Map], but it guarantees that the output order is the same as the input order. +// OrderedMap is the ordered version of [Map]. 
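+//
+// For illustration, a minimal usage sketch that parses strings into ints while preserving the input order;
+// lines is assumed to be a <-chan Try[string] and is not part of the package:
+//
+//	nums := rill.OrderedMap(lines, 5, func(s string) (int, error) {
+//		return strconv.Atoi(s)
+//	})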
func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] { return core.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[B], bool) { if a.Error != nil { @@ -39,10 +42,13 @@ func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan }) } -// Filter removes items that do not meet a specified condition, using n goroutines for concurrency. -// If an error is encountered, either from the function f itself or from upstream it is forwarded to the output for further handling. -// The output order is not guaranteed: results are written to the output as soon as they're ready. -// Use [OrderedFilter] to preserve the input order. +// Filter takes a stream of items of type A and filters them using a predicate function f. +// Returns a new stream of items that passed the filter. +// +// This is a non-blocking unordered function that processes items concurrently using n goroutines. +// An ordered version of this function, [OrderedFilter], is also available. +// +// See the package documentation for more information on non-blocking unordered functions and error handling. func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] { return core.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) { if a.Error != nil { @@ -58,7 +64,7 @@ func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[ }) } -// OrderedFilter is similar to [Filter], but it guarantees that the output order is the same as the input order. +// OrderedFilter is the ordered version of [Filter]. func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] { return core.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) { if a.Error != nil { @@ -74,10 +80,13 @@ func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-ch }) } -// FlatMap applies a function to each item in an input channel, where the function returns a channel of items. -// These items are then flattened into a single output channel using n goroutines for concurrency. -// The output order is not guaranteed: results are written to the output as soon as they're ready. -// Use [OrderedFlatMap] to preserve the input order. +// FlatMap takes a stream of items of type A and transforms each item into a new sub-stream of items of type B using a function f. +// Those sub-streams are then flattened into a single output stream, which is returned. +// +// This is a non-blocking unordered function that processes items concurrently using n goroutines. +// An ordered version of this function, [OrderedFlatMap], is also available. +// +// See the package documentation for more information on non-blocking unordered functions and error handling. func FlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B] { if in == nil { return nil @@ -100,7 +109,7 @@ func FlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan return out } -// OrderedFlatMap is similar to [FlatMap], but it guarantees that the output order is the same as the input order. +// OrderedFlatMap is the ordered version of [FlatMap]. func OrderedFlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B] { if in == nil { return nil @@ -125,10 +134,17 @@ func OrderedFlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) return out } -// Catch allows handling errors from the input channel using n goroutines for concurrency. 
-// When f returns nil, error is considered handled and filtered out; otherwise it is replaced by the result of f.
-// The output order is not guaranteed: results are written to the output as soon as they're ready.
-// Use [OrderedCatch] to preserve the input order.
+// Catch allows handling errors in the middle of a stream processing pipeline.
+// Every error encountered in the input stream is passed to the function f for handling.
+//
+// The outcome depends on the return value of f:
+//   - If f returns nil, the error is considered handled and filtered out from the output stream.
+//   - If f returns a non-nil error, the original error is replaced with the result of f.
+//
+// This is a non-blocking unordered function that handles errors concurrently using n goroutines.
+// An ordered version of this function, [OrderedCatch], is also available.
+//
+// See the package documentation for more information on non-blocking unordered functions and error handling.
 func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
 	return core.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
 		if a.Error == nil {
@@ -144,7 +160,7 @@ func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
 	})
 }
 
-// OrderedCatch is similar to [Catch], but it guarantees that the output order is the same as the input order.
+// OrderedCatch is the ordered version of [Catch].
 func OrderedCatch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
 	return core.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
 		if a.Error == nil {
diff --git a/util.go b/util.go
index f508459..b21b691 100644
--- a/util.go
+++ b/util.go
@@ -2,23 +2,27 @@ package rill
 
 import "github.com/destel/rill/internal/core"
 
-// Drain consumes and discards all items from an input channel, blocking until the channel is closed
+// Drain consumes and discards all items from an input channel, blocking until the channel is closed.
 func Drain[A any](in <-chan A) {
 	core.Drain(in)
 }
 
-// DrainNB is a non-blocking version of [Drain].
+// DrainNB is a non-blocking version of [Drain]. It does the draining in a separate goroutine.
 func DrainNB[A any](in <-chan A) {
 	core.DrainNB(in)
 }
 
 // Buffer takes a channel of items and returns a buffered channel of exact same items in the same order.
-// This is useful when you want to write to the input channel without blocking the writer.
+// This can be useful for preventing write operations on the input channel from blocking, especially if subsequent stages
+// in the processing pipeline are slow.
+// Buffering allows up to n items to be held in memory before back pressure is applied to the upstream producer.
 //
-// Typical use case would look like
+// Typical usage of Buffer might look like this:
 //
-//	ids = Buffer(ids, 100)
-//	// Now up to 100 ids can be buffered if subsequent stages of the pipeline are slow
+//	users := getUsers(ctx, companyID)
+//	users = rill.Buffer(users, 100)
+//	// Now work with the users channel as usual.
+//	// Up to 100 users can be buffered if subsequent stages of the pipeline are slow.
 func Buffer[A any](in <-chan A, n int) <-chan A {
 	return core.Buffer(in, n)
 }
diff --git a/wrap.go b/wrap.go
index 3451cf9..9447a99 100644
--- a/wrap.go
+++ b/wrap.go
@@ -1,6 +1,6 @@
 package rill
 
-// Try is a container for a value or an error
+// Try is a container holding a value of type A or an error
 type Try[A any] struct {
 	Value A
 	Error error
@@ -8,12 +8,20 @@ type Try[A any] struct {
 
 // Wrap converts a value and/or error into a [Try] container. 
// It's a convenience function to avoid creating a [Try] container manually and benefit from type inference.
+//
+// Such function signature also allows concise wrapping of functions that return a value and an error:
+//
+//	item := rill.Wrap(strconv.ParseInt("42", 10, 64))
 func Wrap[A any](value A, err error) Try[A] {
 	return Try[A]{Value: value, Error: err}
 }
 
-// FromSlice converts a slice into a channel of [Try] containers.
-// If err is not nil function returns a single [Try] container with the error.
+// FromSlice converts a slice into a stream.
+// If err is not nil, the function returns a stream with a single error.
+//
+// Such function signature allows concise wrapping of functions that return a slice and an error:
+//
+//	stream := rill.FromSlice(someFunc())
 func FromSlice[A any](slice []A, err error) <-chan Try[A] {
 	if err != nil {
 		out := make(chan Try[A], 1)
@@ -30,9 +38,10 @@ func FromSlice[A any](slice []A, err error) <-chan Try[A] {
 	return out
 }
 
-// ToSlice converts a channel of [Try] containers into a slice of values and an error.
-// It's an inverse of [FromSlice]. The function blocks until the whole channel is processed or an error is encountered.
-// In case of an error leading to early termination, ToSlice ensures the input channel is drained to avoid goroutine leaks.
+// ToSlice converts an input stream into a slice.
+//
+// This is a blocking ordered function that processes items sequentially.
+// See the package documentation for more information on blocking ordered functions and error handling.
 func ToSlice[A any](in <-chan Try[A]) ([]A, error) {
 	var res []A
 
@@ -47,9 +56,13 @@ func ToSlice[A any](in <-chan Try[A]) ([]A, error) {
 	return res, nil
 }
 
-// FromChan converts a regular channel into a channel of values wrapped in a [Try] container.
-// Additionally, this function can take an error, that will be added to the output channel alongside the values.
-// If both values and error are nil, the function returns nil.
+// FromChan converts a regular channel into a stream.
+// Additionally, this function can take an error that will be added to the output stream alongside the values.
+// Either argument can be nil, in which case it is ignored. If both arguments are nil, the function returns nil.
+//
+// Such function signature allows concise wrapping of functions that return a channel and an error:
+//
+//	stream := rill.FromChan(someFunc())
 func FromChan[A any](values <-chan A, err error) <-chan Try[A] {
 	if values == nil && err == nil {
 		return nil
@@ -72,10 +85,14 @@ func FromChan[A any](values <-chan A, err error) <-chan Try[A] {
 	return out
 }
 
-// FromChans converts a regular channel into a channel of values wrapped in a [Try] container.
+// FromChans converts a regular channel into a stream.
 // Additionally, this function can take a channel of errors, which will be added to
-// the output channel alongside the values.
-// If both values and errors are nil, the function returns nil.
+// the output stream alongside the values.
+// Either argument can be nil, in which case it is ignored. If both arguments are nil, the function returns nil. 
+// +// Such function signature allows concise wrapping of functions that return two channels: +// +// stream := rill.FromChans(someFunc()) func FromChans[A any](values <-chan A, errs <-chan error) <-chan Try[A] { if values == nil && errs == nil { return nil @@ -115,7 +132,7 @@ func FromChans[A any](values <-chan A, errs <-chan error) <-chan Try[A] { return out } -// ToChans splits a channel of [Try] containers into a channel of values and a channel of errors. +// ToChans splits an input stream into two channels: one for values and one for errors. // It's an inverse of [FromChans]. Returns two nil channels if the input is nil. func ToChans[A any](in <-chan Try[A]) (<-chan A, <-chan error) { if in == nil {