-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimple.go
123 lines (105 loc) · 3.04 KB
/
simple.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright 2022 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package eventbus
// Compile-time check that *simpleChannel implements Channel. The int type
// argument is arbitrary; a concrete instantiation is needed for the assignment.
var _ Channel[int] = (*simpleChannel[int])(nil)
// simpleChannel fans every published item out to all current subscribers.
//
// Within this file the subscribers slice is only touched by the handleChannel
// goroutine; the exported methods communicate with that goroutine through the
// channels declared below.
type simpleChannel[T any] struct {
	// subscribers holds the channels that currently receive published items.
	subscribers []chan T

	// subscriptionChannel carries newly created subscriber channels to
	// handleChannel so they can be registered.
	subscriptionChannel chan chan T

	// unsubscribeChannel carries subscriber channels that handleChannel should
	// remove and close.
	unsubscribeChannel chan chan T

	// eventChannel queues published items until handleChannel forwards them.
	eventChannel chan T

	// eventBufferSize is the capacity of eventChannel and of every subscriber
	// channel created by Subscribe.
	eventBufferSize int

	// closeChannel signals handleChannel to shut down.
	closeChannel chan struct{}

	// isClosed is set by Close and checked by the exported methods before
	// they talk to handleChannel.
	// NOTE(review): it is read and written without any synchronization —
	// confirm whether callers are expected to serialize access.
	isClosed bool
}
// NewSimpleChannel builds a simpleChannel, starts its handler goroutine and
// returns it as a Channel.
//
// subscriberBufferSize is the capacity of the internal subscribe and
// unsubscribe request queues. eventBufferSize is the capacity of the internal
// event queue; it is also stored so subscriber channels can be sized to match.
//
// Once one of these buffers is full, further calls on the returned Channel may
// block until the handler goroutine has drained it.
func NewSimpleChannel[T any](subscriberBufferSize int, eventBufferSize int) Channel[T] {
	channel := &simpleChannel[T]{
		subscriptionChannel: make(chan chan T, subscriberBufferSize),
		unsubscribeChannel:  make(chan chan T, subscriberBufferSize),
		eventChannel:        make(chan T, eventBufferSize),
		eventBufferSize:     eventBufferSize,
		closeChannel:        make(chan struct{}),
	}

	// The handler goroutine runs until Close signals it through closeChannel.
	go channel.handleChannel()

	return channel
}
// Subscribe creates a new subscriber channel, hands it to the handler
// goroutine for registration and returns it.
//
// It returns ErrSubscribedToClosedChan if the channel has already been closed.
// The call blocks while the subscription request queue is full.
func (c *simpleChannel[T]) Subscribe() (Subscription[T], error) {
	if c.isClosed {
		return nil, ErrSubscribedToClosedChan
	}

	// Give the subscriber the same capacity as the event queue; an unbuffered
	// subscriber would stall handleChannel on every delivery.
	subscriber := make(chan T, c.eventBufferSize)
	c.subscriptionChannel <- subscriber

	return subscriber, nil
}
// Unsubscribe asks the handler goroutine to remove and close ch.
//
// It does nothing once the channel has been closed. The call blocks while the
// unsubscribe request queue is full.
func (c *simpleChannel[T]) Unsubscribe(ch Subscription[T]) {
	if !c.isClosed {
		c.unsubscribeChannel <- ch
	}
}
// Publish queues item for delivery to all subscribers.
//
// If the channel has already been closed the item is dropped and a warning is
// logged. The call blocks while the event queue is full.
func (c *simpleChannel[T]) Publish(item T) {
	log.Debugf("publishing item %T", item)

	if !c.isClosed {
		c.eventChannel <- item
		return
	}

	log.Warnf("channel is closed for item %T", item)
}
// Close marks the channel as closed and signals the handler goroutine to shut
// down. Calling it again after it has returned is a no-op.
//
// closeChannel is unbuffered, so the call blocks until the handler goroutine
// has received the signal.
func (c *simpleChannel[T]) Close() {
	if !c.isClosed {
		c.isClosed = true

		var signal struct{}
		c.closeChannel <- signal
	}
}
// handleChannel is the event loop behind a simpleChannel. It runs until a
// close signal arrives and, within this file, is the only code that reads or
// modifies c.subscribers.
//
// Each iteration services exactly one of:
//   - a close signal: close every subscriber and all internal channels, then exit;
//   - an unsubscribe request: remove and close that subscriber, if registered;
//   - a subscribe request: register the new subscriber;
//   - a published item: forward it to every registered subscriber in turn.
//
// Forwarding uses a blocking send, so a subscriber whose buffer is full stalls
// the loop, and with it every other operation, until that subscriber is read.
func (c *simpleChannel[T]) handleChannel() {
	for {
		select {
		case <-c.closeChannel:
			close(c.closeChannel)
			for _, subscriber := range c.subscribers {
				close(subscriber)
			}
			// NOTE(review): these channels are closed from the receiving side.
			// A Subscribe, Unsubscribe or Publish call that passed its isClosed
			// check just before Close would panic sending on them — confirm
			// callers never race with Close.
			close(c.subscriptionChannel)
			close(c.unsubscribeChannel)
			close(c.eventChannel)
			return

		case ch := <-c.unsubscribeChannel:
			c.removeSubscriber(ch)

		case newSubscriber := <-c.subscriptionChannel:
			c.subscribers = append(c.subscribers, newSubscriber)

		case item := <-c.eventChannel:
			log.Debugf("sending published event %T to subscribers (%d)", item, len(c.subscribers))
			for _, subscriber := range c.subscribers {
				subscriber <- item
			}
		}
	}
}

// removeSubscriber removes ch from c.subscribers and closes it. A channel that
// is not registered (for example one that was already unsubscribed) is ignored,
// so it is never closed twice.
func (c *simpleChannel[T]) removeSubscriber(ch chan T) {
	for i, subscriber := range c.subscribers {
		if subscriber != ch {
			continue
		}

		// Order is not significant, so overwrite the removed entry with the
		// last one and shrink the slice by one.
		last := len(c.subscribers) - 1
		c.subscribers[i] = c.subscribers[last]
		// Clear the vacated slot so the backing array does not keep a channel
		// (and any items buffered in it) reachable after it was removed.
		c.subscribers[last] = nil
		c.subscribers = c.subscribers[:last]

		close(ch)
		return
	}
}