Skip to content

Commit

Permalink
Refactor: simplify code and remove internal deprecations (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
destel authored Oct 1, 2024
1 parent c65b09c commit 3736822
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 83 deletions.
6 changes: 3 additions & 3 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// Batch takes a stream of items and returns a stream of batches based on a maximum size and a timeout.
//
// A batch is emitted when one of the following conditions is met:
// - The batch reaches the size of n items
// - The batch reaches the maximum size
// - The time since the first item was added to the batch exceeds the timeout
// - The input stream is closed
//
Expand All @@ -19,9 +19,9 @@ import (
// This is a non-blocking ordered function that processes items sequentially.
//
// See the package documentation for more information on non-blocking ordered functions and error handling.
func Batch[A any](in <-chan Try[A], n int, timeout time.Duration) <-chan Try[[]A] {
func Batch[A any](in <-chan Try[A], size int, timeout time.Duration) <-chan Try[[]A] {
values, errs := ToChans(in)
batches := core.Batch(values, n, timeout)
batches := core.Batch(values, size, timeout)
return FromChans(batches, errs)
}

Expand Down
12 changes: 6 additions & 6 deletions internal/core/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// A batch is emitted when it reaches the maximum size, the timeout expires, or the input channel closes.
// This function never emits empty batches. The timeout countdown starts when the first item is added to a new batch.
// To emit batches only when full, set the timeout to -1. Zero timeout is not supported and will panic.
func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A {
func Batch[A any](in <-chan A, size int, timeout time.Duration) <-chan []A {
if in == nil {
return nil
}
Expand All @@ -27,9 +27,9 @@ func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A {
var batch []A
for a := range in {
batch = append(batch, a)
if len(batch) >= n {
if len(batch) >= size {
out <- batch
batch = make([]A, 0, n)
batch = make([]A, 0, size)
}
}
if len(batch) > 0 {
Expand All @@ -40,14 +40,14 @@ func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A {
default:
// finite timeout
go func() {
batch := make([]A, 0, n)
batch := make([]A, 0, size)
t := time.NewTicker(1 * time.Hour)
t.Stop()

flush := func() {
if len(batch) > 0 {
out <- batch
batch = make([]A, 0, n)
batch = make([]A, 0, size)
}

t.Stop()
Expand Down Expand Up @@ -81,7 +81,7 @@ func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A {
t.Reset(timeout)
}

if len(batch) >= n {
if len(batch) >= size {
// batch is full
flush()
}
Expand Down
42 changes: 27 additions & 15 deletions internal/core/loops_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package core

import (
"runtime"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -29,36 +28,49 @@ func TestLoop(t *testing.T) {
in := th.FromRange(0, 20)
done := make(chan struct{})

sum := int64(0)
var sum atomic.Int64

universalLoop(ord, in, done, n, func(x int, canWrite <-chan struct{}) {
<-canWrite
atomic.AddInt64(&sum, int64(x))
sum.Add(int64(x))
})

<-done
th.ExpectValue(t, sum, 19*20/2)
th.ExpectValue(t, sum.Load(), 19*20/2)
})

t.Run(th.Name("concurrency and ordering", n), func(t *testing.T) {
in := th.FromRange(0, 20000)
t.Run(th.Name("concurrency", n), func(t *testing.T) {
in := th.FromRange(0, 100)
out := make(chan int)

var inProgress th.InProgressCounter
monitor := th.NewConcurrencyMonitor(1 * time.Second)

universalLoop(ord, in, out, n, func(x int, canWrite <-chan struct{}) {
inProgress.Inc()
runtime.Gosched()
inProgress.Dec()
monitor.Inc()
defer monitor.Dec()

<-canWrite

out <- x
})

outSlice := th.ToSlice(out)
Drain(out)

th.ExpectValue(t, monitor.Max(), n)
})

th.ExpectValue(t, inProgress.Max(), n)
t.Run(th.Name("ordering", n), func(t *testing.T) {
in := th.FromRange(0, 20000)
out := make(chan int)

universalLoop(ord, in, out, n, func(x int, canWrite <-chan struct{}) {

<-canWrite

out <- x
})

outSlice := th.ToSlice(out)

if ord || n == 1 {
th.ExpectSorted(t, outSlice)
Expand All @@ -77,13 +89,13 @@ func TestForEach(t *testing.T) {
t.Run(th.Name("correctness", n), func(t *testing.T) {
in := th.FromRange(0, 20)

sum := int64(0)
var sum atomic.Int64

ForEach(in, n, func(x int) {
atomic.AddInt64(&sum, int64(x))
sum.Add(int64(x))
})

th.ExpectValue(t, sum, 19*20/2)
th.ExpectValue(t, sum.Load(), 19*20/2)
})

t.Run(th.Name("concurrency", n), func(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions internal/core/once_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ import (
func TestOnceWithWait(t *testing.T) {
t.Run("Do called once", func(t *testing.T) {
var o OnceWithWait
var calls int64
var calls atomic.Int64

th.ExpectValue(t, o.WasCalled(), false)

for i := 0; i < 5; i++ {
go func() {
o.Do(func() {
atomic.AddInt64(&calls, 1)
calls.Add(1)
})
}()
}

time.Sleep(1 * time.Second)

th.ExpectValue(t, atomic.LoadInt64(&calls), 1)
th.ExpectValue(t, calls.Load(), 1)
th.ExpectValue(t, o.WasCalled(), true)
})

Expand Down
6 changes: 3 additions & 3 deletions internal/core/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ func DrainNB[A any](in <-chan A) {
go Drain(in)
}

func Buffer[A any](in <-chan A, n int) <-chan A {
// we use n-1 since 1 additional item is held on the stack (x variable)
out := make(chan A, n-1)
func Buffer[A any](in <-chan A, size int) <-chan A {
// we use size-1 since 1 additional item is held on the stack (x variable)
out := make(chan A, size-1)

go func() {
defer close(out)
Expand Down
40 changes: 0 additions & 40 deletions internal/th/in_progress.go

This file was deleted.

10 changes: 5 additions & 5 deletions reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,15 @@ func TestMapReduce(t *testing.T) {
t.Run(th.Name("no errors", nm, nr), func(t *testing.T) {
in := FromChan(th.FromRange(0, 1000), nil)

var cntMap, cntReduce int64
var cntMap, cntReduce atomic.Int64
out, err := MapReduce(in,
nm, func(x int) (string, int, error) {
atomic.AddInt64(&cntMap, 1)
cntMap.Add(1)
s := fmt.Sprint(x)
return fmt.Sprintf("%d-digit", len(s)), x, nil
},
nr, func(x, y int) (int, error) {
atomic.AddInt64(&cntReduce, 1)
cntReduce.Add(1)
return x + y, nil
},
)
Expand All @@ -157,8 +157,8 @@ func TestMapReduce(t *testing.T) {
"2-digit": (10 + 99) * 90 / 2,
"3-digit": (100 + 999) * 900 / 2,
})
th.ExpectValue(t, cntMap, 1000)
th.ExpectValue(t, cntReduce, 9+89+899)
th.ExpectValue(t, cntMap.Load(), 1000)
th.ExpectValue(t, cntReduce.Load(), 9+89+899)
th.ExpectDrainedChan(t, in)
})

Expand Down
6 changes: 3 additions & 3 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ func DrainNB[A any](in <-chan A) {
// Buffer takes a channel of items and returns a buffered channel of exact same items in the same order.
// This can be useful for preventing write operations on the input channel from blocking, especially if subsequent stages
// in the processing pipeline are slow.
// Buffering allows up to n items to be held in memory before back pressure is applied to the upstream producer.
// Buffering allows up to size items to be held in memory before back pressure is applied to the upstream producer.
//
// Typical usage of Buffer might look like this:
//
// users := getUsers(ctx, companyID)
// users = rill.Buffer(users, 100)
// // Now work with the users channel as usual.
// // Up to 100 users can be buffered if subsequent stages of the pipeline are slow.
func Buffer[A any](in <-chan A, n int) <-chan A {
return core.Buffer(in, n)
func Buffer[A any](in <-chan A, size int) <-chan A {
return core.Buffer(in, size)
}
20 changes: 16 additions & 4 deletions wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,30 @@ func Wrap[A any](value A, err error) Try[A] {
//
// stream := rill.FromSlice(someFunc())
func FromSlice[A any](slice []A, err error) <-chan Try[A] {
const maxBufferSize = 512

if err != nil {
out := make(chan Try[A], 1)
out <- Try[A]{Error: err}
close(out)
return out
}

out := make(chan Try[A], len(slice))
for _, a := range slice {
out <- Try[A]{Value: a}
sendAll := func(in []A, out chan Try[A]) {
for _, a := range in {
out <- Try[A]{Value: a}
}
close(out)
}
close(out)

if len(slice) <= maxBufferSize {
out := make(chan Try[A], len(slice))
sendAll(slice, out)
return out
}

out := make(chan Try[A], maxBufferSize)
go sendAll(slice, out)
return out
}

Expand Down
15 changes: 14 additions & 1 deletion wrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,20 @@ func TestFromSlice(t *testing.T) {

t.Run("no errors", func(t *testing.T) {
inSlice := make([]int, 20)
for i := 0; i < 20; i++ {
for i := range inSlice {
inSlice[i] = i
}

in := FromSlice(inSlice, nil)
outSlice, err := ToSlice(in)

th.ExpectSlice(t, outSlice, inSlice)
th.ExpectNoError(t, err)
})

t.Run("no errors large", func(t *testing.T) {
inSlice := make([]int, 4000)
for i := range inSlice {
inSlice[i] = i
}

Expand Down

0 comments on commit 3736822

Please sign in to comment.