diff --git a/chann.go b/chann.go index fc0140e..5b11bcf 100644 --- a/chann.go +++ b/chann.go @@ -207,13 +207,10 @@ func (ch *Chann[T]) unboundedTerminate() { ch.q = append(ch.q, e) } for len(ch.q) > 0 { - select { - case ch.out <- ch.q[0]: - // The default branch exists because we need guarantee - // the loop can terminate. If there is a receiver, the - // first case will ways be selected. See #3. - default: - } + // Note if receiver doesn't consume all data that has been sent to input + // channel, the `unboundedProcessing` goroutine will leak forever. + // Ref: https://github.com/golang-design/chann/issues/3 + ch.out <- ch.q[0] ch.q[0] = nilT // de-reference earlier to help GC ch.q = ch.q[1:] } diff --git a/chann_test.go b/chann_test.go index 75af718..a355faa 100644 --- a/chann_test.go +++ b/chann_test.go @@ -207,7 +207,6 @@ func TestNonblockRecvRace(t *testing.T) { for i := 0; i < n; i++ { c := chann.New[int](chann.Cap(1)) c.In() <- 1 - t.Log(i) go func() { select { case <-c.Out(): @@ -433,11 +432,8 @@ func TestUnboundedChannClose(t *testing.T) { } ch.Close() - // Theoretically, this is not a dead loop. If the channel - // is closed, then this loop must terminate at somepoint. - // If not, we will meet timeout in the test. - for !chann.IsClosed(ch) { - t.Log("unbounded channel is still not entirely closed") + if chann.IsClosed(ch) { + t.Fatal("channel should not be closed if data is not consumed") } }) @@ -492,6 +488,31 @@ func TestUnboundedChannClose(t *testing.T) { t.Fatalf("After close, not all elements are received, got %v, want %v", n, N) } }) + + // ref: https://github.com/golang-design/chann/issues/3#issuecomment-1150189421 + t.Run("consume-data-after-close", func(t *testing.T) { + var wg sync.WaitGroup + ch := chann.New[int]() + + wg.Add(1) + c := 0 + go func() { + for range ch.Out() { + c++ + } + wg.Done() + }() + + for i := 0; i < 2048; i++ { + ch.In() <- 42 + } + ch.Close() + + wg.Wait() + if c != 2048 { + t.Fatalf("not all elements are received after channel being closed, want %v got %v", 2048, c) + } + }) } func BenchmarkUnboundedChann(b *testing.B) {