From bdb1148e9264ce2dc2f32195787e36a3ee05d524 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Fri, 6 Jan 2023 17:54:26 +0800 Subject: [PATCH 1/3] chann: support to consume all data after Close the channel. --- chann.go | 13 +++++++------ chann_test.go | 33 +++++++++++++++++++++++++++------ 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/chann.go b/chann.go index fc0140e..f21c4de 100644 --- a/chann.go +++ b/chann.go @@ -79,6 +79,7 @@ func Cap(n int) Opt { // one, and use Cap to configure the capacity of the channel. type Chann[T any] struct { in, out chan T + backlog chan T close chan struct{} cfg *config q []T @@ -129,6 +130,7 @@ func New[T any](opts ...Opt) *Chann[T] { case unbounded: ch.in = make(chan T, 16) ch.out = make(chan T, 16) + ch.backlog = make(chan T, 1024) go ch.unboundedProcessing() } return ch @@ -208,14 +210,13 @@ func (ch *Chann[T]) unboundedTerminate() { } for len(ch.q) > 0 { select { + // Note if receiver doesn't consume all data that has been sent to input + // channel, the `unboundedProcessing` goroutine will leak forever. + // Ref: https://github.com/golang-design/chann/issues/3 case ch.out <- ch.q[0]: - // The default branch exists because we need guarantee - // the loop can terminate. If there is a receiver, the - // first case will ways be selected. See #3. - default: + ch.q[0] = nilT // de-reference earlier to help GC + ch.q = ch.q[1:] } - ch.q[0] = nilT // de-reference earlier to help GC - ch.q = ch.q[1:] } close(ch.out) close(ch.close) diff --git a/chann_test.go b/chann_test.go index 75af718..a355faa 100644 --- a/chann_test.go +++ b/chann_test.go @@ -207,7 +207,6 @@ func TestNonblockRecvRace(t *testing.T) { for i := 0; i < n; i++ { c := chann.New[int](chann.Cap(1)) c.In() <- 1 - t.Log(i) go func() { select { case <-c.Out(): @@ -433,11 +432,8 @@ func TestUnboundedChannClose(t *testing.T) { } ch.Close() - // Theoretically, this is not a dead loop. If the channel - // is closed, then this loop must terminate at somepoint. - // If not, we will meet timeout in the test. - for !chann.IsClosed(ch) { - t.Log("unbounded channel is still not entirely closed") + if chann.IsClosed(ch) { + t.Fatal("channel should not be closed if data is not consumed") } }) @@ -492,6 +488,31 @@ func TestUnboundedChannClose(t *testing.T) { t.Fatalf("After close, not all elements are received, got %v, want %v", n, N) } }) + + // ref: https://github.com/golang-design/chann/issues/3#issuecomment-1150189421 + t.Run("consume-data-after-close", func(t *testing.T) { + var wg sync.WaitGroup + ch := chann.New[int]() + + wg.Add(1) + c := 0 + go func() { + for range ch.Out() { + c++ + } + wg.Done() + }() + + for i := 0; i < 2048; i++ { + ch.In() <- 42 + } + ch.Close() + + wg.Wait() + if c != 2048 { + t.Fatalf("not all elements are received after channel being closed, want %v got %v", 2048, c) + } + }) } func BenchmarkUnboundedChann(b *testing.B) { From 4d0857f004eeb0e589a4ed8e4e04fb91bf4b5b77 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Tue, 10 Jan 2023 10:01:39 +0800 Subject: [PATCH 2/3] address comment, remove backlog --- chann.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/chann.go b/chann.go index f21c4de..fbdd43c 100644 --- a/chann.go +++ b/chann.go @@ -79,7 +79,6 @@ func Cap(n int) Opt { // one, and use Cap to configure the capacity of the channel. type Chann[T any] struct { in, out chan T - backlog chan T close chan struct{} cfg *config q []T @@ -130,7 +129,6 @@ func New[T any](opts ...Opt) *Chann[T] { case unbounded: ch.in = make(chan T, 16) ch.out = make(chan T, 16) - ch.backlog = make(chan T, 1024) go ch.unboundedProcessing() } return ch From bca7d685828c2f100048dbb6a0083c55458ec242 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Tue, 10 Jan 2023 10:10:18 +0800 Subject: [PATCH 3/3] simplify code --- chann.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/chann.go b/chann.go index fbdd43c..5b11bcf 100644 --- a/chann.go +++ b/chann.go @@ -207,14 +207,12 @@ func (ch *Chann[T]) unboundedTerminate() { ch.q = append(ch.q, e) } for len(ch.q) > 0 { - select { // Note if receiver doesn't consume all data that has been sent to input // channel, the `unboundedProcessing` goroutine will leak forever. // Ref: https://github.com/golang-design/chann/issues/3 - case ch.out <- ch.q[0]: - ch.q[0] = nilT // de-reference earlier to help GC - ch.q = ch.q[1:] - } + ch.out <- ch.q[0] + ch.q[0] = nilT // de-reference earlier to help GC + ch.q = ch.q[1:] } close(ch.out) close(ch.close)