From 85f14c3e72913bfe79db2c077e80136582ddb066 Mon Sep 17 00:00:00 2001 From: SuperQ Date: Tue, 24 Dec 2024 12:15:06 +0100 Subject: [PATCH] Add relay metrics Add Prometheus metrics to the relay package. THe new metrics match the existing Stats struct. * Pass the channel name to the NewChannel func so it can be used as a label. * Refactor metrics namespace into the config package. Signed-off-by: SuperQ --- api/metrics.go | 12 +++++------- config/config.go | 2 ++ relay/channel.go | 47 +++++++++++++++++++++++++++++++++++++++++++---- relay/relay.go | 2 +- 4 files changed, 51 insertions(+), 12 deletions(-) diff --git a/api/metrics.go b/api/metrics.go index a6655fa..e23f86b 100644 --- a/api/metrics.go +++ b/api/metrics.go @@ -1,32 +1,30 @@ package api import ( + "github.com/voc/srtrelay/config" "github.com/voc/srtrelay/srt" "github.com/prometheus/client_golang/prometheus" ) -const ( - namespace = "srtrelay" - srtSubsystem = "srt" -) +const srtSubsystem = "srt" var ( activeSocketsDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, srtSubsystem, "active_sockets"), + prometheus.BuildFQName(config.MetricsNamespace, srtSubsystem, "active_sockets"), "The number of active SRT sockets", nil, nil, ) // Metrics from: https://pkg.go.dev/github.com/haivision/srtgo#SrtStats pktSentTotalDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, srtSubsystem, "packets_sent_total"), + prometheus.BuildFQName(config.MetricsNamespace, srtSubsystem, "packets_sent_total"), "total number of sent data packets, including retransmissions", []string{"address", "stream_id"}, nil, ) pktRecvTotalDesc = prometheus.NewDesc( - prometheus.BuildFQName(namespace, srtSubsystem, "packets_received_total"), + prometheus.BuildFQName(config.MetricsNamespace, srtSubsystem, "packets_received_total"), "total number of received packets", []string{"address", "stream_id"}, nil, ) diff --git a/config/config.go b/config/config.go index cb96e2d..a965dcd 100644 --- a/config/config.go +++ b/config/config.go @@ -12,6 +12,8 @@ import ( "github.com/voc/srtrelay/auth" ) +const MetricsNamespace = "srtrelay" + type Config struct { App AppConfig Auth AuthConfig diff --git a/relay/channel.go b/relay/channel.go index a7c0de1..716f79a 100644 --- a/relay/channel.go +++ b/relay/channel.go @@ -5,11 +5,36 @@ import ( "sync" "sync/atomic" "time" + + "github.com/voc/srtrelay/config" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const relaySubsystem = "relay" + +var ( + activeClients = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: prometheus.BuildFQName(config.MetricsNamespace, relaySubsystem, "active_clients"), + Help: "The number of active clients per channel", + }, + []string{"channel_name"}, + ) + channelCreatedTimestamp = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: prometheus.BuildFQName(config.MetricsNamespace, relaySubsystem, "created_timestamp_seconds"), + Help: "The UNIX timestamp when the channel was created", + }, + []string{"channel_name"}, + ) ) type UnsubscribeFunc func() type Channel struct { + name string mutex sync.Mutex subs Subs maxPackets uint @@ -17,6 +42,10 @@ type Channel struct { // statistics clients atomic.Value created time.Time + + // Prometheus metrics. + activeClients prometheus.Gauge + createdTimestamp prometheus.Gauge } type Subs []chan []byte @@ -47,13 +76,18 @@ func (subs Subs) Remove(sub chan []byte) Subs { return subs[:len(subs)-1] // Truncate slice. } -func NewChannel(maxPackets uint) *Channel { +func NewChannel(name string, maxPackets uint) *Channel { + channelActiveClients := activeClients.WithLabelValues(name) ch := &Channel{ - subs: make([]chan []byte, 0, 10), - maxPackets: maxPackets, - created: time.Now(), + name: name, + subs: make([]chan []byte, 0, 10), + maxPackets: maxPackets, + created: time.Now(), + activeClients: channelActiveClients, } ch.clients.Store(0) + ch.createdTimestamp = channelCreatedTimestamp.WithLabelValues(name) + ch.createdTimestamp.Set(float64(ch.created.UnixNano()) / 1000000.0) return ch } @@ -64,6 +98,7 @@ func (ch *Channel) Sub() (<-chan []byte, UnsubscribeFunc) { sub := make(chan []byte, ch.maxPackets) ch.subs = append(ch.subs, sub) ch.clients.Store(len(ch.subs)) + ch.activeClients.Inc() var unsub UnsubscribeFunc = func() { ch.mutex.Lock() @@ -76,6 +111,7 @@ func (ch *Channel) Sub() (<-chan []byte, UnsubscribeFunc) { ch.subs = ch.subs.Remove(sub) ch.clients.Store(len(ch.subs)) + ch.activeClients.Dec() } return sub, unsub } @@ -99,6 +135,7 @@ func (ch *Channel) Pub(b []byte) { } for _, sub := range toRemove { ch.subs = ch.subs.Remove(sub) + ch.activeClients.Dec() } ch.clients.Store(len(ch.subs)) } @@ -111,6 +148,8 @@ func (ch *Channel) Close() { close(ch.subs[i]) } ch.subs = nil + activeClients.DeleteLabelValues(ch.name) + channelCreatedTimestamp.DeleteLabelValues(ch.name) } func (ch *Channel) Stats() Stats { diff --git a/relay/relay.go b/relay/relay.go index 1235e76..6f05fa6 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -54,7 +54,7 @@ func (s *RelayImpl) Publish(name string) (chan<- []byte, error) { return nil, ErrStreamAlreadyExists } - channel := NewChannel(s.config.BufferSize / s.config.PacketSize) + channel := NewChannel(name, s.config.BufferSize/s.config.PacketSize) s.channels[name] = channel ch := make(chan []byte)