-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver.go
133 lines (107 loc) · 2.74 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package broadcast
import (
"sync"
"time"
)
const (
	// DefaultBufferSize is the default capacity of the queue that holds a
	// stream's messages.
	DefaultBufferSize = 1024

	// DefaultMaxInactivity is the default maximum inactivity period of a
	// stream (one minute).
	DefaultMaxInactivity = 60 * time.Second
)
// Server is the main type of the package. It owns a set of streams keyed by
// id and serializes access to them with an internal mutex.
type Server struct {
	// BufferSize is the size of the message buffer handed to each stream
	// when it is created.
	BufferSize int
	// AutoStream enables creation of a stream when a client connects.
	AutoStream bool
	// Streams maps a stream id to its stream. The Server's own methods
	// only touch it while holding mu.
	Streams map[string]*Stream
	// mu guards Streams.
	mu sync.Mutex
}
// New returns a Server initialized with the package defaults: BufferSize is
// DefaultBufferSize, AutoStream is disabled, and Streams is an empty,
// ready-to-use map.
func New() *Server {
	srv := new(Server)
	srv.BufferSize = DefaultBufferSize
	srv.AutoStream = false
	srv.Streams = make(map[string]*Stream)
	return srv
}
// Close shuts the server down. While holding the server mutex it signals
// every registered stream on its quit channel and removes the stream from
// Streams, leaving the map empty.
//
// NOTE(review): RemoveStream calls Stream.close() whereas Close sends on the
// quit channel directly — confirm the difference is intentional.
func (s *Server) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Deleting the current key while ranging over a map is safe in Go.
	for id, stream := range s.Streams {
		stream.quit <- true
		delete(s.Streams, id)
	}
}
// GetStream looks up the stream registered under id. It returns nil when no
// stream is registered under that id.
func (s *Server) GetStream(id string) *Stream {
	s.mu.Lock()
	defer s.mu.Unlock()

	stream := s.Streams[id]
	return stream
}
// CreateStream returns the stream registered under id, creating and
// registering a new one (buffered according to s.BufferSize) if none exists
// yet. Calling it again with the same id returns the existing stream.
func (s *Server) CreateStream(id string) *Stream {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Reuse the stream if it is already registered.
	if existing := s.Streams[id]; existing != nil {
		return existing
	}

	created := newStream(s.BufferSize)
	s.Streams[id] = created
	return created
}
// RemoveStream closes the stream registered under id and unregisters it.
// It does nothing when no stream is registered under that id.
func (s *Server) RemoveStream(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	stream := s.Streams[id]
	if stream == nil {
		return
	}

	stream.close()
	delete(s.Streams, id)
}
// StreamExists reports whether a non-nil stream is registered under id.
func (s *Server) StreamExists(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	stream := s.Streams[id]
	return stream != nil
}
// Publish sends a message to the stream registered under id by queuing an
// Event carrying data on the stream's event channel. It does nothing when no
// stream is registered under that id.
//
// NOTE(review): the channel send happens while the server mutex is held, so
// if the send blocks, every other Server method blocks with it.
func (s *Server) Publish(id string, data []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()

	stream := s.Streams[id]
	if stream == nil {
		return
	}

	stream.event <- &Event{Data: data}
}
// Register adds sub as a subscriber of the stream registered under id.
//
// If no stream is registered under id the call is a no-op, consistent with
// Publish and RemoveStream, instead of invoking addSubscriber on a nil
// *Stream.
func (s *Server) Register(id string, sub *Subscriber) {
	s.mu.Lock()
	defer s.mu.Unlock()

	stream := s.Streams[id]
	if stream == nil {
		// Unknown stream: there is nothing to attach the subscriber to.
		return
	}

	stream.addSubscriber(sub)
}
// GetSubscriber searches every registered stream for a subscriber with the
// given id and returns the first one found, or nil if no stream has one.
// Streams are visited in map iteration order, which Go leaves unspecified.
func (s *Server) GetSubscriber(id string) *Subscriber {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, stream := range s.Streams {
		if found := stream.getSubscriber(id); found != nil {
			return found
		}
	}

	return nil
}
// GetStreamSubscriber returns the subscriber with the given id from the
// stream registered under the name stream. It returns nil when that stream is
// not registered; otherwise it returns whatever the stream's getSubscriber
// lookup yields.
func (s *Server) GetStreamSubscriber(stream, id string) *Subscriber {
	s.mu.Lock()
	defer s.mu.Unlock()

	if target := s.Streams[stream]; target != nil {
		return target.getSubscriber(id)
	}

	return nil
}