Skip to content

Commit

Permalink
fixup! Fixes on the batcher
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Jan 21, 2025
1 parent e8b8446 commit a45e904
Showing 1 changed file with 7 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pkg/runner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package runner

import (
"sync"
"sync/atomic"
"time"

Expand All @@ -17,6 +18,7 @@ type batcher[I any, O any] struct {
idx uint32
inputs []chan I
outputs []chan O
locks []sync.Mutex
len uint32
executor ExecuteFunc[I, O]
timeout time.Duration
Expand All @@ -25,14 +27,17 @@ type batcher[I any, O any] struct {
func newBatcher[I any, O any](executor func([]I) []O, capacity int, timeout time.Duration) *batcher[I, O] {
inputs := make([]chan I, capacity)
outputs := make([]chan O, capacity)
locks := make([]sync.Mutex, capacity)
for i := 0; i < capacity; i++ {
inputs[i] = make(chan I)
outputs[i] = make(chan O)
locks[i] = sync.Mutex{}
}

e := &batcher[I, O]{
inputs: inputs,
outputs: outputs,
locks: locks,
len: uint32(capacity),
executor: executor,
timeout: timeout,
Expand Down Expand Up @@ -87,6 +92,8 @@ func (r *batcher[I, O]) start() {

func (r *batcher[I, O]) call(input I) O {
idx := atomic.AddUint32(&r.idx, 1) - 1
r.locks[idx%r.len].Lock()
defer r.locks[idx%r.len].Unlock()
r.inputs[idx%r.len] <- input
logger.Debugf("Enqueued input [%d] and waiting for result", idx)
defer logger.Debugf("Return result of output [%d]", idx)
Expand Down

0 comments on commit a45e904

Please sign in to comment.