Skip to content

Commit

Permalink
Refactor Register, fix log levels
Browse files Browse the repository at this point in the history
  • Loading branch information
hoffmaen committed Dec 13, 2024
1 parent e14d71e commit fae675a
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 60 deletions.
1 change: 0 additions & 1 deletion metrics/compositereporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type RouteRegistryReporter interface {
CaptureRegistryMessage(msg ComponentTagged, action string)
CaptureRouteRegistrationLatency(t time.Duration)
UnmuzzleRouteRegistrationLatency()
CaptureUnregistryMessage(msg ComponentTagged, action string)
}

type CompositeReporter struct {
Expand Down
10 changes: 0 additions & 10 deletions metrics/metricsreporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,6 @@ func (m *MetricsReporter) CaptureRegistryMessage(msg ComponentTagged, action str
m.Batcher.BatchIncrementCounter(componentName)
}

func (m *MetricsReporter) CaptureUnregistryMessage(msg ComponentTagged, action string) {
var componentName string
if msg.Component() == "" {
componentName = "unregistry_message." + action
} else {
componentName = "unregistry_message." + action + "." + msg.Component()
}
m.Batcher.BatchIncrementCounter(componentName)
}

func (m *MetricsReporter) CaptureWebSocketUpdate() {
m.Batcher.BatchIncrementCounter("websocket_upgrades")
}
Expand Down
14 changes: 7 additions & 7 deletions metrics/metricsreporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,33 +517,33 @@ var _ = Describe("MetricsReporter", func() {
BeforeEach(func() {
endpoint = new(route.Endpoint)
endpoint.Tags = map[string]string{"component": "oauth-server"}
metricReporter.CaptureUnregistryMessage(endpoint, "some-action")
metricReporter.CaptureRegistryMessage(endpoint, "some-action")
})

It("increments the counter metric", func() {
Expect(batcher.BatchIncrementCounterCallCount()).To(Equal(1))
Expect(batcher.BatchIncrementCounterArgsForCall(0)).To(Equal("unregistry_message.some-action.oauth-server"))
Expect(batcher.BatchIncrementCounterArgsForCall(0)).To(Equal("registry_message.some-action.oauth-server"))
})

It("increments the counter metric for each component unregistered", func() {
endpointTwo := new(route.Endpoint)
endpointTwo.Tags = map[string]string{"component": "api-server"}
metricReporter.CaptureUnregistryMessage(endpointTwo, "some-action")
metricReporter.CaptureRegistryMessage(endpointTwo, "some-action")

Expect(batcher.BatchIncrementCounterCallCount()).To(Equal(2))
Expect(batcher.BatchIncrementCounterArgsForCall(0)).To(Equal("unregistry_message.some-action.oauth-server"))
Expect(batcher.BatchIncrementCounterArgsForCall(1)).To(Equal("unregistry_message.some-action.api-server"))
Expect(batcher.BatchIncrementCounterArgsForCall(0)).To(Equal("registry_message.some-action.oauth-server"))
Expect(batcher.BatchIncrementCounterArgsForCall(1)).To(Equal("registry_message.some-action.api-server"))
})
})
Context("when unregister msg with empty component name is incremented", func() {
BeforeEach(func() {
endpoint = new(route.Endpoint)
endpoint.Tags = map[string]string{}
metricReporter.CaptureUnregistryMessage(endpoint, "some-action")
metricReporter.CaptureRegistryMessage(endpoint, "some-action")
})
It("increments the counter metric", func() {
Expect(batcher.BatchIncrementCounterCallCount()).To(Equal(1))
Expect(batcher.BatchIncrementCounterArgsForCall(0)).To(Equal("unregistry_message.some-action"))
Expect(batcher.BatchIncrementCounterArgsForCall(0)).To(Equal("registry_message.some-action"))
})
})
})
Expand Down
47 changes: 35 additions & 12 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,41 @@ func (r *RouteRegistry) Register(uri route.Uri, endpoint *route.Endpoint) {
if !r.endpointInRouterShard(endpoint) {
return
}
t := time.Now()
registerRouteResult, pool := r.registerRoute(uri)
if registerRouteResult == route.RouteRegistered {
r.reporter.CaptureRegistryMessage(endpoint, string(route.RouteRegistered))
if r.logger.Enabled(context.Background(), slog.LevelInfo) {
r.logger.Info(string(route.RouteRegistered), buildSlogAttrs(uri, endpoint)...)
}
}

endpointPutResult := r.register(uri, endpoint)
endpointPutResult := r.registerEndpoint(endpoint, pool)

if endpointPutResult == route.EndpointAdded && !endpoint.UpdatedAt.IsZero() {
r.reporter.CaptureRouteRegistrationLatency(time.Since(endpoint.UpdatedAt))
}

r.reporter.CaptureRegistryMessage(endpoint, string(endpointPutResult))
if r.logger.Enabled(context.Background(), slog.LevelInfo) {
r.logger.Info(string(endpointPutResult), buildSlogAttrs(uri, endpoint)...)

switch endpointPutResult {
case route.EndpointAdded:
if r.logger.Enabled(context.Background(), slog.LevelInfo) {
r.logger.Info(string(endpointPutResult), buildSlogAttrs(uri, endpoint)...)
}
default:
if r.logger.Enabled(context.Background(), slog.LevelDebug) {
r.logger.Debug(string(endpointPutResult), buildSlogAttrs(uri, endpoint)...)
}
}

r.SetTimeOfLastUpdate(t)
}

func (r *RouteRegistry) register(uri route.Uri, endpoint *route.Endpoint) route.PoolPutResult {
func (r *RouteRegistry) registerRoute(uri route.Uri) (route.PoolRegisterRouteResult, *route.EndpointPool) {
r.RLock()
defer r.RUnlock()

t := time.Now()
poolRegisterRouteResult := route.RouteAlreadyExists
routekey := uri.RouteKey()
pool := r.byURI.Find(routekey)

Expand All @@ -112,18 +129,24 @@ func (r *RouteRegistry) register(uri route.Uri, endpoint *route.Endpoint) route.
r.RUnlock()
pool = r.insertRouteKey(routekey, uri)
r.RLock()
poolRegisterRouteResult = route.RouteRegistered
}
return poolRegisterRouteResult, pool
}

func (r *RouteRegistry) registerEndpoint(endpoint *route.Endpoint, pool *route.EndpointPool) route.PoolRegisterEndpointResult {
r.RLock()
defer r.RUnlock()

if endpoint.StaleThreshold > r.dropletStaleThreshold || endpoint.StaleThreshold == 0 {
endpoint.StaleThreshold = r.dropletStaleThreshold
}

endpointAdded := pool.Put(endpoint)
endpointAddedResult := pool.Put(endpoint)
// Overwrites the load balancing algorithm of a pool by that of a specified endpoint, if that is valid.
pool.SetPoolLoadBalancingAlgorithm(endpoint)
r.SetTimeOfLastUpdate(t)

return endpointAdded
return endpointAddedResult
}

// insertRouteKey acquires a write lock, inserts the route key into the registry and releases the write lock.
Expand All @@ -144,7 +167,7 @@ func (r *RouteRegistry) insertRouteKey(routekey route.Uri, uri route.Uri) *route
LoadBalancingAlgorithm: r.DefaultLoadBalancingAlgorithm,
})
r.byURI.Insert(routekey, pool)
r.logger.Info("route-registered", slog.Any("uri", routekey))
r.logger.Info(string(route.RouteRegistered), slog.Any("uri", routekey))
// for backward compatibility:
r.logger.Debug("uri-added", slog.Any("uri", routekey))
}
Expand All @@ -162,15 +185,15 @@ func (r *RouteRegistry) Unregister(uri route.Uri, endpoint *route.Endpoint) {
return
}

