diff --git a/pkg/runner/batch.go b/pkg/runner/batch.go index a75cdddad..7462a003a 100644 --- a/pkg/runner/batch.go +++ b/pkg/runner/batch.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package runner import ( + "sync" "sync/atomic" "time" @@ -17,6 +18,7 @@ type batcher[I any, O any] struct { idx uint32 inputs []chan I outputs []chan O + locks []sync.Mutex len uint32 executor ExecuteFunc[I, O] timeout time.Duration @@ -25,14 +27,17 @@ type batcher[I any, O any] struct { func newBatcher[I any, O any](executor func([]I) []O, capacity int, timeout time.Duration) *batcher[I, O] { inputs := make([]chan I, capacity) outputs := make([]chan O, capacity) + locks := make([]sync.Mutex, capacity) for i := 0; i < capacity; i++ { inputs[i] = make(chan I) outputs[i] = make(chan O) + locks[i] = sync.Mutex{} } e := &batcher[I, O]{ inputs: inputs, outputs: outputs, + locks: locks, len: uint32(capacity), executor: executor, timeout: timeout, @@ -87,6 +92,8 @@ func (r *batcher[I, O]) start() { func (r *batcher[I, O]) call(input I) O { idx := atomic.AddUint32(&r.idx, 1) - 1 + r.locks[idx%r.len].Lock() + defer r.locks[idx%r.len].Unlock() r.inputs[idx%r.len] <- input logger.Debugf("Enqueued input [%d] and waiting for result", idx) defer logger.Debugf("Return result of output [%d]", idx)