Skip to content

Commit

Permalink
Fixes on the batcher
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Jan 21, 2025
1 parent 7dc4d95 commit a0708a0
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 16 deletions.
7 changes: 0 additions & 7 deletions pkg/runner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0
package runner

import (
"sync"
"sync/atomic"
"time"

Expand All @@ -18,7 +17,6 @@ type batcher[I any, O any] struct {
idx uint32
inputs []chan I
outputs []chan O
locks []sync.Mutex
len uint32
executor ExecuteFunc[I, O]
timeout time.Duration
Expand All @@ -27,17 +25,14 @@ type batcher[I any, O any] struct {
func newBatcher[I any, O any](executor func([]I) []O, capacity int, timeout time.Duration) *batcher[I, O] {
inputs := make([]chan I, capacity)
outputs := make([]chan O, capacity)
locks := make([]sync.Mutex, capacity)
for i := 0; i < capacity; i++ {
inputs[i] = make(chan I)
outputs[i] = make(chan O)
locks[i] = sync.Mutex{}
}

e := &batcher[I, O]{
inputs: inputs,
outputs: outputs,
locks: locks,
len: uint32(capacity),
executor: executor,
timeout: timeout,
Expand Down Expand Up @@ -92,8 +87,6 @@ func (r *batcher[I, O]) start() {

func (r *batcher[I, O]) call(input I) O {
idx := atomic.AddUint32(&r.idx, 1) - 1
r.locks[idx%r.len].Lock()
defer r.locks[idx%r.len].Unlock()
r.inputs[idx%r.len] <- input
logger.Debugf("Enqueued input [%d] and waiting for result", idx)
defer logger.Debugf("Return result of output [%d]", idx)
Expand Down
16 changes: 7 additions & 9 deletions pkg/runner/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,27 @@ import (
"github.com/stretchr/testify/assert"
)

var counter uint32

func TestBatchRunner(t *testing.T) {
atomic.StoreUint32(&counter, 0)
ctr := &atomic.Uint32{}
runner, m, locksObtained := newBatchRunner()

run(t, runner, 1000)
run(t, ctr, runner, 1000)
assert.Len(t, m, 1000)
assert.Equal(t, "val_10", m["key_10"])
assert.Equal(t, 10, int(atomic.LoadUint32(locksObtained)))
}

func TestBatchRunnerFewRequests(t *testing.T) {
atomic.StoreUint32(&counter, 0)
ctr := &atomic.Uint32{}
runner, m, locksObtained := newBatchRunner()

run(t, runner, 1)
run(t, ctr, runner, 1)

assert.Len(t, m, 1)
assert.Equal(t, "val_1", m["key_1"])
assert.Equal(t, 1, int(atomic.LoadUint32(locksObtained)))

run(t, runner, 3)
run(t, ctr, runner, 3)
assert.Len(t, m, 4)
assert.Equal(t, 2, int(atomic.LoadUint32(locksObtained)))
}
Expand All @@ -64,11 +62,11 @@ func newBatchRunner() (BatchRunner[int], map[string]string, *uint32) {
return runner, m, &locksObtained
}

func run(t *testing.T, runner BatchRunner[int], times int) {
func run(t *testing.T, ctr *atomic.Uint32, runner BatchRunner[int], times int) {
var wg sync.WaitGroup
wg.Add(times)
for i := 0; i < times; i++ {
v := int(atomic.AddUint32(&counter, 1))
v := int(ctr.Add(1))
go func() {
defer wg.Done()
err := runner.Run(v)
Expand Down

0 comments on commit a0708a0

Please sign in to comment.