Skip to content

Commit

Permalink
Better StreamUsers example
Browse files Browse the repository at this point in the history
  • Loading branch information
destel committed Nov 24, 2024
1 parent 0b9cd25 commit 792a1d5
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 22 deletions.
16 changes: 5 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -373,24 +373,20 @@ func main() {
}

// StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function.
// It iterates through all listing pages and returns a stream of users.
// It iterates through all listing pages and uses [Generate] to simplify sending users and errors to the resulting stream.
// This function is useful both on its own and as part of larger pipelines.
func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] {
res := make(chan rill.Try[*mockapi.User])

if query == nil {
query = &mockapi.UserQuery{}
}

go func() {
defer close(res)

return rill.Generate(func(send func(*mockapi.User), sendErr func(error)) {
for page := 0; ; page++ {
query.Page = page

users, err := mockapi.ListUsers(ctx, query)
if err != nil {
res <- rill.Wrap[*mockapi.User](nil, err)
sendErr(err)
return
}

Expand All @@ -399,12 +395,10 @@ func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[
}

for _, user := range users {
res <- rill.Wrap(user, nil)
send(user)
}
}
}()

return res
})
}
```

Expand Down
16 changes: 5 additions & 11 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,24 +267,20 @@ func Example_flatMap() {
}

// StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function.
// It iterates through all listing pages and returns a stream of users.
// It iterates through all listing pages and uses [Generate] to simplify sending users and errors to the resulting stream.
// This function is useful both on its own and as part of larger pipelines.
func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] {
res := make(chan rill.Try[*mockapi.User])

if query == nil {
query = &mockapi.UserQuery{}
}

go func() {
defer close(res)

return rill.Generate(func(send func(*mockapi.User), sendErr func(error)) {
for page := 0; ; page++ {
query.Page = page

users, err := mockapi.ListUsers(ctx, query)
if err != nil {
res <- rill.Wrap[*mockapi.User](nil, err)
sendErr(err)
return
}

Expand All @@ -293,12 +289,10 @@ func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[
}

for _, user := range users {
res <- rill.Wrap(user, nil)
send(user)
}
}
}()

return res
})
}

// This example demonstrates how to gracefully stop a pipeline on the first error.
Expand Down

0 comments on commit 792a1d5

Please sign in to comment.