Skip to content

Commit

Permalink
fix: Refactor Register, fix log levels
Browse files Browse the repository at this point in the history
  • Loading branch information
b1tamara committed Dec 18, 2024
1 parent e14d71e commit ef4bebb
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 51 deletions.
10 changes: 4 additions & 6 deletions metrics/metricsreporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,11 @@ func (m *MetricsReporter) CaptureRegistryMessage(msg ComponentTagged, action str
}

func (m *MetricsReporter) CaptureUnregistryMessage(msg ComponentTagged, action string) {
var componentName string
if msg.Component() == "" {
componentName = "unregistry_message." + action
} else {
componentName = "unregistry_message." + action + "." + msg.Component()
unregisterMsg := "unregistry_message." + action
if msg.Component() != "" {
unregisterMsg = unregisterMsg + "." + msg.Component()
}
m.Batcher.BatchIncrementCounter(componentName)
m.Batcher.BatchIncrementCounter(unregisterMsg)
}

func (m *MetricsReporter) CaptureWebSocketUpdate() {
Expand Down
63 changes: 39 additions & 24 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,42 @@ func (r *RouteRegistry) Register(uri route.Uri, endpoint *route.Endpoint) {
return
}

endpointPutResult := r.register(uri, endpoint)
r.RLock()
defer r.RUnlock()

t := time.Now()
registerRouteResult, pool := r.registerRoute(uri)
if registerRouteResult == route.RouteRegistered {
r.reporter.CaptureRegistryMessage(endpoint, string(route.RouteRegistered))
if r.logger.Enabled(context.Background(), slog.LevelInfo) {
r.logger.Info(string(route.RouteRegistered), buildSlogAttrs(uri, endpoint)...)
}
}

endpointPutResult := r.registerEndpoint(endpoint, pool)

if endpointPutResult == route.EndpointAdded && !endpoint.UpdatedAt.IsZero() {
r.reporter.CaptureRouteRegistrationLatency(time.Since(endpoint.UpdatedAt))
}

r.reporter.CaptureRegistryMessage(endpoint, string(endpointPutResult))
if r.logger.Enabled(context.Background(), slog.LevelInfo) {
r.logger.Info(string(endpointPutResult), buildSlogAttrs(uri, endpoint)...)

switch endpointPutResult {
case route.EndpointAdded:
if r.logger.Enabled(context.Background(), slog.LevelInfo) {
r.logger.Info(string(endpointPutResult), buildSlogAttrs(uri, endpoint)...)
}
default:
if r.logger.Enabled(context.Background(), slog.LevelDebug) {
r.logger.Debug(string(endpointPutResult), buildSlogAttrs(uri, endpoint)...)
}
}
}

func (r *RouteRegistry) register(uri route.Uri, endpoint *route.Endpoint) route.PoolPutResult {
r.RLock()
defer r.RUnlock()
r.SetTimeOfLastUpdate(t)
}

t := time.Now()
func (r *RouteRegistry) registerRoute(uri route.Uri) (route.PoolRegisterRouteResult, *route.EndpointPool) {
poolRegisterRouteResult := route.RouteAlreadyExists
routekey := uri.RouteKey()
pool := r.byURI.Find(routekey)

Expand All @@ -112,18 +131,21 @@ func (r *RouteRegistry) register(uri route.Uri, endpoint *route.Endpoint) route.
r.RUnlock()
pool = r.insertRouteKey(routekey, uri)
r.RLock()
poolRegisterRouteResult = route.RouteRegistered
}
return poolRegisterRouteResult, pool
}

func (r *RouteRegistry) registerEndpoint(endpoint *route.Endpoint, pool *route.EndpointPool) route.PoolRegisterEndpointResult {
if endpoint.StaleThreshold > r.dropletStaleThreshold || endpoint.StaleThreshold == 0 {
endpoint.StaleThreshold = r.dropletStaleThreshold
}

endpointAdded := pool.Put(endpoint)
endpointAddedResult := pool.Put(endpoint)
// Overwrites the load balancing algorithm of a pool by that of a specified endpoint, if that is valid.
pool.SetPoolLoadBalancingAlgorithm(endpoint)
r.SetTimeOfLastUpdate(t)

return endpointAdded
return endpointAddedResult
}

// insertRouteKey acquires a write lock, inserts the route key into the registry and releases the write lock.
Expand All @@ -144,7 +166,7 @@ func (r *RouteRegistry) insertRouteKey(routekey route.Uri, uri route.Uri) *route
LoadBalancingAlgorithm: r.DefaultLoadBalancingAlgorithm,
})
r.byURI.Insert(routekey, pool)
r.logger.Info("route-registered", slog.Any("uri", routekey))
r.logger.Info(string(route.RouteRegistered), slog.Any("uri", routekey))
// for backward compatibility:
r.logger.Debug("uri-added", slog.Any("uri", routekey))
}
Expand All @@ -156,6 +178,9 @@ func (r *RouteRegistry) Unregister(uri route.Uri, endpoint *route.Endpoint) {
return
}