r.reporter.CaptureUnregistryMessage(endpoint, string(endpointUnregisteredResult))
r.reporter.CaptureRegistryMessage(endpoint, string(endpointUnregisteredResult))
if r.logger.Enabled(context.Background(), slog.LevelInfo) {
r.logger.Info(string(endpointUnregisteredResult), buildSlogAttrs(routeKey, endpoint)...)
}

routeUnregisteredResult := r.deleteRouteWithoutEndpoint(routeKey, pool)
switch routeUnregisteredResult {
case route.RouteUnregistered:
r.reporter.CaptureUnregistryMessage(endpoint, string(routeUnregisteredResult))
r.reporter.CaptureRegistryMessage(endpoint, string(routeUnregisteredResult))
if r.logger.Enabled(context.Background(), slog.LevelInfo) {
r.logger.Info(string(routeUnregisteredResult), slog.Any("uri", routeKey))
}
Expand Down
61 changes: 37 additions & 24 deletions registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,13 @@ var _ = Describe("RouteRegistry", func() {
Context("when a new endpoint is registered", func() {
It("emits endpoint-registered message_count metrics", func() {
r.Register("foo", fooEndpoint)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(1))
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2))
endpoint1, action1 := reporter.CaptureRegistryMessageArgsForCall(0)
Expect(endpoint1).To(Equal(fooEndpoint))
Expect(action1).To(Equal("endpoint-added"))
Expect(action1).To(Equal(string(route.RouteRegistered)))
endpoint2, action2 := reporter.CaptureRegistryMessageArgsForCall(1)
Expect(endpoint2).To(Equal(fooEndpoint))
Expect(action2).To(Equal(string(route.EndpointAdded)))
})
})

