Skip to content

Commit

Permalink
Batch tx fetches from ledger
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Jan 20, 2025
1 parent 8ed155a commit edc6ba0
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 26 deletions.
153 changes: 153 additions & 0 deletions platform/common/utils/batch/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package batch

import (
"sync"
"sync/atomic"
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"
"github.com/pkg/errors"
)

var logger = logging.MustGetLogger("batch-executor")

type Config struct {
Capacity int
Timeout time.Duration
}

type Output[O any] struct {
Val O
Err error
}

type batcher[I any, O any] struct {
idx uint32
inputs []chan I
outputs []chan O
locks []sync.Mutex
len uint32
executor func([]I) []O
timeout time.Duration
}

func newBatcher[I any, O any](executor func([]I) []O, c Config) *batcher[I, O] {
if c.Capacity <= 0 {
logger.Warnf("Capacity not set. Defaulting to 1: Each invocation will trigger a batch execution.")
c.Capacity = 1
}
if c.Timeout <= 0 {
logger.Warnf("Timeout not set. Batches will only be cut based on size.")
}
inputs := make([]chan I, c.Capacity)
outputs := make([]chan O, c.Capacity)
locks := make([]sync.Mutex, c.Capacity)
for i := 0; i < c.Capacity; i++ {
inputs[i] = make(chan I)
outputs[i] = make(chan O)
locks[i] = sync.Mutex{}
}

e := &batcher[I, O]{
inputs: inputs,
outputs: outputs,
locks: locks,
len: uint32(c.Capacity),
executor: executor,
timeout: c.Timeout,
}
go e.start()
return e
}

func (r *batcher[I, O]) start() {
var inputs []I
t := newTicker(r.timeout)
firstIdx := uint32(0) // Points to the first element of a new cycle
for {
// If we fill a whole cycle, the elements will be from firstIdx % r.len to lastIdx % r.len
var lastIdx uint32
var lastElement I
select {
case lastElement = <-r.inputs[(firstIdx+r.len-1)%r.len]:
lastIdx = firstIdx + r.len
logger.Debugf("Execute because %d input channels are full", r.len)
case <-t.C():
lastIdx = atomic.LoadUint32(&r.idx)
if lastIdx == firstIdx {
logger.Debugf("No new elements. Skip execution...")
continue
}
lastElement = <-r.inputs[(lastIdx-1)%r.len] // We read the lastElement here just to avoid code repetition
logger.Debugf("Execute because timeout of %v passed", r.timeout)
}
logger.Debugf("Read batch range [%d,%d)", firstIdx, lastIdx)

inputs = make([]I, lastIdx-firstIdx)
for i := uint32(0); i < lastIdx-firstIdx-1; i++ {
inputs[i] = <-r.inputs[(i+firstIdx)%r.len]
}
inputs[lastIdx-firstIdx-1] = lastElement
t.Reset(r.timeout)

logger.Debugf("Start execution for %d inputs", len(inputs))
outs := r.executor(inputs)
logger.Debugf("Execution finished with %d outputs", len(outs))
if len(inputs) != len(outs) {
panic(errors.Errorf("expected %d outputs, but got %d", len(inputs), len(outs)))
}
for i, err := range outs {
r.outputs[(firstIdx+uint32(i))%r.len] <- err
}
logger.Debugf("Results distributed for range [%d,%d)", firstIdx, lastIdx)
firstIdx = lastIdx
}
}

type ticker interface {
C() <-chan time.Time
Reset(time.Duration)
Stop()
}

func newTicker(timeout time.Duration) ticker {
if timeout <= 0 {
return newDummyTicker()
}
return newWrapperTicker(timeout)
}

func newWrapperTicker(duration time.Duration) *wrapperTicker {
return &wrapperTicker{t: time.NewTicker(duration)}
}

type wrapperTicker struct{ t *time.Ticker }

func (t *wrapperTicker) C() <-chan time.Time { return t.t.C }
func (t *wrapperTicker) Reset(d time.Duration) { t.t.Reset(d) }
func (t *wrapperTicker) Stop() { t.t.Stop() }

func newDummyTicker() *dummyTicker {
return &dummyTicker{ch: make(chan time.Time)}
}

type dummyTicker struct{ ch <-chan time.Time }

func (t *dummyTicker) C() <-chan time.Time { return t.ch }
func (t *dummyTicker) Reset(time.Duration) {}
func (t *dummyTicker) Stop() {}

func (r *batcher[I, O]) call(input I) O {
idx := atomic.AddUint32(&r.idx, 1) - 1
r.locks[idx%r.len].Lock()
defer r.locks[idx%r.len].Unlock()
r.inputs[idx%r.len] <- input
logger.Debugf("Enqueued input [%d] and waiting for result", idx)
defer logger.Debugf("Return result of output [%d]", idx)
return <-r.outputs[idx%r.len]
}
24 changes: 24 additions & 0 deletions platform/common/utils/batch/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package batch

