From ef4bebb26eec6a41036600d6a9d13cef1f9d8597 Mon Sep 17 00:00:00 2001 From: Tamara Boehm Date: Wed, 18 Dec 2024 10:43:34 +0100 Subject: [PATCH] fix: Refactor Register, fix log levels --- metrics/metricsreporter.go | 10 +++--- registry/registry.go | 63 +++++++++++++++++++++++--------------- registry/registry_test.go | 43 +++++++++++++++++--------- route/pool.go | 19 ++++++++---- 4 files changed, 84 insertions(+), 51 deletions(-) diff --git a/metrics/metricsreporter.go b/metrics/metricsreporter.go index 5dfb22fd..6ead6963 100644 --- a/metrics/metricsreporter.go +++ b/metrics/metricsreporter.go @@ -146,13 +146,11 @@ func (m *MetricsReporter) CaptureRegistryMessage(msg ComponentTagged, action str } func (m *MetricsReporter) CaptureUnregistryMessage(msg ComponentTagged, action string) { - var componentName string - if msg.Component() == "" { - componentName = "unregistry_message." + action - } else { - componentName = "unregistry_message." + action + "." + msg.Component() + unregisterMsg := "unregistry_message." + action + if msg.Component() != "" { + unregisterMsg = unregisterMsg + "." + msg.Component() } - m.Batcher.BatchIncrementCounter(componentName) + m.Batcher.BatchIncrementCounter(unregisterMsg) } func (m *MetricsReporter) CaptureWebSocketUpdate() { diff --git a/registry/registry.go b/registry/registry.go index 938bb4da..3701d794 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -87,23 +87,42 @@ func (r *RouteRegistry) Register(uri route.Uri, endpoint *route.Endpoint) { return } - endpointPutResult := r.register(uri, endpoint) + r.RLock() + defer r.RUnlock() + + t := time.Now() + registerRouteResult, pool := r.registerRoute(uri) + if registerRouteResult == route.RouteRegistered { + r.reporter.CaptureRegistryMessage(endpoint, string(route.RouteRegistered)) + if r.logger.Enabled(context.Background(), slog.LevelInfo) { + r.logger.Info(string(route.RouteRegistered), buildSlogAttrs(uri, endpoint)...) + } + } + + endpointPutResult := r.registerEndpoint(endpoint, pool) if endpointPutResult == route.EndpointAdded && !endpoint.UpdatedAt.IsZero() { r.reporter.CaptureRouteRegistrationLatency(time.Since(endpoint.UpdatedAt)) } r.reporter.CaptureRegistryMessage(endpoint, string(endpointPutResult)) - if r.logger.Enabled(context.Background(), slog.LevelInfo) { - r.logger.Info(string(endpointPutResult), buildSlogAttrs(uri, endpoint)...) + + switch endpointPutResult { + case route.EndpointAdded: + if r.logger.Enabled(context.Background(), slog.LevelInfo) { + r.logger.Info(string(endpointPutResult), buildSlogAttrs(uri, endpoint)...) + } + default: + if r.logger.Enabled(context.Background(), slog.LevelDebug) { + r.logger.Debug(string(endpointPutResult), buildSlogAttrs(uri, endpoint)...) + } } -} -func (r *RouteRegistry) register(uri route.Uri, endpoint *route.Endpoint) route.PoolPutResult { - r.RLock() - defer r.RUnlock() + r.SetTimeOfLastUpdate(t) +} - t := time.Now() +func (r *RouteRegistry) registerRoute(uri route.Uri) (route.PoolRegisterRouteResult, *route.EndpointPool) { + poolRegisterRouteResult := route.RouteAlreadyExists routekey := uri.RouteKey() pool := r.byURI.Find(routekey) @@ -112,18 +131,21 @@ func (r *RouteRegistry) register(uri route.Uri, endpoint *route.Endpoint) route. r.RUnlock() pool = r.insertRouteKey(routekey, uri) r.RLock() + poolRegisterRouteResult = route.RouteRegistered } + return poolRegisterRouteResult, pool +} +func (r *RouteRegistry) registerEndpoint(endpoint *route.Endpoint, pool *route.EndpointPool) route.PoolRegisterEndpointResult { if endpoint.StaleThreshold > r.dropletStaleThreshold || endpoint.StaleThreshold == 0 { endpoint.StaleThreshold = r.dropletStaleThreshold } - endpointAdded := pool.Put(endpoint) + endpointAddedResult := pool.Put(endpoint) // Overwrites the load balancing algorithm of a pool by that of a specified endpoint, if that is valid. pool.SetPoolLoadBalancingAlgorithm(endpoint) - r.SetTimeOfLastUpdate(t) - return endpointAdded + return endpointAddedResult } // insertRouteKey acquires a write lock, inserts the route key into the registry and releases the write lock. @@ -144,7 +166,7 @@ func (r *RouteRegistry) insertRouteKey(routekey route.Uri, uri route.Uri) *route LoadBalancingAlgorithm: r.DefaultLoadBalancingAlgorithm, }) r.byURI.Insert(routekey, pool) - r.logger.Info("route-registered", slog.Any("uri", routekey)) + r.logger.Info(string(route.RouteRegistered), slog.Any("uri", routekey)) // for backward compatibility: r.logger.Debug("uri-added", slog.Any("uri", routekey)) } @@ -156,6 +178,9 @@ func (r *RouteRegistry) Unregister(uri route.Uri, endpoint *route.Endpoint) { return } + r.Lock() + defer r.Unlock() + routeKey := uri.RouteKey() endpointUnregisteredResult, pool := r.unregisterEndpoint(routeKey, endpoint) if pool == nil { @@ -178,9 +203,6 @@ func (r *RouteRegistry) Unregister(uri route.Uri, endpoint *route.Endpoint) { } func (r *RouteRegistry) unregisterEndpoint(routeKey route.Uri, endpoint *route.Endpoint) (route.PoolRemoveEndpointResult, *route.EndpointPool) { - r.Lock() - defer r.Unlock() - pool := r.byURI.Find(routeKey) if pool == nil { return route.EndpointNotUnregistered, nil @@ -189,16 +211,9 @@ func (r *RouteRegistry) unregisterEndpoint(routeKey route.Uri, endpoint *route.E } func (r *RouteRegistry) deleteRouteWithoutEndpoint(routeKey route.Uri, pool *route.EndpointPool) route.PoolRemoveRouteResult { - r.Lock() - defer r.Unlock() - if pool.IsEmpty() { - if r.EmptyPoolResponseCode503 && r.EmptyPoolTimeout > 0 { - if time.Since(pool.LastUpdated()) > r.EmptyPoolTimeout { - r.byURI.Delete(routeKey) - return route.RouteUnregistered - } - } else { + if !(r.EmptyPoolResponseCode503 && r.EmptyPoolTimeout > 0) || + (r.EmptyPoolResponseCode503 && r.EmptyPoolTimeout > 0 && time.Since(pool.LastUpdated()) > r.EmptyPoolTimeout) { r.byURI.Delete(routeKey) return route.RouteUnregistered } diff --git a/registry/registry_test.go b/registry/registry_test.go index c3163db3..f6939943 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -73,10 +73,13 @@ var _ = Describe("RouteRegistry", func() { Context("when a new endpoint is registered", func() { It("emits endpoint-registered message_count metrics", func() { r.Register("foo", fooEndpoint) - Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(1)) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2)) endpoint1, action1 := reporter.CaptureRegistryMessageArgsForCall(0) Expect(endpoint1).To(Equal(fooEndpoint)) - Expect(action1).To(Equal("endpoint-added")) + Expect(action1).To(Equal(string(route.RouteRegistered))) + endpoint2, action2 := reporter.CaptureRegistryMessageArgsForCall(1) + Expect(endpoint2).To(Equal(fooEndpoint)) + Expect(action2).To(Equal(string(route.EndpointAdded))) }) }) @@ -88,13 +91,16 @@ var _ = Describe("RouteRegistry", func() { endpoint2 := route.NewEndpoint(&route.EndpointOpts{ModificationTag: modTag2}) r.Register("foo", endpoint1) r.Register("foo", endpoint2) - Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2)) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(3)) endpointR1, action1 := reporter.CaptureRegistryMessageArgsForCall(0) Expect(endpointR1).To(Equal(endpoint1)) - Expect(action1).To(Equal("endpoint-added")) + Expect(action1).To(Equal(string(route.RouteRegistered))) endpointR2, action2 := reporter.CaptureRegistryMessageArgsForCall(1) - Expect(endpointR2).To(Equal(endpoint2)) - Expect(action2).To(Equal("endpoint-updated")) + Expect(endpointR2).To(Equal(endpoint1)) + Expect(action2).To(Equal(string(route.EndpointAdded))) + endpointR3, action3 := reporter.CaptureRegistryMessageArgsForCall(2) + Expect(endpointR3).To(Equal(endpoint2)) + Expect(action3).To(Equal(string(route.EndpointUpdated))) }) }) @@ -106,13 +112,16 @@ var _ = Describe("RouteRegistry", func() { endpoint2 := route.NewEndpoint(&route.EndpointOpts{ModificationTag: modTag2}) r.Register("foo", endpoint1) r.Register("foo", endpoint2) - Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2)) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(3)) endpointR1, action1 := reporter.CaptureRegistryMessageArgsForCall(0) Expect(endpointR1).To(Equal(endpoint1)) - Expect(action1).To(Equal("endpoint-added")) + Expect(action1).To(Equal(string(route.RouteRegistered))) endpointR2, action2 := reporter.CaptureRegistryMessageArgsForCall(1) - Expect(endpointR2).To(Equal(endpoint2)) - Expect(action2).To(Equal("endpoint-not-updated")) + Expect(endpointR2).To(Equal(endpoint1)) + Expect(action2).To(Equal(string(route.EndpointAdded))) + endpointR3, action3 := reporter.CaptureRegistryMessageArgsForCall(2) + Expect(endpointR3).To(Equal(endpoint2)) + Expect(action3).To(Equal(string(route.EndpointNotUpdated))) }) }) @@ -562,16 +571,18 @@ var _ = Describe("RouteRegistry", func() { BeforeEach(func() { fooEndpoint.Tags = map[string]string{"component": "oauth-server"} r.Register("foo", fooEndpoint) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2)) }) + It("emits counter metrics for unregister endpoint and route", func() { r.Unregister("foo", fooEndpoint) Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(2)) endpoint1, action1 := reporter.CaptureUnregistryMessageArgsForCall(0) Expect(endpoint1).To(Equal(fooEndpoint)) - Expect(action1).To(Equal("endpoint-unregistered")) + Expect(action1).To(Equal(string(route.EndpointUnregistered))) endpoint2, action2 := reporter.CaptureUnregistryMessageArgsForCall(1) Expect(endpoint2).To(Equal(fooEndpoint)) - Expect(action2).To(Equal("route-unregistered")) + Expect(action2).To(Equal(string(route.RouteUnregistered))) }) }) @@ -579,16 +590,17 @@ var _ = Describe("RouteRegistry", func() { BeforeEach(func() { fooEndpoint.Tags = map[string]string{} r.Register("foo", fooEndpoint) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2)) }) It("emits counter metrics for unregister endpoint and route", func() { r.Unregister("foo", fooEndpoint) Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(2)) endpoint1, action1 := reporter.CaptureUnregistryMessageArgsForCall(0) Expect(endpoint1).To(Equal(fooEndpoint)) - Expect(action1).To(Equal("endpoint-unregistered")) + Expect(action1).To(Equal(string(route.EndpointUnregistered))) endpoint2, action2 := reporter.CaptureUnregistryMessageArgsForCall(1) Expect(endpoint2).To(Equal(fooEndpoint)) - Expect(action2).To(Equal("route-unregistered")) + Expect(action2).To(Equal(string(route.RouteUnregistered))) }) }) }) @@ -604,13 +616,14 @@ var _ = Describe("RouteRegistry", func() { r.Register("foo", fooEndpoint) r.Register("foo", fooEndpoint2) + Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(3)) }) It("emits counter metrics for unregister endpoint only", func() { r.Unregister("foo", fooEndpoint) Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(1)) endpoint1, action1 := reporter.CaptureUnregistryMessageArgsForCall(0) Expect(endpoint1).To(Equal(fooEndpoint)) - Expect(action1).To(Equal("endpoint-unregistered")) + Expect(action1).To(Equal(string(route.EndpointUnregistered))) }) }) Context("when route is not registered", func() { diff --git a/route/pool.go b/route/pool.go index b3ec7893..8b50f66e 100644 --- a/route/pool.go +++ b/route/pool.go @@ -20,12 +20,19 @@ type Counter struct { value int64 } -type PoolPutResult string +type PoolRegisterEndpointResult string const ( - EndpointNotUpdated PoolPutResult = "endpoint-not-updated" - EndpointUpdated PoolPutResult = "endpoint-updated" - EndpointAdded PoolPutResult = "endpoint-added" + EndpointNotUpdated PoolRegisterEndpointResult = "endpoint-not-updated" + EndpointUpdated PoolRegisterEndpointResult = "endpoint-updated" + EndpointAdded PoolRegisterEndpointResult = "endpoint-added" +) + +type PoolRegisterRouteResult string + +const ( + RouteRegistered PoolRegisterRouteResult = "route-registered" + RouteAlreadyExists PoolRegisterRouteResult = "route-already-exists" ) type PoolRemoveEndpointResult string @@ -268,11 +275,11 @@ func (p *EndpointPool) Update() { p.updatedAt = time.Now() } -func (p *EndpointPool) Put(endpoint *Endpoint) PoolPutResult { +func (p *EndpointPool) Put(endpoint *Endpoint) PoolRegisterEndpointResult { p.Lock() defer p.Unlock() - var result PoolPutResult + var result PoolRegisterEndpointResult e, found := p.index[endpoint.CanonicalAddr()] if found { result = EndpointUpdated