Skip to content

Commit

Permalink
Fix send on closed channel panic (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
echistyakov authored Jan 20, 2025
1 parent 099cb5b commit 0681b59
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 30 deletions.
8 changes: 0 additions & 8 deletions internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,6 @@ func CloneBytes(b []byte) []byte {
return clone
}

func SafeCloseDoneChan(c chan<- struct{}) (ok bool) {
defer func() {
ok = recover() == nil
}()
close(c)
return
}

func ToMilliseconds(duration time.Duration) int64 {
return int64(duration) / 1e6
}
37 changes: 18 additions & 19 deletions rx/mono/block_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (
"github.com/jjeffcaii/reactor-go"
"github.com/rsocket/rsocket-go/internal/common"
"github.com/rsocket/rsocket-go/payload"
"go.uber.org/atomic"
)

type blockSubscriber struct {
done chan struct{}
vchan chan<- payload.Payload
echan chan<- error
// Atomic bool to ensure that 'done' is closed only once.
isDone *atomic.Bool
done chan struct{}
vchan chan<- payload.Payload
echan chan<- error
}

func newBlockSubscriber(
Expand All @@ -20,34 +23,30 @@ func newBlockSubscriber(
echan chan<- error,
) reactor.Subscriber {
return blockSubscriber{
done: done,
vchan: vchan,
echan: echan,
isDone: atomic.NewBool(false),
done: done,
vchan: vchan,
echan: echan,
}
}

func (b blockSubscriber) OnComplete() {
select {
case <-b.done:
default:
_ = common.SafeCloseDoneChan(b.done)
swapped := b.isDone.CAS(false, true)
if swapped {
close(b.done)
}
}

func (b blockSubscriber) OnError(err error) {
select {
case <-b.done:
default:
if common.SafeCloseDoneChan(b.done) {
b.echan <- err
}
swapped := b.isDone.CAS(false, true)
if swapped {
b.echan <- err
close(b.done)
}
}

func (b blockSubscriber) OnNext(any reactor.Any) {
select {
case <-b.done:
default:
if !b.isDone.Load() {
if r, ok := any.(common.Releasable); ok {
r.IncRef()
}
Expand Down
4 changes: 1 addition & 3 deletions rx/mono/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,11 @@ func toBlock(ctx context.Context, m mono.Mono) (payload.Payload, error) {
done := make(chan struct{})
vchan := make(chan payload.Payload, 1)
echan := make(chan error, 1)
// 'blockSubscriber' takes ownership of the above channels (w.r.t. closing them)
b := newBlockSubscriber(done, vchan, echan)
m.SubscribeWith(ctx, b)
<-done

defer close(vchan)
defer close(echan)

select {
case value := <-vchan:
return value, nil
Expand Down

0 comments on commit 0681b59

Please sign in to comment.