diff --git a/platform/common/utils/batch/batcher.go b/platform/common/utils/batch/batcher.go
new file mode 100644
index 000000000..a970688dc
--- /dev/null
+++ b/platform/common/utils/batch/batcher.go
@@ -0,0 +1,153 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package batch
+
+import (
+    "sync"
+    "sync/atomic"
+    "time"
+
+    "github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"
+    "github.com/pkg/errors"
+)
+
+var logger = logging.MustGetLogger("batch-executor")
+
+// Config controls when a batch is cut: when Capacity inputs have been collected
+// or when Timeout has elapsed since the last execution, whichever comes first.
+type Config struct {
+    Capacity int
+    Timeout  time.Duration
+}
+
+// Output carries the result (or error) for a single input of a batch.
+type Output[O any] struct {
+    Val O
+    Err error
+}
+
+type batcher[I any, O any] struct {
+    idx      uint32
+    inputs   []chan I
+    outputs  []chan O
+    locks    []sync.Mutex
+    len      uint32
+    executor func([]I) []O
+    timeout  time.Duration
+}
+
+func newBatcher[I any, O any](executor func([]I) []O, c Config) *batcher[I, O] {
+    if c.Capacity <= 0 {
+        logger.Warnf("Capacity not set. Defaulting to 1: each invocation will trigger a batch execution.")
+        c.Capacity = 1
+    }
+    if c.Timeout <= 0 {
+        logger.Warnf("Timeout not set. Batches will only be cut based on size.")
+    }
+    inputs := make([]chan I, c.Capacity)
+    outputs := make([]chan O, c.Capacity)
+    locks := make([]sync.Mutex, c.Capacity)
+    for i := 0; i < c.Capacity; i++ {
+        inputs[i] = make(chan I)
+        outputs[i] = make(chan O)
+        locks[i] = sync.Mutex{}
+    }
+
+    e := &batcher[I, O]{
+        inputs:   inputs,
+        outputs:  outputs,
+        locks:    locks,
+        len:      uint32(c.Capacity),
+        executor: executor,
+        timeout:  c.Timeout,
+    }
+    go e.start()
+    return e
+}
+
+func (r *batcher[I, O]) start() {
+    var inputs []I
+    t := newTicker(r.timeout)
+    firstIdx := uint32(0) // Points to the first element of a new cycle
+    for {
+        // If we fill a whole cycle, the elements will be from firstIdx % r.len to lastIdx % r.len
+        var lastIdx uint32
+        var lastElement I
+        select {
+        case lastElement = <-r.inputs[(firstIdx+r.len-1)%r.len]:
+            lastIdx = firstIdx + r.len
+            logger.Debugf("Cutting batch: all %d input channels are full", r.len)
+        case <-t.C():
+            lastIdx = atomic.LoadUint32(&r.idx)
+            if lastIdx == firstIdx {
+                logger.Debugf("No new elements. Skipping execution...")
+                continue
+            }
+            if lastIdx > firstIdx+r.len {
+                // More than r.len callers may have reserved an index already, but only r.len of
+                // them can have sent their input so far. Cap the batch at one full cycle to avoid
+                // waiting for inputs that cannot arrive before this batch completes.
+                lastIdx = firstIdx + r.len
+            }
+            lastElement = <-r.inputs[(lastIdx-1)%r.len] // We read the lastElement here just to avoid code repetition
+            logger.Debugf("Cutting batch: timeout of %v elapsed", r.timeout)
+        }
+        logger.Debugf("Read batch range [%d,%d)", firstIdx, lastIdx)
+
+        inputs = make([]I, lastIdx-firstIdx)
+        for i := uint32(0); i < lastIdx-firstIdx-1; i++ {
+            inputs[i] = <-r.inputs[(i+firstIdx)%r.len]
+        }
+        inputs[lastIdx-firstIdx-1] = lastElement
+        t.Reset(r.timeout)
+
+        logger.Debugf("Start execution for %d inputs", len(inputs))
+        outs := r.executor(inputs)
+        logger.Debugf("Execution finished with %d outputs", len(outs))
+        if len(inputs) != len(outs) {
+            panic(errors.Errorf("expected %d outputs, but got %d", len(inputs), len(outs)))
+        }
+        for i, out := range outs {
+            r.outputs[(firstIdx+uint32(i))%r.len] <- out
+        }
+        logger.Debugf("Results distributed for range [%d,%d)", firstIdx, lastIdx)
+        firstIdx = lastIdx
+    }
+}
+
+type ticker interface {
+    C() <-chan time.Time
+    Reset(time.Duration)
+    Stop()
+}
+
+func newTicker(timeout time.Duration) ticker {
+    if timeout <= 0 {
+        return newDummyTicker()
+    }
+    return newWrapperTicker(timeout)
+}
+
+func newWrapperTicker(duration time.Duration) *wrapperTicker {
+    return &wrapperTicker{t: time.NewTicker(duration)}
+}
+
+type wrapperTicker struct{ t *time.Ticker }
+
+func (t *wrapperTicker) C() <-chan time.Time { return t.t.C }
+func (t *wrapperTicker) Reset(d time.Duration) { t.t.Reset(d) }
+func (t *wrapperTicker) Stop() { t.t.Stop() }
+
+// newDummyTicker returns a ticker that never fires, so batches are cut based on size only.
+func newDummyTicker() *dummyTicker {
+    return &dummyTicker{ch: make(chan time.Time)}
+}
+
+type dummyTicker struct{ ch <-chan time.Time }
+
+func (t *dummyTicker) C() <-chan time.Time { return t.ch }
+func (t *dummyTicker) Reset(time.Duration) {}
+func (t *dummyTicker) Stop() {}
+
+// call enqueues the input and blocks until the batch containing it has been executed.
+func (r *batcher[I, O]) call(input I) O {
+    idx := atomic.AddUint32(&r.idx, 1) - 1
+    r.locks[idx%r.len].Lock()
+    defer r.locks[idx%r.len].Unlock()
+    r.inputs[idx%r.len] <- input
+    logger.Debugf("Enqueued input [%d] and waiting for result", idx)
+    defer logger.Debugf("Return result of output [%d]", idx)
+    return <-r.outputs[idx%r.len]
+}
diff --git a/platform/common/utils/batch/executor.go b/platform/common/utils/batch/executor.go
new file mode 100644
index 000000000..187726258
--- /dev/null
+++ b/platform/common/utils/batch/executor.go
@@ -0,0 +1,24 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package batch
+
+// Executor collects single inputs and executes them in batches; Execute returns the result for its input once a batch-cut condition is met
+type Executor[I any, O any] interface {
+    Execute(input I) (O, error)
+}
+
+type batchExecutor[I any, O any] struct {
+    *batcher[I, Output[O]]
+}
+
+// NewExecutor returns an Executor that passes the collected inputs to executor in batches, according to the batch-cut conditions in c.
+func NewExecutor[I any, O any](executor func([]I) []Output[O], c Config) Executor[I, O] {
+    return &batchExecutor[I, O]{batcher: newBatcher(executor, c)}
+}
+
+func (r *batchExecutor[I, O]) Execute(input I) (O, error) {
+    o := r.batcher.call(input)
+    return o.Val, o.Err
+}
diff --git a/platform/common/utils/batch/runner.go b/platform/common/utils/batch/runner.go
new file mode 100644
index 000000000..2986732fa
--- /dev/null
+++ b/platform/common/utils/batch/runner.go
@@ -0,0 +1,23 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package batch
+
+// Runner collects single inputs and executes them in batches; Run reports only an error and returns no output
+type Runner[V any] interface {
+    Run(v V) error
+}
+
+type batchRunner[V any] struct {
+    *batcher[V, error]
+}
+
+// NewRunner returns a Runner that passes the collected inputs to runner in batches; runner must return one (possibly nil) error per input.
+func NewRunner[V any](runner func([]V) []error, c Config) Runner[V] {
+    return &batchRunner[V]{batcher: newBatcher(runner, c)}
+}
+
+func (r *batchRunner[V]) Run(val V) error {
+    return r.call(val)
+}
diff --git a/platform/common/utils/batch/runner_test.go b/platform/common/utils/batch/runner_test.go
new file mode 100644
index 000000000..66a090e36
--- /dev/null
+++ b/platform/common/utils/batch/runner_test.go
@@ -0,0 +1,81 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package batch
+
+import (
+    "fmt"
+    "sync"
+    "sync/atomic"
+    "testing"
+    "time"
+
+    "github.com/pkg/errors"
+    "github.com/stretchr/testify/assert"
+)
+
+var counter uint32
+
+func TestBatchRunner(t *testing.T) {
+    runner, m, locksObtained := newBatchRunner()
+
+    run(t, runner, 1000)
+    assert.Len(t, m, 1000)
+    assert.Equal(t, "val_10", m["key_10"])
+    assert.Equal(t, 10, int(atomic.LoadUint32(locksObtained)))
+}
+
+func TestBatchRunnerFewRequests(t *testing.T) {
+    runner, m, locksObtained := newBatchRunner()
+
+    run(t, runner, 1)
+    time.Sleep(100 * time.Millisecond)
+
+    assert.Len(t, m, 1)
+    assert.Equal(t, "val_1", m["key_1"])
+    assert.Equal(t, 1, int(atomic.LoadUint32(locksObtained)))
+
+    run(t, runner, 3)
+    assert.Len(t, m, 4)
+    assert.Equal(t, 2, int(atomic.LoadUint32(locksObtained)))
+}
+
+func newBatchRunner() (Runner[int], map[string]string, *uint32) {
+    atomic.StoreUint32(&counter, 0) // reset the shared counter so each test sees keys starting from key_1
+    var locksObtained uint32
+    m := make(map[string]string)
+    var mu sync.RWMutex
+    runner := NewRunner(func(vs []int) []error {
+        mu.Lock()
+        atomic.AddUint32(&locksObtained, 1)
+        defer mu.Unlock()
+        errs := make([]error, len(vs))
+        for i, v := range vs {
+            m[fmt.Sprintf("key_%d", v)] = fmt.Sprintf("val_%d", v)
+            if v%10 == 0 {
+                errs[i] = errors.Errorf("error_%d", v)
+            }
+        }
+        return errs
+    }, Config{Capacity: 100, Timeout: 10 * time.Millisecond})
+    return runner, m, &locksObtained
+}
+
+func run(t *testing.T, runner Runner[int], times int) {
+    var wg sync.WaitGroup
+    wg.Add(times)
+    for i := 0; i < times; i++ {
+        v := int(atomic.AddUint32(&counter, 1))
+        go func() {
+            defer wg.Done()
+            err := runner.Run(v)
+            if v%10 == 0 {
+                assert.Error(t, err)
+            } else {
+                assert.NoError(t, err)
+            }
+        }()
+    }
+    wg.Wait()
+}
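The test above exercises the API; for reviewers, here is the pattern the Runner is intended for (illustration only, not part of the patch): many concurrent writers hand over one item each, and the callback flushes them in a single bulk operation, mapping the outcome back to every caller. The bulkInsert helper is hypothetical, standing in for something like a multi-row database insert.

package main

import (
    "time"

    "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/batch"
)

// bulkInsert is a hypothetical stand-in for an operation that is cheaper
// when performed once for many items (e.g. a multi-row INSERT).
func bulkInsert(rows []string) error { return nil }

func main() {
    runner := batch.NewRunner(func(rows []string) []error {
        errs := make([]error, len(rows)) // exactly one error slot per input
        if err := bulkInsert(rows); err != nil {
            for i := range errs {
                errs[i] = err // every caller of this batch sees the failure
            }
        }
        return errs
    }, batch.Config{Capacity: 100, Timeout: 10 * time.Millisecond})

    // Each call blocks until the batch containing its row has been flushed.
    _ = runner.Run("row-1")
}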
diff --git a/platform/common/utils/cache/timeout.go b/platform/common/utils/cache/timeout.go
index 3592d9304..820a09260 100644
--- a/platform/common/utils/cache/timeout.go
+++ b/platform/common/utils/cache/timeout.go
@@ -11,6 +11,9 @@ import (
     "time"
 )
 
+// How often we check for entries to evict
+const pollingTimeout = 1 * time.Second
+
 // NewTimeoutCache creates a cache that keeps elements for evictionTimeout time.
 // An element might return even if it is marked stale.
 func NewTimeoutCache[K comparable, V any](evictionTimeout time.Duration, onEvict func(map[K]V)) *evictionCache[K, V] {
@@ -50,7 +53,7 @@ func NewTimeoutEviction[K comparable](timeout time.Duration, evict func([]K)) *t
 
 func (e *timeoutEviction[K]) cleanup(timeout time.Duration) {
     logger.Infof("Launch cleanup function with eviction timeout [%v]", timeout)
-    for range time.Tick(1 * time.Second) {
+    for range time.Tick(pollingTimeout) {
         expiry := time.Now().Add(-timeout)
         logger.Debugf("Cleanup invoked: evicting everything created after [%v]", expiry)
         e.mu.RLock()
diff --git a/platform/common/utils/collections/maps.go b/platform/common/utils/collections/maps.go
index d76f82e1b..ad52328d6 100644
--- a/platform/common/utils/collections/maps.go
+++ b/platform/common/utils/collections/maps.go
@@ -15,6 +15,20 @@ func CopyMap[K comparable, V any](to map[K]V, from map[K]V) {
     }
 }
 
+func MergeMaps[K comparable, V any](ms ...map[K]V) map[K]V {
+    size := 0
+    for _, m := range ms {
+        size += len(m)
+    }
+    merged := make(map[K]V, size)
+    for _, m := range ms {
+        for k, v := range m {
+            merged[k] = v
+        }
+    }
+    return merged
+}
+
 func InverseMap[K comparable, V comparable](in map[K]V) map[V]K {
     out := make(map[V]K, len(in))
     for k, v := range in {
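A quick illustration of the new MergeMaps helper (example only, not part of the patch): maps are merged left to right, so on duplicate keys the value from the later map wins.

package main

import (
    "fmt"

    "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
)

func main() {
    a := map[string]int{"x": 1, "y": 2}
    b := map[string]int{"y": 20, "z": 3}
    // Duplicate keys are overwritten by later maps: "y" ends up as 20.
    merged := collections.MergeMaps(a, b)
    fmt.Println(merged) // map[x:1 y:20 z:3]
}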
diff --git a/platform/fabric/core/generic/finality/listenermanager.go b/platform/fabric/core/generic/finality/listenermanager.go
index 057256a71..25493dcc3 100644
--- a/platform/fabric/core/generic/finality/listenermanager.go
+++ b/platform/fabric/core/generic/finality/listenermanager.go
@@ -12,12 +12,14 @@ import (
     "time"
 
     driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
+    "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/batch"
     "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/cache"
     "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
     "github.com/hyperledger-labs/fabric-smart-client/platform/fabric"
     "github.com/hyperledger/fabric-protos-go/common"
     "github.com/pkg/errors"
     "go.opentelemetry.io/otel/trace"
+    "go.uber.org/zap/zapcore"
     "golang.org/x/sync/errgroup"
 )
 
@@ -53,11 +55,18 @@ type DeliveryListenerManagerConfig struct {
 }
 
 func NewListenerManager[T TxInfo](config DeliveryListenerManagerConfig, delivery *fabric.Delivery, tracer trace.Tracer, mapper TxInfoMapper[T]) (*listenerManager[T], error) {
+    fetchBatchedTxs := batch.NewRunner(func(vs []map[driver2.TxID][]ListenerEntry[T]) []error {
+        fetchTxs(collections.MergeMaps(vs...), mapper, delivery)
+        return make([]error, len(vs)) // the batcher requires exactly one (possibly nil) error per input; fetchTxs logs its own errors
+    }, batch.Config{Capacity: 10, Timeout: 10 * time.Second})
     var listeners cache.Map[driver2.TxID, []ListenerEntry[T]]
     if config.ListenerTimeout > 0 {
         listeners = cache.NewTimeoutCache[driver2.TxID, []ListenerEntry[T]](config.ListenerTimeout, func(evicted map[driver2.TxID][]ListenerEntry[T]) {
-            logger.Debugf("Listeners for TXs [%v] timed out. Either the TX finality is too slow or it reached finality too long ago and were evicted from the txInfos cache. The IDs will be queried directly from ledger...", collections.Keys(evicted))
-            fetchTxs(evicted, mapper, delivery)
+            logger.Debugf("Listeners for TXs [%v] timed out. Either finality is too slow, or the TXs reached finality too long ago and were evicted from the txInfos cache. The IDs will be queried directly from the ledger once the batch is cut...", collections.Keys(evicted))
+            err := fetchBatchedTxs.Run(evicted)
+            if err != nil {
+                logger.Errorf("Failed fetching txs directly from ledger: %v", err)
+            }
         })
     } else {
         listeners = cache.NewMapCache[driver2.TxID, []ListenerEntry[T]]()
@@ -87,31 +96,33 @@ func NewListenerManager[T TxInfo](config DeliveryListenerManagerConfig, delivery
 }
 
 func fetchTxs[T TxInfo](evicted map[driver2.TxID][]ListenerEntry[T], mapper TxInfoMapper[T], delivery *fabric.Delivery) {
-    for txID, listeners := range evicted {
-        go func(txID driver2.TxID, listeners []ListenerEntry[T]) {
-            logger.Debugf("Launching routine to scan for tx [%s]", txID)
-            err := delivery.Scan(context.TODO(), txID, func(tx *fabric.ProcessedTransaction) (bool, error) {
-                if tx.TxID() != txID {
-                    return false, nil
-                }
-                logger.Debugf("Received result for tx [%s, %v, %d]...", tx.TxID(), tx.ValidationCode(), len(tx.Results()))
-                infos, err := mapper.MapProcessedTx(tx)
-                if err != nil {
-                    logger.Errorf("failed mapping tx [%s]: %v", tx.TxID(), err)
-                    return true, err
-                }
-                for _, info := range infos {
-                    for _, listener := range listeners {
-                        go listener.OnStatus(context.TODO(), info)
-                    }
-                }
-                return true, nil
-            })
+    go func() {
+        if logger.IsEnabledFor(zapcore.DebugLevel) {
+            logger.Debugf("Launching routine to scan for txs [%v]", collections.Keys(evicted))
+        }
+        err := delivery.Scan(context.TODO(), "", func(tx *fabric.ProcessedTransaction) (bool, error) {
+            listeners, ok := evicted[tx.TxID()]
+            if !ok {
+                return false, nil
+            }
+            logger.Debugf("Received result for tx [%s, %v, %d]...", tx.TxID(), tx.ValidationCode(), len(tx.Results()))
+            infos, err := mapper.MapProcessedTx(tx)
             if err != nil {
-                logger.Errorf("Failed scanning for tx [%s]: %v", txID, err)
+                logger.Errorf("failed mapping tx [%s]: %v", tx.TxID(), err)
+                return true, err
             }
-        }(txID, listeners)
-    }
+            for _, info := range infos {
+                for _, listener := range listeners {
+                    go listener.OnStatus(context.TODO(), info)
+                }
+            }
+            delete(evicted, tx.TxID())
+            return len(evicted) == 0, nil
+        })
+        if err != nil {
+            logger.Errorf("Failed scanning for evicted txs: %v", err)
+        }
+    }()
 }
 
 func (m *listenerManager[T]) start() {