Skip to content

Commit

Permalink
fix: panic caused by sending data to closed channel under extreme con…
Browse files Browse the repository at this point in the history
…dition (#29)
  • Loading branch information
liushv0 authored Jan 18, 2024
1 parent 0ed4a18 commit 155ec8b
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions pkg/backends/btree/btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (b *btreeCache) Get(ctx context.Context, key, rangeEnd string, limit, revis
type watcher struct {
startRev int64
ch chan []*server.Event
closed bool
}

// item is the wrapper of user object so that we can implement the
Expand Down Expand Up @@ -282,6 +283,7 @@ func (b *btreeCache) Watch(ctx context.Context, key string, startRevision int64)
// use the current revision as the historical events will be handled at the first time.
startRev: b.currentRevision + 1,
ch: make(chan []*server.Event, 1),
closed: false,
}
if group, ok := b.watcherHub[key]; ok {
group[w] = struct{}{}
Expand Down Expand Up @@ -342,6 +344,7 @@ func (b *btreeCache) removeWatcher(ctx context.Context, key string, w *watcher)
b.watcherHub[key] = group
}
close(w.ch)
w.closed = true
log.Debug("removed a watcher",
zap.String("key", key),
)
Expand Down Expand Up @@ -436,8 +439,12 @@ func (b *btreeCache) sendEvents(ctx context.Context) {
}
if len(filtered) > 0 {
go func(w *watcher) {
// TODO we may deep-copy events if users want to modify them.
w.ch <- filtered
b.RLock()
defer b.RUnlock()
if !w.closed {
// TODO we may deep-copy events if users want to modify them.
w.ch <- filtered
}
}(w)
}
}
Expand Down

0 comments on commit 155ec8b

Please sign in to comment.