diff --git a/api/metrics.go b/api/metrics.go index a6655fa..e23f86b 100644 --- a/api/metrics.go +++ b/api/metrics.go @@ -1,32 +1,30 @@ package api import ( + "github.com/voc/srtrelay/config" "github.com/voc/srtrelay/srt" "github.com/prometheus/client_golang/prometheus" ) -const ( - namespace = "srtrelay" - srtSubsystem = "srt" -) +const srtSubsystem = "srt" var ( activeSocketsDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, srtSubsystem, "active_sockets"), + prometheus.BuildFQName(config.MetricsNamespace, srtSubsystem, "active_sockets"), "The number of active SRT sockets", nil, nil, ) // Metrics from: https://pkg.go.dev/github.com/haivision/srtgo#SrtStats pktSentTotalDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, srtSubsystem, "packets_sent_total"), + prometheus.BuildFQName(config.MetricsNamespace, srtSubsystem, "packets_sent_total"), "total number of sent data packets, including retransmissions", []string{"address", "stream_id"}, nil, ) pktRecvTotalDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, srtSubsystem, "packets_received_total"), + prometheus.BuildFQName(config.MetricsNamespace, srtSubsystem, "packets_received_total"), "total number of received packets", []string{"address", "stream_id"}, nil, ) diff --git a/config/config.go b/config/config.go index cb96e2d..a965dcd 100644 --- a/config/config.go +++ b/config/config.go @@ -12,6 +12,8 @@ import ( "github.com/voc/srtrelay/auth" ) +const MetricsNamespace = "srtrelay" + type Config struct { App AppConfig Auth AuthConfig diff --git a/relay/channel.go b/relay/channel.go index a7c0de1..716f79a 100644 --- a/relay/channel.go +++ b/relay/channel.go @@ -5,11 +5,36 @@ import ( "sync" "sync/atomic" "time" + + "github.com/voc/srtrelay/config" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const relaySubsystem = "relay" + +var ( + activeClients = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: prometheus.BuildFQName(config.MetricsNamespace, relaySubsystem, "active_clients"), + Help: "The number of active clients per channel", + }, + []string{"channel_name"}, + ) + channelCreatedTimestamp = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: prometheus.BuildFQName(config.MetricsNamespace, relaySubsystem, "created_timestamp_seconds"), + Help: "The UNIX timestamp when the channel was created", + }, + []string{"channel_name"}, + ) ) type UnsubscribeFunc func() type Channel struct { + name string mutex sync.Mutex subs Subs maxPackets uint @@ -17,6 +42,10 @@ type Channel struct { // statistics clients atomic.Value created time.Time + + // Prometheus metrics. + activeClients prometheus.Gauge + createdTimestamp prometheus.Gauge } type Subs []chan []byte @@ -47,13 +76,18 @@ func (subs Subs) Remove(sub chan []byte) Subs { return subs[:len(subs)-1] // Truncate slice. } -func NewChannel(maxPackets uint) *Channel { +func NewChannel(name string, maxPackets uint) *Channel { + channelActiveClients := activeClients.WithLabelValues(name) ch := &Channel{ - subs: make([]chan []byte, 0, 10), - maxPackets: maxPackets, - created: time.Now(), + name: name, + subs: make([]chan []byte, 0, 10), + maxPackets: maxPackets, + created: time.Now(), + activeClients: channelActiveClients, } ch.clients.Store(0) + ch.createdTimestamp = channelCreatedTimestamp.WithLabelValues(name) + ch.createdTimestamp.Set(float64(ch.created.UnixNano()) / 1000000.0) return ch } @@ -64,6 +98,7 @@ func (ch *Channel) Sub() (<-chan []byte, UnsubscribeFunc) { sub := make(chan []byte, ch.maxPackets) ch.subs = append(ch.subs, sub) ch.clients.Store(len(ch.subs)) + ch.activeClients.Inc() var unsub UnsubscribeFunc = func() { ch.mutex.Lock() @@ -76,6 +111,7 @@ func (ch *Channel) Sub() (<-chan []byte, UnsubscribeFunc) { ch.subs = ch.subs.Remove(sub) ch.clients.Store(len(ch.subs)) + ch.activeClients.Dec() } return sub, unsub } @@ -99,6 +135,7 @@ func (ch *Channel) Pub(b []byte) { } for _, sub := range toRemove { ch.subs = ch.subs.Remove(sub) + ch.activeClients.Dec() } ch.clients.Store(len(ch.subs)) } @@ -111,6 +148,8 @@ func (ch *Channel) Close() { close(ch.subs[i]) } ch.subs = nil + activeClients.DeleteLabelValues(ch.name) + channelCreatedTimestamp.DeleteLabelValues(ch.name) } func (ch *Channel) Stats() Stats { diff --git a/relay/channel_test.go b/relay/channel_test.go index f9c1051..9529d92 100644 --- a/relay/channel_test.go +++ b/relay/channel_test.go @@ -6,7 +6,7 @@ import ( ) func TestChannel_PubSub(t *testing.T) { - ch := NewChannel(uint(1316 * 50)) + ch := NewChannel("test", uint(1316 * 50)) // sub out, unsub := ch.Sub() @@ -33,7 +33,7 @@ func TestChannel_PubSub(t *testing.T) { } func TestChannel_DropOnOverflow(t *testing.T) { - ch := NewChannel(uint(1316 * 50)) + ch := NewChannel("test", uint(1316 * 50)) sub, _ := ch.Sub() capacity := cap(sub) + 1 @@ -60,7 +60,7 @@ func TestChannel_DropOnOverflow(t *testing.T) { } func TestChannel_Close(t *testing.T) { - ch := NewChannel(0) + ch := NewChannel("test", 0) sub1, _ := ch.Sub() _, _ = ch.Sub() @@ -80,7 +80,7 @@ func TestChannel_Close(t *testing.T) { } func TestChannel_Stats(t *testing.T) { - ch := NewChannel(0) + ch := NewChannel("test", 0) if num := ch.Stats().clients; num != 0 { t.Errorf("Expected 0 clients after create, got %d", num) } diff --git a/relay/relay.go b/relay/relay.go index 1235e76..6f05fa6 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -54,7 +54,7 @@ func (s *RelayImpl) Publish(name string) (chan<- []byte, error) { return nil, ErrStreamAlreadyExists } - channel := NewChannel(s.config.BufferSize / s.config.PacketSize) + channel := NewChannel(name, s.config.BufferSize/s.config.PacketSize) s.channels[name] = channel ch := make(chan []byte)