From 8fe051c227862db360476edf406e1e67d9bacfd8 Mon Sep 17 00:00:00 2001 From: Alexandros Filios Date: Mon, 20 Jan 2025 15:28:45 +0100 Subject: [PATCH] fixup! Signed-off-by: Alexandros Filios --- .../fabric/core/generic/delivery/delivery.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/platform/fabric/core/generic/delivery/delivery.go b/platform/fabric/core/generic/delivery/delivery.go index 30576fc61..53913298a 100644 --- a/platform/fabric/core/generic/delivery/delivery.go +++ b/platform/fabric/core/generic/delivery/delivery.go @@ -81,7 +81,7 @@ type Delivery struct { tracer trace.Tracer lastBlockReceived uint64 bufferSize int - stop chan bool + stop chan struct{} } func New( @@ -120,7 +120,7 @@ func New( callback: callback, vault: vault, bufferSize: max(bufferSize, 1), - stop: make(chan bool), + stop: make(chan struct{}), } return d, nil } @@ -133,7 +133,7 @@ func (d *Delivery) Start(ctx context.Context) { } func (d *Delivery) Stop() { - d.stop <- true + close(d.stop) } var ctr = atomic.Uint32{} @@ -143,20 +143,21 @@ func (d *Delivery) Run(ctx context.Context) error { if ctx == nil { ctx = context.Background() } - ctx, cancel := context.WithCancel(ctx) ch := make(chan blockResponse, d.bufferSize) - go d.readBlocks(ch, cancel) + go d.readBlocks(ch) return d.runReceiver(ctx, ch) } -func (d *Delivery) readBlocks(ch <-chan blockResponse, cancel func()) { +func (d *Delivery) readBlocks(ch <-chan blockResponse) { for { select { case b := <-ch: logger.Debugf("Invoking callback for block [%d]", b.block.Header) - if stop, err := d.callback(b.ctx, b.block); err != nil { + stop, err := d.callback(b.ctx, b.block) + if err != nil { logger.Errorf("callback errored for block %d: %v", b.block.Header.Number, err) - } else if stop { + } + if stop { logger.Infof("Stopping delivery at block [%d]", b.block.Header.Number) close(d.stop) return