// Executor executes a function that returns a result when a batch-cut condition is met
type Executor[I any, O any] interface {
Execute(input I) (O, error)
}

type batchExecutor[I any, O any] struct {
*batcher[I, Output[O]]
}

func NewExecutor[I any, O any](executor func([]I) []Output[O], c Config) Executor[I, O] {
return &batchExecutor[I, O]{batcher: newBatcher(executor, c)}
}

func (r *batchExecutor[I, O]) Execute(input I) (O, error) {
o := r.batcher.call(input)
return o.Val, o.Err
}
23 changes: 23 additions & 0 deletions platform/common/utils/batch/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package batch

// Runner executes a function that does not return an output when the batch cut condition is met
type Runner[V any] interface {
Run(v V) error
}

type batchRunner[V any] struct {
*batcher[V, error]
}

func NewRunner[V any](runner func([]V) []error, c Config) Runner[V] {
return &batchRunner[V]{batcher: newBatcher(runner, c)}
}

func (r *batchRunner[V]) Run(val V) error {
return r.call(val)
}
81 changes: 81 additions & 0 deletions platform/common/utils/batch/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package batch

import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

var counter uint32

func TestBatchRunner(t *testing.T) {
runner, m, locksObtained := newBatchRunner()

run(t, runner, 1000)
assert.Len(t, m, 1000)
assert.Equal(t, "val_10", m["key_10"])
assert.Equal(t, 10, int(atomic.LoadUint32(locksObtained)))
}

func TestBatchRunnerFewRequests(t *testing.T) {
runner, m, locksObtained := newBatchRunner()

run(t, runner, 1)
time.Sleep(100 * time.Millisecond)

assert.Len(t, m, 1)
assert.Equal(t, "val_1", m["key_1"])
assert.Equal(t, 1, int(atomic.LoadUint32(locksObtained)))

run(t, runner, 3)
assert.Len(t, m, 4)
assert.Equal(t, 2, int(atomic.LoadUint32(locksObtained)))
}

func newBatchRunner() (Runner[int], map[string]string, *uint32) {
var locksObtained uint32
m := make(map[string]string)
var mu sync.RWMutex
runner := NewRunner(func(vs []int) []error {
mu.Lock()
atomic.AddUint32(&locksObtained, 1)
defer mu.Unlock()
errs := make([]error, len(vs))
for i, v := range vs {
m[fmt.Sprintf("key_%d", v)] = fmt.Sprintf("val_%d", v)
if v%10 == 0 {
errs[i] = errors.Errorf("error_%d", v)
}
}
return errs
}, Config{Capacity: 100, Timeout: 10 * time.Millisecond})
return runner, m, &locksObtained
}

func run(t *testing.T, runner Runner[int], times int) {
var wg sync.WaitGroup
wg.Add(times)
for i := 0; i < times; i++ {
v := int(atomic.AddUint32(&counter, 1))
go func() {
defer wg.Done()
err := runner.Run(v)
if v%10 == 0 {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
}()
}
wg.Wait()
}
5 changes: 4 additions & 1 deletion platform/common/utils/cache/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"time"
)

// How often we check for evicted entries
const pollingTimeout = 1 * time.Second

// NewTimeoutCache creates a cache that keeps elements for evictionTimeout time.
// An element might return even if it is marked stale.
func NewTimeoutCache[K comparable, V any](evictionTimeout time.Duration, onEvict func(map[K]V)) *evictionCache[K, V] {
Expand Down Expand Up @@ -50,7 +53,7 @@ func NewTimeoutEviction[K comparable](timeout time.Duration, evict func([]K)) *t

func (e *timeoutEviction[K]) cleanup(timeout time.Duration) {
logger.Infof("Launch cleanup function with eviction timeout [%v]", timeout)
for range time.Tick(1 * time.Second) {
for range time.Tick(pollingTimeout) {
expiry := time.Now().Add(-timeout)
logger.Debugf("Cleanup invoked: evicting everything created after [%v]", expiry)
e.mu.RLock()
Expand Down
14 changes: 14 additions & 0 deletions platform/common/utils/collections/maps.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,20 @@ func CopyMap[K comparable, V any](to map[K]V, from map[K]V) {
}
}

func MergeMaps[K comparable, V any](ms ...map[K]V) map[K]V {
size := 0
for _, m := range ms {
size += len(m)
}
merged := make(map[K]V, size)
for _, m := range ms {
for k, v := range m {
merged[k] = v
}
}
return merged
}

func InverseMap[K comparable, V comparable](in map[K]V) map[V]K {
out := make(map[V]K, len(in))
for k, v := range in {
Expand Down
Loading

0 comments on commit edc6ba0

Please sign in to comment.