r.Lock()
defer r.Unlock()

routeKey := uri.RouteKey()
endpointUnregisteredResult, pool := r.unregisterEndpoint(routeKey, endpoint)
if pool == nil {
Expand All @@ -178,9 +203,6 @@ func (r *RouteRegistry) Unregister(uri route.Uri, endpoint *route.Endpoint) {
}

func (r *RouteRegistry) unregisterEndpoint(routeKey route.Uri, endpoint *route.Endpoint) (route.PoolRemoveEndpointResult, *route.EndpointPool) {
r.Lock()
defer r.Unlock()

pool := r.byURI.Find(routeKey)
if pool == nil {
return route.EndpointNotUnregistered, nil
Expand All @@ -189,16 +211,9 @@ func (r *RouteRegistry) unregisterEndpoint(routeKey route.Uri, endpoint *route.E
}

func (r *RouteRegistry) deleteRouteWithoutEndpoint(routeKey route.Uri, pool *route.EndpointPool) route.PoolRemoveRouteResult {
r.Lock()
defer r.Unlock()

if pool.IsEmpty() {
if r.EmptyPoolResponseCode503 && r.EmptyPoolTimeout > 0 {
if time.Since(pool.LastUpdated()) > r.EmptyPoolTimeout {
r.byURI.Delete(routeKey)
return route.RouteUnregistered
}
} else {
if !(r.EmptyPoolResponseCode503 && r.EmptyPoolTimeout > 0) ||
(r.EmptyPoolResponseCode503 && r.EmptyPoolTimeout > 0 && time.Since(pool.LastUpdated()) > r.EmptyPoolTimeout) {
r.byURI.Delete(routeKey)
return route.RouteUnregistered
}
Expand Down
43 changes: 28 additions & 15 deletions registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,13 @@ var _ = Describe("RouteRegistry", func() {
Context("when a new endpoint is registered", func() {
It("emits endpoint-registered message_count metrics", func() {
r.Register("foo", fooEndpoint)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(1))
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2))
endpoint1, action1 := reporter.CaptureRegistryMessageArgsForCall(0)
Expect(endpoint1).To(Equal(fooEndpoint))
Expect(action1).To(Equal("endpoint-added"))
Expect(action1).To(Equal(string(route.RouteRegistered)))
endpoint2, action2 := reporter.CaptureRegistryMessageArgsForCall(1)
Expect(endpoint2).To(Equal(fooEndpoint))
Expect(action2).To(Equal(string(route.EndpointAdded)))
})
})

Expand All @@ -88,13 +91,16 @@ var _ = Describe("RouteRegistry", func() {
endpoint2 := route.NewEndpoint(&route.EndpointOpts{ModificationTag: modTag2})
r.Register("foo", endpoint1)
r.Register("foo", endpoint2)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2))
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(3))
endpointR1, action1 := reporter.CaptureRegistryMessageArgsForCall(0)
Expect(endpointR1).To(Equal(endpoint1))
Expect(action1).To(Equal("endpoint-added"))
Expect(action1).To(Equal(string(route.RouteRegistered)))
endpointR2, action2 := reporter.CaptureRegistryMessageArgsForCall(1)
Expect(endpointR2).To(Equal(endpoint2))
Expect(action2).To(Equal("endpoint-updated"))
Expect(endpointR2).To(Equal(endpoint1))
Expect(action2).To(Equal(string(route.EndpointAdded)))
endpointR3, action3 := reporter.CaptureRegistryMessageArgsForCall(2)
Expect(endpointR3).To(Equal(endpoint2))
Expect(action3).To(Equal(string(route.EndpointUpdated)))
})
})

Expand All @@ -106,13 +112,16 @@ var _ = Describe("RouteRegistry", func() {
endpoint2 := route.NewEndpoint(&route.EndpointOpts{ModificationTag: modTag2})
r.Register("foo", endpoint1)
r.Register("foo", endpoint2)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2))
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(3))
endpointR1, action1 := reporter.CaptureRegistryMessageArgsForCall(0)
Expect(endpointR1).To(Equal(endpoint1))
Expect(action1).To(Equal("endpoint-added"))
Expect(action1).To(Equal(string(route.RouteRegistered)))
endpointR2, action2 := reporter.CaptureRegistryMessageArgsForCall(1)
Expect(endpointR2).To(Equal(endpoint2))
Expect(action2).To(Equal("endpoint-not-updated"))
Expect(endpointR2).To(Equal(endpoint1))
Expect(action2).To(Equal(string(route.EndpointAdded)))
endpointR3, action3 := reporter.CaptureRegistryMessageArgsForCall(2)
Expect(endpointR3).To(Equal(endpoint2))
Expect(action3).To(Equal(string(route.EndpointNotUpdated)))
})
})

