Skip to content

Commit

Permalink
dspinner: RecursiveKeys(): do not hang on cancellations (#727)
Browse files Browse the repository at this point in the history
Per ipfs/kubo#10593, if no one
is reading from the channel returned by RecursiveKeys() and the context is cancelled, streamIndex will hang indefinitely.

Proposed fix is to always select when attempting to write to the `out` channel. If the context is done and there is no one to read, we can abort.

Co-authored-by: Andrew Gillis <[email protected]>
  • Loading branch information
hsanjuan and gammazero authored Nov 26, 2024
1 parent 91c4d50 commit 8673560
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ The following emojis are used to highlight certain changes:

### Fixed

- `unixfs/hamt` Log error instead of panic if both link and shard are nil [#393](https://github.com/ipfs/boxo/pull/393)
- `unixfs/hamt`: Log error instead of panic if both link and shard are nil [#393](https://github.com/ipfs/boxo/pull/393)
- `pinner/dspinner`: do not hang when listing keys and the `out` channel is no longer read [#727](https://github.com/ipfs/boxo/pull/727)

### Security

Expand Down
19 changes: 13 additions & 6 deletions pinning/pinner/dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,19 +707,27 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile
defer p.lock.RUnlock()

cidSet := cid.NewSet()
send := func(sp ipfspinner.StreamedPin) (ok bool) {
select {
case <-ctx.Done():
return false
case out <- sp:
return true
}
}

err := index.ForEach(ctx, "", func(key, value string) bool {
c, err := cid.Cast([]byte(key))
if err != nil {
out <- ipfspinner.StreamedPin{Err: err}
send(ipfspinner.StreamedPin{Err: err})
return false
}

var pin ipfspinner.Pinned
if detailed {
pp, err := p.loadPin(ctx, value)
if err != nil {
out <- ipfspinner.StreamedPin{Err: err}
send(ipfspinner.StreamedPin{Err: err})
return false
}

Expand All @@ -731,17 +739,16 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile
}

if !cidSet.Has(c) {
select {
case <-ctx.Done():
if !send(ipfspinner.StreamedPin{Pin: pin}) {
return false
case out <- ipfspinner.StreamedPin{Pin: pin}:
}
cidSet.Add(c)
}
return true
})
if err != nil {
out <- ipfspinner.StreamedPin{Err: err}
send(ipfspinner.StreamedPin{Err: err})
return
}
}()

Expand Down

0 comments on commit 8673560

Please sign in to comment.