Skip to content

Commit

Permalink
fixup!
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Jan 20, 2025
1 parent b5f29d2 commit 8fe051c
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions platform/fabric/core/generic/delivery/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type Delivery struct {
tracer trace.Tracer
lastBlockReceived uint64
bufferSize int
stop chan bool
stop chan struct{}
}

func New(
Expand Down Expand Up @@ -120,7 +120,7 @@ func New(
callback: callback,
vault: vault,
bufferSize: max(bufferSize, 1),
stop: make(chan bool),
stop: make(chan struct{}),
}
return d, nil
}
Expand All @@ -133,7 +133,7 @@ func (d *Delivery) Start(ctx context.Context) {
}

func (d *Delivery) Stop() {
d.stop <- true
close(d.stop)
}

var ctr = atomic.Uint32{}
Expand All @@ -143,20 +143,21 @@ func (d *Delivery) Run(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
}
ctx, cancel := context.WithCancel(ctx)
ch := make(chan blockResponse, d.bufferSize)
go d.readBlocks(ch, cancel)
go d.readBlocks(ch)
return d.runReceiver(ctx, ch)
}

func (d *Delivery) readBlocks(ch <-chan blockResponse, cancel func()) {
func (d *Delivery) readBlocks(ch <-chan blockResponse) {
for {
select {
case b := <-ch:
logger.Debugf("Invoking callback for block [%d]", b.block.Header)
if stop, err := d.callback(b.ctx, b.block); err != nil {
stop, err := d.callback(b.ctx, b.block)
if err != nil {
logger.Errorf("callback errored for block %d: %v", b.block.Header.Number, err)
} else if stop {
}
if stop {
logger.Infof("Stopping delivery at block [%d]", b.block.Header.Number)
close(d.stop)
return
Expand Down

0 comments on commit 8fe051c

Please sign in to comment.