Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chann: support to consume all data after Close the channel. #5

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions chann.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,10 @@ func (ch *Chann[T]) unboundedTerminate() {
ch.q = append(ch.q, e)
}
for len(ch.q) > 0 {
select {
case ch.out <- ch.q[0]:
// The default branch exists because we need guarantee
// the loop can terminate. If there is a receiver, the
// first case will ways be selected. See #3.
default:
}
// Note if receiver doesn't consume all data that has been sent to input
// channel, the `unboundedProcessing` goroutine will leak forever.
// Ref: https://github.com/golang-design/chann/issues/3
ch.out <- ch.q[0]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you documented, if there is no receiver, we will leak a goroutine, and it seems not ideal. Isn't it? Or at least do we need to inform the users in the documentation?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @amyangfei can you take a look at this comment? I want to use this library soon

And as for

Considering the channel behavior in standard library, if user doesn't receive data from a buffered channel, data in the buffered channel is still kept in memory and has memory leak too, even if the channel is closed.
GC should be able to recycle the unreachable objects when we use built-in channel

Or maybe we can add another function NewUnbounded[T]() (*Chann[T], func()) that returns a cleanup function? like context's cancel function, it will remind the user that don't forget to invoke the function.

ch.q[0] = nilT // de-reference earlier to help GC
ch.q = ch.q[1:]
}
Expand Down
33 changes: 27 additions & 6 deletions chann_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ func TestNonblockRecvRace(t *testing.T) {
for i := 0; i < n; i++ {
c := chann.New[int](chann.Cap(1))
c.In() <- 1
t.Log(i)
go func() {
select {
case <-c.Out():
Expand Down Expand Up @@ -433,11 +432,8 @@ func TestUnboundedChannClose(t *testing.T) {
}
ch.Close()

// Theoretically, this is not a dead loop. If the channel
// is closed, then this loop must terminate at somepoint.
// If not, we will meet timeout in the test.
for !chann.IsClosed(ch) {
t.Log("unbounded channel is still not entirely closed")
if chann.IsClosed(ch) {
t.Fatal("channel should not be closed if data is not consumed")
}
})

Expand Down Expand Up @@ -492,6 +488,31 @@ func TestUnboundedChannClose(t *testing.T) {
t.Fatalf("After close, not all elements are received, got %v, want %v", n, N)
}
})

// ref: https://github.com/golang-design/chann/issues/3#issuecomment-1150189421
t.Run("consume-data-after-close", func(t *testing.T) {
var wg sync.WaitGroup
ch := chann.New[int]()

wg.Add(1)
c := 0
go func() {
for range ch.Out() {
c++
}
wg.Done()
}()

for i := 0; i < 2048; i++ {
ch.In() <- 42
}
ch.Close()

wg.Wait()
if c != 2048 {
t.Fatalf("not all elements are received after channel being closed, want %v got %v", 2048, c)
}
})
}

func BenchmarkUnboundedChann(b *testing.B) {
Expand Down