diff --git a/pkg/runner/batch.go b/pkg/runner/batch.go index 5f9d251db..7462a003a 100644 --- a/pkg/runner/batch.go +++ b/pkg/runner/batch.go @@ -11,32 +11,16 @@ import ( "sync/atomic" "time" - "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging" "github.com/pkg/errors" ) -var logger = flogging.MustGetLogger("batch-executor") - -type BatchExecutor[I any, O any] interface { - Execute(input I) (O, error) -} - -type BatchRunner[V any] interface { - Run(v V) error -} - -type Output[O any] struct { - Val O - Err error -} - type batcher[I any, O any] struct { idx uint32 inputs []chan I outputs []chan O locks []sync.Mutex len uint32 - executor func([]I) []O + executor ExecuteFunc[I, O] timeout time.Duration } @@ -120,7 +104,7 @@ type batchExecutor[I any, O any] struct { *batcher[I, Output[O]] } -func NewBatchExecutor[I any, O any](executor func([]I) []Output[O], capacity int, timeout time.Duration) BatchExecutor[I, O] { +func NewBatchExecutor[I any, O any](executor ExecuteFunc[I, Output[O]], capacity int, timeout time.Duration) BatchExecutor[I, O] { return &batchExecutor[I, O]{batcher: newBatcher(executor, capacity, timeout)} } @@ -133,10 +117,6 @@ type batchRunner[V any] struct { *batcher[V, error] } -func NewSerialRunner[V any](runner func([]V) []error) BatchRunner[V] { - return NewBatchRunner(runner, 1, 1*time.Hour) -} - func NewBatchRunner[V any](runner func([]V) []error, capacity int, timeout time.Duration) BatchRunner[V] { return &batchRunner[V]{batcher: newBatcher(runner, capacity, timeout)} } diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go new file mode 100644 index 000000000..a1ec4a082 --- /dev/null +++ b/pkg/runner/runner.go @@ -0,0 +1,26 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package runner + +import "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging" + +var logger = flogging.MustGetLogger("batch-executor") + +type BatchExecutor[I any, O any] interface { + Execute(input I) (O, error) +} + +type BatchRunner[V any] interface { + Run(v V) error +} + +type Output[O any] struct { + Val O + Err error +} + +type ExecuteFunc[I any, O any] func([]I) []O diff --git a/pkg/runner/serial.go b/pkg/runner/serial.go new file mode 100644 index 000000000..02dfb57de --- /dev/null +++ b/pkg/runner/serial.go @@ -0,0 +1,32 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package runner + +func NewSerialRunner[V any](runner ExecuteFunc[V, error]) BatchRunner[V] { + return &serialRunner[V]{executor: runner} +} + +type serialRunner[V any] struct { + executor ExecuteFunc[V, error] +} + +func (r *serialRunner[V]) Run(val V) error { + return r.executor([]V{val})[0] +} + +func NewSerialExecutor[I any, O any](executor ExecuteFunc[I, Output[O]]) BatchExecutor[I, O] { + return &serialExecutor[I, O]{executor: executor} +} + +type serialExecutor[I any, O any] struct { + executor ExecuteFunc[I, Output[O]] +} + +func (r *serialExecutor[I, O]) Execute(input I) (O, error) { + res := r.executor([]I{input})[0] + return res.Val, res.Err +}