Skip to content

Commit

Permalink
🐦 stats: add SnapshotAndReset and clean up a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
database64128 committed Feb 11, 2023
1 parent 5835dc6 commit cc9b3cc
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 95 deletions.
123 changes: 88 additions & 35 deletions stats/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync/atomic"
)

type traffic struct {
type trafficCollector struct {
downlinkPackets atomic.Uint64
downlinkBytes atomic.Uint64
uplinkPackets atomic.Uint64
Expand All @@ -14,21 +14,21 @@ type traffic struct {
udpSessions atomic.Uint64
}

func (t *traffic) collectTCPSession(downlinkBytes, uplinkBytes uint64) {
t.downlinkBytes.Add(downlinkBytes)
t.uplinkBytes.Add(uplinkBytes)
t.tcpSessions.Add(1)
func (tc *trafficCollector) collectTCPSession(downlinkBytes, uplinkBytes uint64) {
tc.downlinkBytes.Add(downlinkBytes)
tc.uplinkBytes.Add(uplinkBytes)
tc.tcpSessions.Add(1)
}

func (t *traffic) collectUDPSessionDownlink(downlinkPackets, downlinkBytes uint64) {
t.downlinkPackets.Add(downlinkPackets)
t.downlinkBytes.Add(downlinkBytes)
t.udpSessions.Add(1)
func (tc *trafficCollector) collectUDPSessionDownlink(downlinkPackets, downlinkBytes uint64) {
tc.downlinkPackets.Add(downlinkPackets)
tc.downlinkBytes.Add(downlinkBytes)
tc.udpSessions.Add(1)
}

func (t *traffic) collectUDPSessionUplink(uplinkPackets, uplinkBytes uint64) {
t.uplinkPackets.Add(uplinkPackets)
t.uplinkBytes.Add(uplinkBytes)
func (tc *trafficCollector) collectUDPSessionUplink(uplinkPackets, uplinkBytes uint64) {
tc.uplinkPackets.Add(uplinkPackets)
tc.uplinkBytes.Add(uplinkBytes)
}

// Traffic stores the traffic statistics.
Expand All @@ -41,19 +41,39 @@ type Traffic struct {
UDPSessions uint64 `json:"udpSessions"`
}

func (t *traffic) snapshot() Traffic {
func (t *Traffic) Add(u Traffic) {
t.DownlinkPackets += u.DownlinkPackets
t.DownlinkBytes += u.DownlinkBytes
t.UplinkPackets += u.UplinkPackets
t.UplinkBytes += u.UplinkBytes
t.TCPSessions += u.TCPSessions
t.UDPSessions += u.UDPSessions
}

func (tc *trafficCollector) snapshot() Traffic {
return Traffic{
DownlinkPackets: tc.downlinkPackets.Load(),
DownlinkBytes: tc.downlinkBytes.Load(),
UplinkPackets: tc.uplinkPackets.Load(),
UplinkBytes: tc.uplinkBytes.Load(),
TCPSessions: tc.tcpSessions.Load(),
UDPSessions: tc.udpSessions.Load(),
}
}

func (tc *trafficCollector) snapshotAndReset() Traffic {
return Traffic{
DownlinkPackets: t.downlinkPackets.Load(),
DownlinkBytes: t.downlinkBytes.Load(),
UplinkPackets: t.uplinkPackets.Load(),
UplinkBytes: t.uplinkBytes.Load(),
TCPSessions: t.tcpSessions.Load(),
UDPSessions: t.udpSessions.Load(),
DownlinkPackets: tc.downlinkPackets.Swap(0),
DownlinkBytes: tc.downlinkBytes.Swap(0),
UplinkPackets: tc.uplinkPackets.Swap(0),
UplinkBytes: tc.uplinkBytes.Swap(0),
TCPSessions: tc.tcpSessions.Swap(0),
UDPSessions: tc.udpSessions.Swap(0),
}
}

type userCollector struct {
traffic
trafficCollector
}

// User stores the user's traffic statistics.
Expand All @@ -65,12 +85,19 @@ type User struct {
func (uc *userCollector) snapshot(username string) User {
return User{
Name: username,
Traffic: uc.traffic.snapshot(),
Traffic: uc.trafficCollector.snapshot(),
}
}

func (uc *userCollector) snapshotAndReset(username string) User {
return User{
Name: username,
Traffic: uc.trafficCollector.snapshotAndReset(),
}
}

type serverCollector struct {
traffic
tc trafficCollector
ucs map[string]*userCollector
mu sync.RWMutex
}
Expand Down Expand Up @@ -98,42 +125,60 @@ func (sc *serverCollector) userCollector(username string) *userCollector {
return uc
}

func (sc *serverCollector) trafficCollector(username string) *trafficCollector {
if username == "" {
return &sc.tc
}
return &sc.userCollector(username).trafficCollector
}

// CollectTCPSession implements the Collector CollectTCPSession method.
func (sc *serverCollector) CollectTCPSession(username string, downlinkBytes, uplinkBytes uint64) {
sc.userCollector(username).collectTCPSession(downlinkBytes, uplinkBytes)
sc.collectTCPSession(downlinkBytes, uplinkBytes)
sc.trafficCollector(username).collectTCPSession(downlinkBytes, uplinkBytes)
}

// CollectUDPSessionDownlink implements the Collector CollectUDPSessionDownlink method.
func (sc *serverCollector) CollectUDPSessionDownlink(username string, downlinkPackets, downlinkBytes uint64) {
sc.userCollector(username).collectUDPSessionDownlink(downlinkPackets, downlinkBytes)
sc.collectUDPSessionDownlink(downlinkPackets, downlinkBytes)
sc.trafficCollector(username).collectUDPSessionDownlink(downlinkPackets, downlinkBytes)
}

// CollectUDPSessionUplink implements the Collector CollectUDPSessionUplink method.
func (sc *serverCollector) CollectUDPSessionUplink(username string, uplinkPackets, uplinkBytes uint64) {
sc.userCollector(username).collectUDPSessionUplink(uplinkPackets, uplinkBytes)
sc.collectUDPSessionUplink(uplinkPackets, uplinkBytes)
sc.trafficCollector(username).collectUDPSessionUplink(uplinkPackets, uplinkBytes)
}

// Server stores the server's traffic statistics.
type Server struct {
Traffic
Users []User `json:"users"`
Users []User `json:"users,omitempty"`
}

// Snapshot implements the Collector Snapshot method.
func (sc *serverCollector) Snapshot() Server {
func (sc *serverCollector) Snapshot() (s Server) {
s.Traffic = sc.tc.snapshot()
sc.mu.RLock()
users := make([]User, 0, len(sc.ucs))
s.Users = make([]User, 0, len(sc.ucs))
for username, uc := range sc.ucs {
users = append(users, uc.snapshot(username))
u := uc.snapshot(username)
s.Traffic.Add(u.Traffic)
s.Users = append(s.Users, u)
}
sc.mu.RUnlock()
return Server{
Traffic: sc.traffic.snapshot(),
Users: users,
return
}

// SnapshotAndReset implements the Collector SnapshotAndReset method.
func (sc *serverCollector) SnapshotAndReset() (s Server) {
s.Traffic = sc.tc.snapshotAndReset()
sc.mu.RLock()
s.Users = make([]User, 0, len(sc.ucs))
for username, uc := range sc.ucs {
u := uc.snapshotAndReset(username)
s.Traffic.Add(u.Traffic)
s.Users = append(s.Users, u)
}
sc.mu.RUnlock()
return
}

// Collector collects server traffic statistics.
Expand All @@ -149,6 +194,9 @@ type Collector interface {

// Snapshot returns the server's traffic statistics.
Snapshot() Server

// SnapshotAndReset returns the server's traffic statistics and resets the statistics.
SnapshotAndReset() Server
}

// NoopCollector is a no-op collector.
Expand All @@ -170,6 +218,11 @@ func (NoopCollector) Snapshot() Server {
return Server{}
}

// SnapshotAndReset implements the Collector SnapshotAndReset method.
func (NoopCollector) SnapshotAndReset() Server {
return Server{}
}

// Config stores configuration for the stats collector.
type Config struct {
Enabled bool `json:"enabled"`
Expand Down
141 changes: 81 additions & 60 deletions stats/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,91 +22,112 @@ func collect(t *testing.T, c Collector) {
c.CollectUDPSessionUplink("Alex", 2048, 20480)
}

func verify(t *testing.T, c Collector) {
func collectNoUsername(t *testing.T, c Collector) {
t.Helper()
s := c.Snapshot()
if s.Traffic.DownlinkPackets != 1142 {
t.Errorf("expected 1142 downlink packets, got %d", s.Traffic.DownlinkPackets)
}
if s.Traffic.DownlinkBytes != 100352 {
t.Errorf("expected 100352 downlink bytes, got %d", s.Traffic.DownlinkBytes)
}
if s.Traffic.UplinkPackets != 2953 {
t.Errorf("expected 2953 uplink packets, got %d", s.Traffic.UplinkPackets)
c.CollectTCPSession("", 1024, 2048)
c.CollectUDPSessionDownlink("", 1, 3072)
c.CollectUDPSessionUplink("", 2, 4096)
}

func verify(t *testing.T, s Server) {
t.Helper()
expectedServerTraffic := Traffic{
DownlinkPackets: 1142,
DownlinkBytes: 100352,
UplinkPackets: 2953,
UplinkBytes: 114688,
TCPSessions: 4,
UDPSessions: 6,
}
if s.Traffic.UplinkBytes != 114688 {
t.Errorf("expected 114688 uplink bytes, got %d", s.Traffic.UplinkBytes)
expectedSteveTraffic := Traffic{
DownlinkPackets: 34,
DownlinkBytes: 44032,
UplinkPackets: 136,
UplinkBytes: 53248,
TCPSessions: 3,
UDPSessions: 2,
}
if s.Traffic.TCPSessions != 4 {
t.Errorf("expected 4 TCP sessions, got %d", s.Traffic.TCPSessions)
expectedAlexTraffic := Traffic{
DownlinkPackets: 1108,
DownlinkBytes: 56320,
UplinkPackets: 2817,
UplinkBytes: 61440,
TCPSessions: 1,
UDPSessions: 4,
}
if s.Traffic.UDPSessions != 6 {
t.Errorf("expected 6 UDP sessions, got %d", s.Traffic.UDPSessions)
if s.Traffic != expectedServerTraffic {
t.Errorf("expected server traffic %+v, got %+v", expectedServerTraffic, s.Traffic)
}
if len(s.Users) != 2 {
t.Fatalf("expected 2 users, got %d", len(s.Users))
}
for _, u := range s.Users {
switch u.Name {
case "Alex":
if u.Traffic.DownlinkPackets != 1108 {
t.Errorf("expected 1108 downlink packets for Alex, got %d", u.Traffic.DownlinkPackets)
}
if u.Traffic.DownlinkBytes != 56320 {
t.Errorf("expected 56320 downlink bytes for Alex, got %d", u.Traffic.DownlinkBytes)
}
if u.Traffic.UplinkPackets != 2817 {
t.Errorf("expected 2817 uplink packets for Alex, got %d", u.Traffic.UplinkPackets)
}
if u.Traffic.UplinkBytes != 61440 {
t.Errorf("expected 61440 uplink bytes for Alex, got %d", u.Traffic.UplinkBytes)
}
if u.Traffic.TCPSessions != 1 {
t.Errorf("expected 1 TCP session for Alex, got %d", u.Traffic.TCPSessions)
}
if u.Traffic.UDPSessions != 4 {
t.Errorf("expected 4 UDP sessions for Alex, got %d", u.Traffic.UDPSessions)
}
case "Steve":
if u.Traffic.DownlinkPackets != 34 {
t.Errorf("expected 34 downlink packets for Steve, got %d", u.Traffic.DownlinkPackets)
}
if u.Traffic.DownlinkBytes != 44032 {
t.Errorf("expected 44032 downlink bytes for Steve, got %d", u.Traffic.DownlinkBytes)
}
if u.Traffic.UplinkPackets != 136 {
t.Errorf("expected 136 uplink packets for Steve, got %d", u.Traffic.UplinkPackets)
}
if u.Traffic.UplinkBytes != 53248 {
t.Errorf("expected 53248 uplink bytes for Steve, got %d", u.Traffic.UplinkBytes)
}
if u.Traffic.TCPSessions != 3 {
t.Errorf("expected 3 TCP sessions for Steve, got %d", u.Traffic.TCPSessions)
if u.Traffic != expectedSteveTraffic {
t.Errorf("expected Steve traffic %+v, got %+v", expectedSteveTraffic, u.Traffic)
}
if u.Traffic.UDPSessions != 2 {
t.Errorf("expected 2 UDP sessions for Steve, got %d", u.Traffic.UDPSessions)
case "Alex":
if u.Traffic != expectedAlexTraffic {
t.Errorf("expected Alex traffic %+v, got %+v", expectedAlexTraffic, u.Traffic)
}
default:
t.Errorf("unexpected user %s", u.Name)
}
}
}

func verifyNoUsername(t *testing.T, s Server) {
t.Helper()
expectedServerTraffic := Traffic{
DownlinkPackets: 1,
DownlinkBytes: 4096,
UplinkPackets: 2,
UplinkBytes: 6144,
TCPSessions: 1,
UDPSessions: 1,
}
if s.Traffic != expectedServerTraffic {
t.Errorf("expected server traffic %+v, got %+v", expectedServerTraffic, s.Traffic)
}
if len(s.Users) != 0 {
t.Errorf("expected zero users, got %d", len(s.Users))
}
}

func verifyEmpty(t *testing.T, s Server) {
t.Helper()
var zero Traffic
if s.Traffic != zero {
t.Errorf("expected zero traffic, got %+v", s.Traffic)
}
for _, u := range s.Users {
if u.Traffic != zero {
t.Errorf("expected zero traffic for user %s, got %+v", u.Name, u.Traffic)
}
}
}

func TestServerCollector(t *testing.T) {
c := Config{Enabled: true}.Collector()
collectNoUsername(t, c)
verifyNoUsername(t, c.Snapshot())
verifyNoUsername(t, c.SnapshotAndReset())
verifyEmpty(t, c.Snapshot())
collect(t, c)
verify(t, c)
verify(t, c.Snapshot())
verify(t, c.SnapshotAndReset())
verifyEmpty(t, c.Snapshot())
}

func TestNoopCollector(t *testing.T) {
c := Config{}.Collector()
collectNoUsername(t, c)
verifyEmpty(t, c.Snapshot())
verifyEmpty(t, c.SnapshotAndReset())
verifyEmpty(t, c.Snapshot())
collect(t, c)
s := c.Snapshot()
var zero Traffic
if s.Traffic != zero {
t.Errorf("expected zero traffic, got %+v", s.Traffic)
}
if len(s.Users) != 0 {
t.Errorf("expected zero users, got %d", len(s.Users))
}
verifyEmpty(t, c.Snapshot())
verifyEmpty(t, c.SnapshotAndReset())
verifyEmpty(t, c.Snapshot())
}

0 comments on commit cc9b3cc

Please sign in to comment.