Expand All @@ -88,13 +91,16 @@ var _ = Describe("RouteRegistry", func() {
endpoint2 := route.NewEndpoint(&route.EndpointOpts{ModificationTag: modTag2})
r.Register("foo", endpoint1)
r.Register("foo", endpoint2)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2))
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(3))
endpointR1, action1 := reporter.CaptureRegistryMessageArgsForCall(0)
Expect(endpointR1).To(Equal(endpoint1))
Expect(action1).To(Equal("endpoint-added"))
Expect(action1).To(Equal(string(route.RouteRegistered)))
endpointR2, action2 := reporter.CaptureRegistryMessageArgsForCall(1)
Expect(endpointR2).To(Equal(endpoint2))
Expect(action2).To(Equal("endpoint-updated"))
Expect(endpointR2).To(Equal(endpoint1))
Expect(action2).To(Equal(string(route.EndpointAdded)))
endpointR3, action3 := reporter.CaptureRegistryMessageArgsForCall(2)
Expect(endpointR3).To(Equal(endpoint2))
Expect(action3).To(Equal(string(route.EndpointUpdated)))
})
})

Expand All @@ -106,13 +112,16 @@ var _ = Describe("RouteRegistry", func() {
endpoint2 := route.NewEndpoint(&route.EndpointOpts{ModificationTag: modTag2})
r.Register("foo", endpoint1)
r.Register("foo", endpoint2)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2))
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(3))
endpointR1, action1 := reporter.CaptureRegistryMessageArgsForCall(0)
Expect(endpointR1).To(Equal(endpoint1))
Expect(action1).To(Equal("endpoint-added"))
Expect(action1).To(Equal(string(route.RouteRegistered)))
endpointR2, action2 := reporter.CaptureRegistryMessageArgsForCall(1)
Expect(endpointR2).To(Equal(endpoint2))
Expect(action2).To(Equal("endpoint-not-updated"))
Expect(endpointR2).To(Equal(endpoint1))
Expect(action2).To(Equal(string(route.EndpointAdded)))
endpointR3, action3 := reporter.CaptureRegistryMessageArgsForCall(2)
Expect(endpointR3).To(Equal(endpoint2))
Expect(action3).To(Equal(string(route.EndpointNotUpdated)))
})
})

Expand Down Expand Up @@ -562,33 +571,36 @@ var _ = Describe("RouteRegistry", func() {
BeforeEach(func() {
fooEndpoint.Tags = map[string]string{"component": "oauth-server"}
r.Register("foo", fooEndpoint)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2))
})

