From 0681b59b78ce4298ebad9f2ac126e9ceea4f50b5 Mon Sep 17 00:00:00 2001 From: Jack Chistyakov Date: Sun, 19 Jan 2025 19:16:28 -0800 Subject: [PATCH] Fix send on closed channel panic (#140) --- internal/common/common.go | 8 -------- rx/mono/block_subscriber.go | 37 ++++++++++++++++++------------------- rx/mono/utils.go | 4 +--- 3 files changed, 19 insertions(+), 30 deletions(-) diff --git a/internal/common/common.go b/internal/common/common.go index bffbe80..bb79b62 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -35,14 +35,6 @@ func CloneBytes(b []byte) []byte { return clone } -func SafeCloseDoneChan(c chan<- struct{}) (ok bool) { - defer func() { - ok = recover() == nil - }() - close(c) - return -} - func ToMilliseconds(duration time.Duration) int64 { return int64(duration) / 1e6 } diff --git a/rx/mono/block_subscriber.go b/rx/mono/block_subscriber.go index 0a19d25..9c4dd94 100644 --- a/rx/mono/block_subscriber.go +++ b/rx/mono/block_subscriber.go @@ -6,12 +6,15 @@ import ( "github.com/jjeffcaii/reactor-go" "github.com/rsocket/rsocket-go/internal/common" "github.com/rsocket/rsocket-go/payload" + "go.uber.org/atomic" ) type blockSubscriber struct { - done chan struct{} - vchan chan<- payload.Payload - echan chan<- error + // Atomic bool to ensure that 'done' is closed only once. + isDone *atomic.Bool + done chan struct{} + vchan chan<- payload.Payload + echan chan<- error } func newBlockSubscriber( @@ -20,34 +23,30 @@ func newBlockSubscriber( echan chan<- error, ) reactor.Subscriber { return blockSubscriber{ - done: done, - vchan: vchan, - echan: echan, + isDone: atomic.NewBool(false), + done: done, + vchan: vchan, + echan: echan, } } func (b blockSubscriber) OnComplete() { - select { - case <-b.done: - default: - _ = common.SafeCloseDoneChan(b.done) + swapped := b.isDone.CAS(false, true) + if swapped { + close(b.done) } } func (b blockSubscriber) OnError(err error) { - select { - case <-b.done: - default: - if common.SafeCloseDoneChan(b.done) { - b.echan <- err - } + swapped := b.isDone.CAS(false, true) + if swapped { + b.echan <- err + close(b.done) } } func (b blockSubscriber) OnNext(any reactor.Any) { - select { - case <-b.done: - default: + if !b.isDone.Load() { if r, ok := any.(common.Releasable); ok { r.IncRef() } diff --git a/rx/mono/utils.go b/rx/mono/utils.go index b6800ff..35269da 100644 --- a/rx/mono/utils.go +++ b/rx/mono/utils.go @@ -165,13 +165,11 @@ func toBlock(ctx context.Context, m mono.Mono) (payload.Payload, error) { done := make(chan struct{}) vchan := make(chan payload.Payload, 1) echan := make(chan error, 1) + // 'blockSubscriber' takes ownership of the above channels (w.r.t. closing them) b := newBlockSubscriber(done, vchan, echan) m.SubscribeWith(ctx, b) <-done - defer close(vchan) - defer close(echan) - select { case value := <-vchan: return value, nil