Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add relay metrics #49

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions api/metrics.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
package api

import (
"github.com/voc/srtrelay/config"
"github.com/voc/srtrelay/srt"

"github.com/prometheus/client_golang/prometheus"
)

const (
namespace = "srtrelay"
srtSubsystem = "srt"
)
const srtSubsystem = "srt"

var (
activeSocketsDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, srtSubsystem, "active_sockets"),
prometheus.BuildFQName(config.MetricsNamespace, srtSubsystem, "active_sockets"),
"The number of active SRT sockets",
nil, nil,
)

// Metrics from: https://pkg.go.dev/github.com/haivision/srtgo#SrtStats
pktSentTotalDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, srtSubsystem, "packets_sent_total"),
prometheus.BuildFQName(config.MetricsNamespace, srtSubsystem, "packets_sent_total"),
"total number of sent data packets, including retransmissions",
[]string{"address", "stream_id"}, nil,
)

pktRecvTotalDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, srtSubsystem, "packets_received_total"),
prometheus.BuildFQName(config.MetricsNamespace, srtSubsystem, "packets_received_total"),
"total number of received packets",
[]string{"address", "stream_id"}, nil,
)
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/voc/srtrelay/auth"
)

const MetricsNamespace = "srtrelay"

type Config struct {
App AppConfig
Auth AuthConfig
Expand Down
47 changes: 43 additions & 4 deletions relay/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,47 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/voc/srtrelay/config"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const relaySubsystem = "relay"

var (
activeClients = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: prometheus.BuildFQName(config.MetricsNamespace, relaySubsystem, "active_clients"),
Help: "The number of active clients per channel",
},
[]string{"channel_name"},
)
channelCreatedTimestamp = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: prometheus.BuildFQName(config.MetricsNamespace, relaySubsystem, "created_timestamp_seconds"),
Help: "The UNIX timestamp when the channel was created",
},
[]string{"channel_name"},
)
)

type UnsubscribeFunc func()

type Channel struct {
name string
mutex sync.Mutex
subs Subs
maxPackets uint

// statistics
clients atomic.Value
created time.Time

// Prometheus metrics.
activeClients prometheus.Gauge
createdTimestamp prometheus.Gauge
}
type Subs []chan []byte

Expand Down Expand Up @@ -47,13 +76,18 @@ func (subs Subs) Remove(sub chan []byte) Subs {
return subs[:len(subs)-1] // Truncate slice.
}

func NewChannel(maxPackets uint) *Channel {
func NewChannel(name string, maxPackets uint) *Channel {
channelActiveClients := activeClients.WithLabelValues(name)
ch := &Channel{
subs: make([]chan []byte, 0, 10),
maxPackets: maxPackets,
created: time.Now(),
name: name,
subs: make([]chan []byte, 0, 10),
maxPackets: maxPackets,
created: time.Now(),
activeClients: channelActiveClients,
}
ch.clients.Store(0)
ch.createdTimestamp = channelCreatedTimestamp.WithLabelValues(name)
ch.createdTimestamp.Set(float64(ch.created.UnixNano()) / 1000000.0)
return ch
}

Expand All @@ -64,6 +98,7 @@ func (ch *Channel) Sub() (<-chan []byte, UnsubscribeFunc) {
sub := make(chan []byte, ch.maxPackets)
ch.subs = append(ch.subs, sub)
ch.clients.Store(len(ch.subs))
ch.activeClients.Inc()

var unsub UnsubscribeFunc = func() {
ch.mutex.Lock()
Expand All @@ -76,6 +111,7 @@ func (ch *Channel) Sub() (<-chan []byte, UnsubscribeFunc) {

ch.subs = ch.subs.Remove(sub)
ch.clients.Store(len(ch.subs))
ch.activeClients.Dec()
}
return sub, unsub
}
Expand All @@ -99,6 +135,7 @@ func (ch *Channel) Pub(b []byte) {
}
for _, sub := range toRemove {
ch.subs = ch.subs.Remove(sub)
ch.activeClients.Dec()
}
ch.clients.Store(len(ch.subs))
}
Expand All @@ -111,6 +148,8 @@ func (ch *Channel) Close() {
close(ch.subs[i])
}
ch.subs = nil
activeClients.DeleteLabelValues(ch.name)
channelCreatedTimestamp.DeleteLabelValues(ch.name)
}

func (ch *Channel) Stats() Stats {
Expand Down
8 changes: 4 additions & 4 deletions relay/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func TestChannel_PubSub(t *testing.T) {
ch := NewChannel(uint(1316 * 50))
ch := NewChannel("test", uint(1316*50))

// sub
out, unsub := ch.Sub()
Expand All @@ -33,7 +33,7 @@ func TestChannel_PubSub(t *testing.T) {
}

func TestChannel_DropOnOverflow(t *testing.T) {
ch := NewChannel(uint(1316 * 50))
ch := NewChannel("test", uint(1316*50))

sub, _ := ch.Sub()
capacity := cap(sub) + 1
Expand All @@ -60,7 +60,7 @@ func TestChannel_DropOnOverflow(t *testing.T) {
}

func TestChannel_Close(t *testing.T) {
ch := NewChannel(0)
ch := NewChannel("test", 0)
sub1, _ := ch.Sub()
_, _ = ch.Sub()

Expand All @@ -80,7 +80,7 @@ func TestChannel_Close(t *testing.T) {
}

func TestChannel_Stats(t *testing.T) {
ch := NewChannel(0)
ch := NewChannel("test", 0)
if num := ch.Stats().clients; num != 0 {
t.Errorf("Expected 0 clients after create, got %d", num)
}
Expand Down
2 changes: 1 addition & 1 deletion relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *RelayImpl) Publish(name string) (chan<- []byte, error) {
return nil, ErrStreamAlreadyExists
}

channel := NewChannel(s.config.BufferSize / s.config.PacketSize)
channel := NewChannel(name, s.config.BufferSize/s.config.PacketSize)
s.channels[name] = channel

ch := make(chan []byte)
Expand Down
Loading