It("emits counter metrics for unregister endpoint and route", func() {
r.Unregister("foo", fooEndpoint)
Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(2))
endpoint1, action1 := reporter.CaptureUnregistryMessageArgsForCall(0)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(4))
endpoint1, action1 := reporter.CaptureRegistryMessageArgsForCall(2)
Expect(endpoint1).To(Equal(fooEndpoint))
Expect(action1).To(Equal("endpoint-unregistered"))
endpoint2, action2 := reporter.CaptureUnregistryMessageArgsForCall(1)
Expect(action1).To(Equal(string(route.EndpointUnregistered)))
endpoint2, action2 := reporter.CaptureRegistryMessageArgsForCall(3)
Expect(endpoint2).To(Equal(fooEndpoint))
Expect(action2).To(Equal("route-unregistered"))
Expect(action2).To(Equal(string(route.RouteUnregistered)))
})
})

Context("when endpoint does not have component tag", func() {
BeforeEach(func() {
fooEndpoint.Tags = map[string]string{}
r.Register("foo", fooEndpoint)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2))
})
It("emits counter metrics for unregister endpoint and route", func() {
r.Unregister("foo", fooEndpoint)
Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(2))
endpoint1, action1 := reporter.CaptureUnregistryMessageArgsForCall(0)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(4))
endpoint1, action1 := reporter.CaptureRegistryMessageArgsForCall(2)
Expect(endpoint1).To(Equal(fooEndpoint))
Expect(action1).To(Equal("endpoint-unregistered"))
endpoint2, action2 := reporter.CaptureUnregistryMessageArgsForCall(1)
Expect(action1).To(Equal(string(route.EndpointUnregistered)))
endpoint2, action2 := reporter.CaptureRegistryMessageArgsForCall(3)
Expect(endpoint2).To(Equal(fooEndpoint))
Expect(action2).To(Equal("route-unregistered"))
Expect(action2).To(Equal(string(route.RouteUnregistered)))
})
})
})
Expand All @@ -604,13 +616,14 @@ var _ = Describe("RouteRegistry", func() {

r.Register("foo", fooEndpoint)
r.Register("foo", fooEndpoint2)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(3))
})
It("emits counter metrics for unregister endpoint only", func() {
r.Unregister("foo", fooEndpoint)
Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(1))
endpoint1, action1 := reporter.CaptureUnregistryMessageArgsForCall(0)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(4))
endpoint1, action1 := reporter.CaptureRegistryMessageArgsForCall(3)
Expect(endpoint1).To(Equal(fooEndpoint))
Expect(action1).To(Equal("endpoint-unregistered"))
Expect(action1).To(Equal(string(route.EndpointUnregistered)))
})
})
Context("when route is not registered", func() {
Expand All @@ -619,7 +632,7 @@ var _ = Describe("RouteRegistry", func() {
})
It("does not emit counter metrics for unregister", func() {
r.Unregister("foo", fooEndpoint)
Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(0))
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(0))
})
})

Expand Down
19 changes: 13 additions & 6 deletions route/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@ type Counter struct {
value int64
}

type PoolPutResult string
type PoolRegisterEndpointResult string

const (
EndpointNotUpdated PoolPutResult = "endpoint-not-updated"
EndpointUpdated PoolPutResult = "endpoint-updated"
EndpointAdded PoolPutResult = "endpoint-added"
EndpointNotUpdated PoolRegisterEndpointResult = "endpoint-not-updated"
EndpointUpdated PoolRegisterEndpointResult = "endpoint-updated"
EndpointAdded PoolRegisterEndpointResult = "endpoint-added"
)

type PoolRegisterRouteResult string

const (
RouteRegistered PoolRegisterRouteResult = "route-registered"
RouteAlreadyExists PoolRegisterRouteResult = "route-already-exists"
)

type PoolRemoveEndpointResult string
Expand Down Expand Up @@ -268,11 +275,11 @@ func (p *EndpointPool) Update() {
p.updatedAt = time.Now()
}

func (p *EndpointPool) Put(endpoint *Endpoint) PoolPutResult {
func (p *EndpointPool) Put(endpoint *Endpoint) PoolRegisterEndpointResult {
p.Lock()
defer p.Unlock()

var result PoolPutResult
var result PoolRegisterEndpointResult
e, found := p.index[endpoint.CanonicalAddr()]
if found {
result = EndpointUpdated
Expand Down

0 comments on commit fae675a

Please sign in to comment.