Expand Down Expand Up @@ -562,33 +571,36 @@ var _ = Describe("RouteRegistry", func() {
BeforeEach(func() {
fooEndpoint.Tags = map[string]string{"component": "oauth-server"}
r.Register("foo", fooEndpoint)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2))
})

It("emits counter metrics for unregister endpoint and route", func() {
r.Unregister("foo", fooEndpoint)
Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(2))
endpoint1, action1 := reporter.CaptureUnregistryMessageArgsForCall(0)
Expect(endpoint1).To(Equal(fooEndpoint))
Expect(action1).To(Equal("endpoint-unregistered"))
Expect(action1).To(Equal(string(route.EndpointUnregistered)))
endpoint2, action2 := reporter.CaptureUnregistryMessageArgsForCall(1)
Expect(endpoint2).To(Equal(fooEndpoint))
Expect(action2).To(Equal("route-unregistered"))
Expect(action2).To(Equal(string(route.RouteUnregistered)))
})
})

Context("when endpoint does not have component tag", func() {
BeforeEach(func() {
fooEndpoint.Tags = map[string]string{}
r.Register("foo", fooEndpoint)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(2))
})
It("emits counter metrics for unregister endpoint and route", func() {
r.Unregister("foo", fooEndpoint)
Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(2))
endpoint1, action1 := reporter.CaptureUnregistryMessageArgsForCall(0)
Expect(endpoint1).To(Equal(fooEndpoint))
Expect(action1).To(Equal("endpoint-unregistered"))
Expect(action1).To(Equal(string(route.EndpointUnregistered)))
endpoint2, action2 := reporter.CaptureUnregistryMessageArgsForCall(1)
Expect(endpoint2).To(Equal(fooEndpoint))
Expect(action2).To(Equal("route-unregistered"))
Expect(action2).To(Equal(string(route.RouteUnregistered)))
})
})
})
Expand All @@ -604,13 +616,14 @@ var _ = Describe("RouteRegistry", func() {

r.Register("foo", fooEndpoint)
r.Register("foo", fooEndpoint2)
Expect(reporter.CaptureRegistryMessageCallCount()).To(Equal(3))
})
It("emits counter metrics for unregister endpoint only", func() {
r.Unregister("foo", fooEndpoint)
Expect(reporter.CaptureUnregistryMessageCallCount()).To(Equal(1))
endpoint1, action1 := reporter.CaptureUnregistryMessageArgsForCall(0)
Expect(endpoint1).To(Equal(fooEndpoint))
Expect(action1).To(Equal("endpoint-unregistered"))
Expect(action1).To(Equal(string(route.EndpointUnregistered)))
})
})
Context("when route is not registered", func() {
Expand Down
19 changes: 13 additions & 6 deletions route/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@ type Counter struct {
value int64
}

type PoolPutResult string
type PoolRegisterEndpointResult string

const (
EndpointNotUpdated PoolPutResult = "endpoint-not-updated"
EndpointUpdated PoolPutResult = "endpoint-updated"
EndpointAdded PoolPutResult = "endpoint-added"
EndpointNotUpdated PoolRegisterEndpointResult = "endpoint-not-updated"
EndpointUpdated PoolRegisterEndpointResult = "endpoint-updated"
EndpointAdded PoolRegisterEndpointResult = "endpoint-added"
)

type PoolRegisterRouteResult string

const (
RouteRegistered PoolRegisterRouteResult = "route-registered"
RouteAlreadyExists PoolRegisterRouteResult = "route-already-exists"
)

type PoolRemoveEndpointResult string
Expand Down Expand Up @@ -268,11 +275,11 @@ func (p *EndpointPool) Update() {
p.updatedAt = time.Now()
}

func (p *EndpointPool) Put(endpoint *Endpoint) PoolPutResult {
func (p *EndpointPool) Put(endpoint *Endpoint) PoolRegisterEndpointResult {
p.Lock()
defer p.Unlock()

var result PoolPutResult
var result PoolRegisterEndpointResult
e, found := p.index[endpoint.CanonicalAddr()]
if found {
result = EndpointUpdated
Expand Down

0 comments on commit ef4bebb

Please sign in to comment.