From fae675ae9444704b560bde24a490f7dde6f46a81 Mon Sep 17 00:00:00 2001 From: Clemens Hoffmann Date: Fri, 13 Dec 2024 16:47:58 +0100 Subject: [PATCH] Refactor Register, fix log levels --- metrics/compositereporter.go | 1 - metrics/metricsreporter.go | 10 ------ metrics/metricsreporter_test.go | 14 ++++---- registry/registry.go | 47 ++++++++++++++++++------- registry/registry_test.go | 61 ++++++++++++++++++++------------- route/pool.go | 19 ++++++---- 6 files changed, 92 insertions(+), 60 deletions(-) diff --git a/metrics/compositereporter.go b/metrics/compositereporter.go index 56255f3b..c2c46543 100644 --- a/metrics/compositereporter.go +++ b/metrics/compositereporter.go @@ -47,7 +47,6 @@ type RouteRegistryReporter interface { CaptureRegistryMessage(msg ComponentTagged, action string) CaptureRouteRegistrationLatency(t time.Duration) UnmuzzleRouteRegistrationLatency() - CaptureUnregistryMessage(msg ComponentTagged, action string) } type CompositeReporter struct { diff --git a/metrics/metricsreporter.go b/metrics/metricsreporter.go index 5dfb22fd..219d534c 100644 --- a/metrics/metricsreporter.go +++ b/metrics/metricsreporter.go @@ -145,16 +145,6 @@ func (m *MetricsReporter) CaptureRegistryMessage(msg ComponentTagged, action str m.Batcher.BatchIncrementCounter(componentName) } -func (m *MetricsReporter) CaptureUnregistryMessage(msg ComponentTagged, action string) { - var componentName string - if msg.Component() == "" { - componentName = "unregistry_message." + action - } else { - componentName = "unregistry_message." + action + "." + msg.Component() - } - m.Batcher.BatchIncrementCounter(componentName) -} - func (m *MetricsReporter) CaptureWebSocketUpdate() { m.Batcher.BatchIncrementCounter("websocket_upgrades") } diff --git a/metrics/metricsreporter_test.go b/metrics/metricsreporter_test.go index aa584ce0..257bf440 100644 --- a/metrics/metricsreporter_test.go +++ b/metrics/metricsreporter_test.go @@ -517,33 +517,33 @@ var _ = Describe("MetricsReporter", func() { BeforeEach(func() { endpoint = new(route.Endpoint) endpoint.Tags = map[string]string{"component": "oauth-server"} - metricReporter.CaptureUnregistryMessage(endpoint, "some-action") + metricReporter.CaptureRegistryMessage(endpoint, "some-action") }) It("increments the counter metric", func() { Expect(batcher.BatchIncrementCounterCallCount()).To(Equal(1)) - Expect(batcher.BatchIncrementCounterArgsForCall(0)).To(Equal("unregistry_message.some-action.oauth-server")) + Expect(batcher.BatchIncrementCounterArgsForCall(0)).To(Equal("registry_message.some-action.oauth-server")) }) It("increments the counter metric for each component unregistered", func() { endpointTwo := new(route.Endpoint) endpointTwo.Tags = map[string]string{"component": "api-server"} - metricReporter.CaptureUnregistryMessage(endpointTwo, "some-action") + metricReporter.CaptureRegistryMessage(endpointTwo, "some-action") Expect(batcher.BatchIncrementCounterCallCount()).To(Equal(2)) - Expect(batcher.BatchIncrementCounterArgsForCall(0)).To(Equal("unregistry_message.some-action.oauth-server")) - Expect(batcher.BatchIncrementCounterArgsForCall(1)).To(Equal("unregistry_message.some-action.api-server")) + Expect(batcher.BatchIncrementCounterArgsForCall(0)).To(Equal("registry_message.some-action.oauth-server")) + Expect(batcher.BatchIncrementCounterArgsForCall(1)).To(Equal("registry_message.some-action.api-server")) }) }) Context("when unregister msg with empty component name is incremented", func() { BeforeEach(func() { endpoint = new(route.Endpoint) endpoint.Tags = map[string]string{} - metricReporter.CaptureUnregistryMessage(endpoint, "some-action") + metricReporter.CaptureRegistryMessage(endpoint, "some-action") }) It("increments the counter metric", func() { Expect(batcher.BatchIncrementCounterCallCount()).To(Equal(1)) - Expect(batcher.BatchIncrementCounterArgsForCall(0)).To(Equal("unregistry_message.some-action")) + Expect(batcher.BatchIncrementCounterArgsForCall(0)).To(Equal("registry_message.some-action")) }) }) }) diff --git a/registry/registry.go b/registry/registry.go index 938bb4da..7c3b3f72 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -86,24 +86,41 @@ func (r *RouteRegistry) Register(uri route.Uri, endpoint *route.Endpoint) { if !r.endpointInRouterShard(endpoint) { return } + t := time.Now() + registerRouteResult, pool := r.registerRoute(uri) + if registerRouteResult == route.RouteRegistered { + r.reporter.CaptureRegistryMessage(endpoint, string(route.RouteRegistered)) + if r.logger.Enabled(context.Background(), slog.LevelInfo) { + r.logger.Info(string(route.RouteRegistered), buildSlogAttrs(uri, endpoint)...) + } + } - endpointPutResult := r.register(uri, endpoint) + endpointPutResult := r.registerEndpoint(endpoint, pool) if endpointPutResult == route.EndpointAdded && !endpoint.UpdatedAt.IsZero() { r.reporter.CaptureRouteRegistrationLatency(time.Since(endpoint.UpdatedAt)) } r.reporter.CaptureRegistryMessage(endpoint, string(endpointPutResult)) - if r.logger.Enabled(context.Background(), slog.LevelInfo) { - r.logger.Info(string(endpointPutResult), buildSlogAttrs(uri, endpoint)...) + + switch endpointPutResult { + case route.EndpointAdded: + if r.logger.Enabled(context.Background(), slog.LevelInfo) { + r.logger.Info(string(endpointPutResult), buildSlogAttrs(uri, endpoint)...) + } + default: + if r.logger.Enabled(context.Background(), slog.LevelDebug) { + r.logger.Debug(string(endpointPutResult), buildSlogAttrs(uri, endpoint)...) + } } + + r.SetTimeOfLastUpdate(t) } -func (r *RouteRegistry) register(uri route.Uri, endpoint *route.Endpoint) route.PoolPutResult { +func (r *RouteRegistry) registerRoute(uri route.Uri) (route.PoolRegisterRouteResult, *route.EndpointPool) { r.RLock() defer r.RUnlock() - - t := time.Now() + poolRegisterRouteResult := route.RouteAlreadyExists routekey := uri.RouteKey() pool := r.byURI.Find(routekey) @@ -112,18 +129,24 @@ func (r *RouteRegistry) register(uri route.Uri, endpoint *route.Endpoint) route. r.RUnlock() pool = r.insertRouteKey(routekey, uri) r.RLock() + poolRegisterRouteResult = route.RouteRegistered } + return poolRegisterRouteResult, pool +} + +func (r *RouteRegistry) registerEndpoint(endpoint *route.Endpoint, pool *route.EndpointPool) route.PoolRegisterEndpointResult { + r.RLock() + defer r.RUnlock() if endpoint.StaleThreshold > r.dropletStaleThreshold || endpoint.StaleThreshold == 0 { endpoint.StaleThreshold = r.dropletStaleThreshold } - endpointAdded := pool.Put(endpoint) + endpointAddedResult := pool.Put(endpoint) // Overwrites the load balancing algorithm of a pool by that of a specified endpoint, if that is valid. pool.SetPoolLoadBalancingAlgorithm(endpoint) - r.SetTimeOfLastUpdate(t) - return endpointAdded + return endpointAddedResult } // insertRouteKey acquires a write lock, inserts the route key into the registry and releases the write lock. @@ -144,7 +167,7 @@ func (r *RouteRegistry) insertRouteKey(routekey route.Uri, uri route.Uri) *route LoadBalancingAlgorithm: r.DefaultLoadBalancingAlgorithm, }) r.byURI.Insert(routekey, pool) - r.logger.Info("route-registered", slog.Any("uri", routekey)) + r.logger.Info(string(route.RouteRegistered), slog.Any("uri", routekey)) // for backward compatibility: r.logger.Debug("uri-added", slog.Any("uri", routekey)) } @@ -162,7 +185,7 @@ func (r *RouteRegistry) Unregister(uri route.Uri, endpoint *route.Endpoint) { return } - r.reporter.CaptureUnregistryMessage(endpoint, string(endpointUnregisteredResult)) + r.reporter.CaptureRegistryMessage(endpoint, string(endpointUnregisteredResult)) if r.logger.Enabled(context.Background(), slog.LevelInfo) { r.logger.Info(string(endpointUnregisteredResult), buildSlogAttrs(routeKey, endpoint)...) } @@ -170,7 +193,7 @@ func (r *RouteRegistry) Unregister(uri route.Uri, endpoint *route.Endpoint) { routeUnregisteredResult := r.deleteRouteWithoutEndpoint(routeKey, pool) switch routeUnregisteredResult { case route.RouteUnregistered: - r.reporter.CaptureUnregistryMessage(endpoint, string(routeUnregisteredResult)) + r.reporter.CaptureRegistryMessage(endpoint, string(routeUnregisteredResult)) if r.logger.Enabled(context.Background(), slog.LevelInfo) { r.logger.Info(string(routeUnregisteredResult), slog.Any("uri", routeKey)) } diff --git a/registry/registry_test.go b/registry/registry_test.go index c3163db3..434fd3ad 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -73,10 +73,13 @@ var _ = Describe("RouteRegistry", func() { Context("when a new endpoint is registered", func() { It("emits endpoint-registered message_count metrics", func() { r.Register("foo", fooEndpoint) - Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(1)) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2)) endpoint1, action1 := reporter.CaptureRegistryMessageArgsForCall(0) Expect(endpoint1).To(Equal(fooEndpoint)) - Expect(action1).To(Equal("endpoint-added")) + Expect(action1).To(Equal(string(route.RouteRegistered))) + endpoint2, action2 := reporter.CaptureRegistryMessageArgsForCall(1) + Expect(endpoint2).To(Equal(fooEndpoint)) + Expect(action2).To(Equal(string(route.EndpointAdded))) }) }) @@ -88,13 +91,16 @@ var _ = Describe("RouteRegistry", func() { endpoint2 := route.NewEndpoint(&route.EndpointOpts{ModificationTag: modTag2}) r.Register("foo", endpoint1) r.Register("foo", endpoint2) - Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2)) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(3)) endpointR1, action1 := reporter.CaptureRegistryMessageArgsForCall(0) Expect(endpointR1).To(Equal(endpoint1)) - Expect(action1).To(Equal("endpoint-added")) + Expect(action1).To(Equal(string(route.RouteRegistered))) endpointR2, action2 := reporter.CaptureRegistryMessageArgsForCall(1) - Expect(endpointR2).To(Equal(endpoint2)) - Expect(action2).To(Equal("endpoint-updated")) + Expect(endpointR2).To(Equal(endpoint1)) + Expect(action2).To(Equal(string(route.EndpointAdded))) + endpointR3, action3 := reporter.CaptureRegistryMessageArgsForCall(2) + Expect(endpointR3).To(Equal(endpoint2)) + Expect(action3).To(Equal(string(route.EndpointUpdated))) }) }) @@ -106,13 +112,16 @@ var _ = Describe("RouteRegistry", func() { endpoint2 := route.NewEndpoint(&route.EndpointOpts{ModificationTag: modTag2}) r.Register("foo", endpoint1) r.Register("foo", endpoint2) - Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2)) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(3)) endpointR1, action1 := reporter.CaptureRegistryMessageArgsForCall(0) Expect(endpointR1).To(Equal(endpoint1)) - Expect(action1).To(Equal("endpoint-added")) + Expect(action1).To(Equal(string(route.RouteRegistered))) endpointR2, action2 := reporter.CaptureRegistryMessageArgsForCall(1) - Expect(endpointR2).To(Equal(endpoint2)) - Expect(action2).To(Equal("endpoint-not-updated")) + Expect(endpointR2).To(Equal(endpoint1)) + Expect(action2).To(Equal(string(route.EndpointAdded))) + endpointR3, action3 := reporter.CaptureRegistryMessageArgsForCall(2) + Expect(endpointR3).To(Equal(endpoint2)) + Expect(action3).To(Equal(string(route.EndpointNotUpdated))) }) }) @@ -562,16 +571,18 @@ var _ = Describe("RouteRegistry", func() { BeforeEach(func() { fooEndpoint.Tags = map[string]string{"component": "oauth-server"} r.Register("foo", fooEndpoint) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2)) }) + It("emits counter metrics for unregister endpoint and route", func() { r.Unregister("foo", fooEndpoint) - Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(2)) - endpoint1, action1 := reporter.CaptureUnregistryMessageArgsForCall(0) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(4)) + endpoint1, action1 := reporter.CaptureRegistryMessageArgsForCall(2) Expect(endpoint1).To(Equal(fooEndpoint)) - Expect(action1).To(Equal("endpoint-unregistered")) - endpoint2, action2 := reporter.CaptureUnregistryMessageArgsForCall(1) + Expect(action1).To(Equal(string(route.EndpointUnregistered))) + endpoint2, action2 := reporter.CaptureRegistryMessageArgsForCall(3) Expect(endpoint2).To(Equal(fooEndpoint)) - Expect(action2).To(Equal("route-unregistered")) + Expect(action2).To(Equal(string(route.RouteUnregistered))) }) }) @@ -579,16 +590,17 @@ var _ = Describe("RouteRegistry", func() { BeforeEach(func() { fooEndpoint.Tags = map[string]string{} r.Register("foo", fooEndpoint) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2)) }) It("emits counter metrics for unregister endpoint and route", func() { r.Unregister("foo", fooEndpoint) - Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(2)) - endpoint1, action1 := reporter.CaptureUnregistryMessageArgsForCall(0) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(4)) + endpoint1, action1 := reporter.CaptureRegistryMessageArgsForCall(2) Expect(endpoint1).To(Equal(fooEndpoint)) - Expect(action1).To(Equal("endpoint-unregistered")) - endpoint2, action2 := reporter.CaptureUnregistryMessageArgsForCall(1) + Expect(action1).To(Equal(string(route.EndpointUnregistered))) + endpoint2, action2 := reporter.CaptureRegistryMessageArgsForCall(3) Expect(endpoint2).To(Equal(fooEndpoint)) - Expect(action2).To(Equal("route-unregistered")) + Expect(action2).To(Equal(string(route.RouteUnregistered))) }) }) }) @@ -604,13 +616,14 @@ var _ = Describe("RouteRegistry", func() { r.Register("foo", fooEndpoint) r.Register("foo", fooEndpoint2) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(3)) }) It("emits counter metrics for unregister endpoint only", func() { r.Unregister("foo", fooEndpoint) - Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(1)) - endpoint1, action1 := reporter.CaptureUnregistryMessageArgsForCall(0) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(4)) + endpoint1, action1 := reporter.CaptureRegistryMessageArgsForCall(3) Expect(endpoint1).To(Equal(fooEndpoint)) - Expect(action1).To(Equal("endpoint-unregistered")) + Expect(action1).To(Equal(string(route.EndpointUnregistered))) }) }) Context("when route is not registered", func() { @@ -619,7 +632,7 @@ var _ = Describe("RouteRegistry", func() { }) It("does not emit counter metrics for unregister", func() { r.Unregister("foo", fooEndpoint) - Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(0)) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(0)) }) }) diff --git a/route/pool.go b/route/pool.go index b3ec7893..8b50f66e 100644 --- a/route/pool.go +++ b/route/pool.go @@ -20,12 +20,19 @@ type Counter struct { value int64 } -type PoolPutResult string +type PoolRegisterEndpointResult string const ( - EndpointNotUpdated PoolPutResult = "endpoint-not-updated" - EndpointUpdated PoolPutResult = "endpoint-updated" - EndpointAdded PoolPutResult = "endpoint-added" + EndpointNotUpdated PoolRegisterEndpointResult = "endpoint-not-updated" + EndpointUpdated PoolRegisterEndpointResult = "endpoint-updated" + EndpointAdded PoolRegisterEndpointResult = "endpoint-added" +) + +type PoolRegisterRouteResult string + +const ( + RouteRegistered PoolRegisterRouteResult = "route-registered" + RouteAlreadyExists PoolRegisterRouteResult = "route-already-exists" ) type PoolRemoveEndpointResult string @@ -268,11 +275,11 @@ func (p *EndpointPool) Update() { p.updatedAt = time.Now() } -func (p *EndpointPool) Put(endpoint *Endpoint) PoolPutResult { +func (p *EndpointPool) Put(endpoint *Endpoint) PoolRegisterEndpointResult { p.Lock() defer p.Unlock() - var result PoolPutResult + var result PoolRegisterEndpointResult e, found := p.index[endpoint.CanonicalAddr()] if found { result = EndpointUpdated