Skip to content

Commit

Permalink
Add relay metrics
Browse files Browse the repository at this point in the history
Add Prometheus metrics to the relay package. The new metrics match the
existing Stats struct.
* Pass the channel name to the NewChannel func so it can be used as a
  label.
* Refactor metrics namespace into the config package.

Signed-off-by: SuperQ <[email protected]>
  • Loading branch information
SuperQ committed Dec 24, 2024
1 parent dcf322b commit fff6fa8
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 16 deletions.
12 changes: 5 additions & 7 deletions api/metrics.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
package api

import (
"github.com/voc/srtrelay/config"
"github.com/voc/srtrelay/srt"

"github.com/prometheus/client_golang/prometheus"
)

const (
namespace = "srtrelay"
srtSubsystem = "srt"
)
const srtSubsystem = "srt"

var (
activeSocketsDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, srtSubsystem, "active_sockets"),
prometheus.BuildFQName(config.MetricsNamespace, srtSubsystem, "active_sockets"),
"The number of active SRT sockets",
nil, nil,
)

// Metrics from: https://pkg.go.dev/github.com/haivision/srtgo#SrtStats
pktSentTotalDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, srtSubsystem, "packets_sent_total"),
prometheus.BuildFQName(config.MetricsNamespace, srtSubsystem, "packets_sent_total"),
"total number of sent data packets, including retransmissions",
[]string{"address", "stream_id"}, nil,
)

pktRecvTotalDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, srtSubsystem, "packets_received_total"),
prometheus.BuildFQName(config.MetricsNamespace, srtSubsystem, "packets_received_total"),
"total number of received packets",
[]string{"address", "stream_id"}, nil,
)
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/voc/srtrelay/auth"
)

const MetricsNamespace = "srtrelay"

type Config struct {
App AppConfig
Auth AuthConfig
Expand Down
47 changes: 43 additions & 4 deletions relay/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,47 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/voc/srtrelay/config"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const relaySubsystem = "relay"

var (
activeClients = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: prometheus.BuildFQName(config.MetricsNamespace, relaySubsystem, "active_clients"),
Help: "The number of active clients per channel",
},
[]string{"channel_name"},
)
channelCreatedTimestamp = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: prometheus.BuildFQName(config.MetricsNamespace, relaySubsystem, "created_timestamp_seconds"),
Help: "The UNIX timestamp when the channel was created",
},
[]string{"channel_name"},
)
)

type UnsubscribeFunc func()

type Channel struct {
name string
mutex sync.Mutex
subs Subs
maxPackets uint

// statistics
clients atomic.Value
created time.Time

// Prometheus metrics.
activeClients prometheus.Gauge
createdTimestamp prometheus.Gauge
}
type Subs []chan []byte

Expand Down Expand Up @@ -47,13 +76,18 @@ func (subs Subs) Remove(sub chan []byte) Subs {
return subs[:len(subs)-1] // Truncate slice.
}

func NewChannel(maxPackets uint) *Channel {
func NewChannel(name string, maxPackets uint) *Channel {
channelActiveClients := activeClients.WithLabelValues(name)
ch := &Channel{
subs: make([]chan []byte, 0, 10),
maxPackets: maxPackets,
created: time.Now(),
name: name,
subs: make([]chan []byte, 0, 10),
maxPackets: maxPackets,
created: time.Now(),
activeClients: channelActiveClients,
}
ch.clients.Store(0)
ch.createdTimestamp = channelCreatedTimestamp.WithLabelValues(name)
ch.createdTimestamp.Set(float64(ch.created.UnixNano()) / 1000000.0)
return ch
}

Expand All @@ -64,6 +98,7 @@ func (ch *Channel) Sub() (<-chan []byte, UnsubscribeFunc) {
sub := make(chan []byte, ch.maxPackets)
ch.subs = append(ch.subs, sub)
ch.clients.Store(len(ch.subs))
ch.activeClients.Inc()

var unsub UnsubscribeFunc = func() {
ch.mutex.Lock()
Expand All @@ -76,6 +111,7 @@ func (ch *Channel) Sub() (<-chan []byte, UnsubscribeFunc) {

ch.subs = ch.subs.Remove(sub)
ch.clients.Store(len(ch.subs))
ch.activeClients.Dec()
}
return sub, unsub
}
Expand All @@ -99,6 +135,7 @@ func (ch *Channel) Pub(b []byte) {
}
for _, sub := range toRemove {
ch.subs = ch.subs.Remove(sub)
ch.activeClients.Dec()
}
ch.clients.Store(len(ch.subs))
}
Expand All @@ -111,6 +148,8 @@ func (ch *Channel) Close() {
close(ch.subs[i])
}
ch.subs = nil
activeClients.DeleteLabelValues(ch.name)
channelCreatedTimestamp.DeleteLabelValues(ch.name)
}

func (ch *Channel) Stats() Stats {
Expand Down
8 changes: 4 additions & 4 deletions relay/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func TestChannel_PubSub(t *testing.T) {
ch := NewChannel(uint(1316 * 50))
ch := NewChannel("test", uint(1316 * 50))

// sub
out, unsub := ch.Sub()
Expand All @@ -33,7 +33,7 @@ func TestChannel_PubSub(t *testing.T) {
}

func TestChannel_DropOnOverflow(t *testing.T) {
ch := NewChannel(uint(1316 * 50))
ch := NewChannel("test", uint(1316 * 50))

sub, _ := ch.Sub()
capacity := cap(sub) + 1
Expand All @@ -60,7 +60,7 @@ func TestChannel_DropOnOverflow(t *testing.T) {
}

func TestChannel_Close(t *testing.T) {
ch := NewChannel(0)
ch := NewChannel("test", 0)
sub1, _ := ch.Sub()
_, _ = ch.Sub()

Expand All @@ -80,7 +80,7 @@ func TestChannel_Close(t *testing.T) {
}

func TestChannel_Stats(t *testing.T) {
ch := NewChannel(0)
ch := NewChannel("test", 0)
if num := ch.Stats().clients; num != 0 {
t.Errorf("Expected 0 clients after create, got %d", num)
}
Expand Down
2 changes: 1 addition & 1 deletion relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *RelayImpl) Publish(name string) (chan<- []byte, error) {
return nil, ErrStreamAlreadyExists
}

channel := NewChannel(s.config.BufferSize / s.config.PacketSize)
channel := NewChannel(name, s.config.BufferSize/s.config.PacketSize)
s.channels[name] = channel

ch := make(chan []byte)
Expand Down

0 comments on commit fff6fa8

Please sign in to comment.