Commit 3c18490
Merge pull request #11 from telekom/fix/kafka-picker
Implement separate consumers for picking messages
Schnix84 authored Aug 19, 2024
2 parents 6e5d25b + d45ea36 commit 3c18490
Showing 22 changed files with 218 additions and 194 deletions.
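The core of this PR: instead of funneling every pick through one long-lived consumer on the shared Kafka handler, each check cycle now creates its own short-lived picker and closes it when done. The hunks below only show the call sites (kafka.NewPicker, picker.Pick, picker.Close). A minimal sketch of what such a picker could look like, reconstructed from the deleted PickMessage code at the bottom of this diff — the sarama import path, the interface/variable names, and all internals are assumptions, not shown in these hunks:

```go
package kafka

import (
	"github.com/IBM/sarama" // import path assumed; not visible in this diff
	"github.com/rs/zerolog/log"
	"github.com/telekom/pubsub-horizon-go/message"
	"pubsub-horizon-golaris/internal/config"
)

// MessagePicker is the behavior the handlers rely on (hypothetical name).
type MessagePicker interface {
	Pick(status *message.StatusMessage) (*sarama.ConsumerMessage, error)
	Close()
}

// NewPicker is sketched as a variable so the tests below could swap in a
// mock via test.InjectMockPicker; that mechanism is an assumption.
var NewPicker = func() (MessagePicker, error) {
	consumer, err := sarama.NewConsumer(config.Current.Kafka.Brokers, sarama.NewConfig())
	if err != nil {
		return nil, err
	}
	return &picker{consumer: consumer}, nil
}

// picker owns a dedicated consumer instead of sharing the handler's one.
type picker struct {
	consumer sarama.Consumer
}

// Pick reads exactly one message at the partition/offset recorded in the
// status message, mirroring the Handler.PickMessage removed at the bottom
// of this diff.
func (p *picker) Pick(status *message.StatusMessage) (*sarama.ConsumerMessage, error) {
	pc, err := p.consumer.ConsumePartition(status.Topic, *status.Coordinates.Partition, *status.Coordinates.Offset)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err := pc.Close(); err != nil {
			log.Error().Err(err).Msg("Could not close partition consumer")
		}
	}()
	return <-pc.Messages(), nil
}

func (p *picker) Close() {
	if err := p.consumer.Close(); err != nil {
		log.Error().Err(err).Msg("Could not close picker consumer")
	}
}
```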
go.mod (2 changes: 1 addition & 1 deletion)
@@ -16,7 +16,7 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
github.com/telekom/pubsub-horizon-go v0.0.0-20240731092116-af2ad8a86440
github.com/telekom/pubsub-horizon-go v0.0.0-20240819134043-281686d5002b
go.mongodb.org/mongo-driver v1.16.0
go.opentelemetry.io/contrib/propagators/b3 v1.28.0
go.opentelemetry.io/otel v1.28.0
go.sum (4 changes: 2 additions & 2 deletions)
@@ -274,8 +274,8 @@
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/telekom/pubsub-horizon-go v0.0.0-20240731092116-af2ad8a86440 h1:2/6NueU4B7JIOUWKlRzEugPd4/A3eAJu56gPANJe1Dw=
github.com/telekom/pubsub-horizon-go v0.0.0-20240731092116-af2ad8a86440/go.mod h1:HaSee6LL9z9yX6FxDkUFq+vOfAyAfe+5stk9MtFLUOw=
github.com/telekom/pubsub-horizon-go v0.0.0-20240819134043-281686d5002b h1:Q/MFQaIFQo+g36LFx+a4/xZ9q+pCgxGS8NuPnORN+Rs=
github.com/telekom/pubsub-horizon-go v0.0.0-20240819134043-281686d5002b/go.mod h1:HaSee6LL9z9yX6FxDkUFq+vOfAyAfe+5stk9MtFLUOw=
github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
internal/api/api.go (1 change: 1 addition & 0 deletions)
@@ -30,6 +30,7 @@ func init() {
v1.Get("/circuit-breakers/:subscriptionId", getCircuitBreakerMessageById)
v1.Get("/circuit-breakers", getAllCircuitBreakerMessages)
v1.Put("/circuit-breakers/close/:subscriptionId", putCloseCircuitBreakerById)
v1.Get("/republishing-entries", getRepublishingEntries)
}

func Listen(port int) {
internal/api/republishing.go (32 changes: 32 additions & 0 deletions)
@@ -0,0 +1,32 @@
// Copyright 2024 Deutsche Telekom IT GmbH
//
// SPDX-License-Identifier: Apache-2.0

package api

import (
"context"
"github.com/gofiber/fiber/v2"
"github.com/hazelcast/hazelcast-go-client"
"github.com/rs/zerolog/log"
"pubsub-horizon-golaris/internal/cache"
"pubsub-horizon-golaris/internal/republish"
)

func getRepublishingEntries(ctx *fiber.Ctx) error {
var body = struct {
Items []republish.RepublishingCacheEntry `json:"items"`
}{make([]republish.RepublishingCacheEntry, 0)}

var republishingCache = cache.RepublishingCache.(*hazelcast.Map)
values, err := republishingCache.GetValues(context.Background())
if err != nil {
log.Error().Err(err).Msg("Could not retrieve republishing cache entries")
}

for _, value := range values {
body.Items = append(body.Items, value.(republish.RepublishingCacheEntry))
}

return ctx.Status(fiber.StatusOK).JSON(body)
}
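Note the design choice in this new handler: if reading the Hazelcast map fails, it logs the error and still answers 200 with an empty items list rather than a 5xx. A throwaway client for smoke-testing the route might look like the sketch below; the base URL, port, and path prefix are assumptions — only the v1 route group and the route name are confirmed above:

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// URL is hypothetical; adjust host, port, and prefix to your deployment.
	resp, err := http.Get("http://localhost:8080/api/v1/republishing-entries")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// Entries mirror republish.RepublishingCacheEntry on the wire.
	var payload struct {
		Items []json.RawMessage `json:"items"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%d republishing entries\n", len(payload.Items))
}
```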
internal/cache/cache.go (14 changes: 13 additions & 1 deletion)
@@ -9,6 +9,7 @@ import (
"fmt"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/types"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
c "github.com/telekom/pubsub-horizon-go/cache"
"github.com/telekom/pubsub-horizon-go/message"
@@ -78,11 +79,22 @@ func createNewHazelcastConfig() hazelcast.Config {

cacheConfig.Cluster.Name = config.Current.Hazelcast.ClusterName
cacheConfig.Cluster.Network.SetAddresses(config.Current.Hazelcast.ServiceDNS)
cacheConfig.Logger.CustomLogger = new(util.HazelcastZerologLogger)
cacheConfig.Logger.CustomLogger = util.NewHazelcastZerologLogger(parseHazelcastLogLevel(config.Current.Hazelcast.LogLevel))

return cacheConfig
}

func parseHazelcastLogLevel(logLevel string) zerolog.Level {
var hazelcastLogLevel, err = zerolog.ParseLevel(logLevel)
if err != nil {
log.Error().Err(err).Fields(map[string]any{
"logLevel": config.Current.Hazelcast.LogLevel,
}).Msg("Could not parse log-level for hazelcast logger. Falling back to INFO...")
return zerolog.InfoLevel
}
return hazelcastLogLevel
}

// initializeCaches sets up the Hazelcast caches used in the application.
// It takes a Hazelcast configuration object as a parameter.
// The function initializes the SubscriptionCache, CircuitBreakerCache, HealthCheckCache, and RepublishingCache.
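Since parseHazelcastLogLevel delegates to zerolog.ParseLevel, the accepted values for the new hazelcast.logLevel setting are zerolog's level names — trace, debug, info, warn, error, fatal, panic, disabled — and anything else falls back to info. A quick illustration of the parse-or-fallback behavior:

```go
package main

import (
	"fmt"

	"github.com/rs/zerolog"
)

func main() {
	// "verbose" is not a zerolog level name, so it takes the fallback path.
	for _, lvl := range []string{"debug", "warn", "verbose"} {
		parsed, err := zerolog.ParseLevel(lvl)
		if err != nil {
			parsed = zerolog.InfoLevel // mirrors the fallback in cache.go
		}
		fmt.Printf("%-8s -> %s\n", lvl, parsed)
	}
}
```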
internal/circuitbreaker/circuitbreaker.go (17 changes: 14 additions & 3 deletions)
@@ -97,6 +97,9 @@ func HandleOpenCircuitBreaker(cbMessage message.CircuitBreakerMessage, subscription

// Create republishing cache entry
republishingCacheEntry := republish.RepublishingCacheEntry{SubscriptionId: cbMessage.SubscriptionId, RepublishingUpTo: time.Now(), PostponedUntil: time.Now().Add(+exponentialBackoff)}

log.Debug().Msgf("postponedUntil for subscriptionId %s set to %v", republishingCacheEntry.SubscriptionId, republishingCacheEntry.PostponedUntil)

err := cache.RepublishingCache.Set(hcData.Ctx, cbMessage.SubscriptionId, republishingCacheEntry)
if err != nil {
log.Error().Err(err).Msgf("Error while creating RepublishingCacheEntry entry for subscriptionId %s", cbMessage.SubscriptionId)
@@ -127,8 +130,6 @@ func checkForCircuitBreakerLoop(cbMessage *message.CircuitBreakerMessage) error
cbMessage.LoopCounter = 0
log.Debug().Msgf("Circuit breaker opened outside loop detection period. Reseted loop counter for subscription %s: %v", cbMessage.SubscriptionId, cbMessage.LoopCounter)
}
// set last opened for the next loop detection
cbMessage.LastOpened = cbMessage.LastModified
cbMessage.LastModified = types.NewTimestamp(time.Now().UTC())

err := cache.CircuitBreakerCache.Put(config.Current.Hazelcast.Caches.CircuitBreakerCache, cbMessage.SubscriptionId, *cbMessage)
@@ -187,13 +188,23 @@ func calculateExponentialBackoff(cbMessage message.CircuitBreakerMessage) time.Duration
return 0
}

// Return max backoff if loop counter exceeds 17 to avoid overflow
if cbMessage.LoopCounter > 17 {
return exponentialBackoffMax
}

// Calculate the exponential backoff based on republishing count.
// If the circuit breaker counter is 2 it is the first retry, because the counter had already been incremented immediately before
exponentialBackoff := exponentialBackoffBase * time.Duration(math.Pow(2, float64(cbMessage.LoopCounter-1)))

log.Debug().Msgf("Math.Pow: %v", math.Pow(2, float64(cbMessage.LoopCounter-1)))

// Limit the exponential backoff to the max backoff
if exponentialBackoff > exponentialBackoffMax {
if (exponentialBackoff > exponentialBackoffMax) || (exponentialBackoff < 0) {
exponentialBackoff = exponentialBackoffMax
}

log.Debug().Msgf("Calculating exponential backoff, subscriptionId %s, loopCounter %d, exponentialBackoffBase %v, exponentialBackoffMax %v, exponentialBackoff %v", cbMessage.SubscriptionId, cbMessage.LoopCounter, exponentialBackoffBase, exponentialBackoffMax, exponentialBackoff)

return exponentialBackoff
}
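For intuition, assume a base of 1s and a cap of 60m (the actual values come from configuration and are not part of this diff): loop counter 2 yields 2^1 x 1s = 2s, counter 11 yields about 17m, and counter 13 would yield about 68m, which the cap truncates. Because time.Duration is an int64 of nanoseconds, large exponents overflow into negative values; the counter > 17 guard conservatively short-circuits well before the float-to-Duration conversion can overflow for typical base values, and the < 0 check catches any overflow that slips through. A runnable sketch of the same logic:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Assumed values for illustration; the real base and max come from config.
	base := time.Second
	max := 60 * time.Minute

	for _, counter := range []int{2, 11, 13, 18, 42} {
		var backoff time.Duration
		if counter > 17 {
			backoff = max // guard: avoid overflow before it can happen
		} else {
			backoff = base * time.Duration(math.Pow(2, float64(counter-1)))
			if backoff > max || backoff < 0 {
				backoff = max
			}
		}
		fmt.Printf("loopCounter=%d -> %v\n", counter, backoff)
	}
}
```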
internal/circuitbreaker/circuitbreaker_test.go (20 changes: 20 additions & 0 deletions)
@@ -570,6 +570,26 @@ func TestCalculateExponentialBackoff(t *testing.T) {
cbLoopCounter: 13,
expectedBackoff: backoffMax,
},
{
name: "Max backoff 44 reached exponential backoff = 2^41 * 1000ms",
cbLoopCounter: 42,
expectedBackoff: backoffMax,
},
{
name: "Max backoff 44 reached exponential backoff = 2^42 * 1000ms",
cbLoopCounter: 43,
expectedBackoff: backoffMax,
},
{
name: "Max backoff 44 reached exponential backoff = 2^43 * 1000ms",
cbLoopCounter: 44,
expectedBackoff: backoffMax,
},
{
name: "Max backoff 44 reached exponential backoff = 2^183 * 1000ms",
cbLoopCounter: 184,
expectedBackoff: backoffMax,
},
}

for _, tc := range tests {
internal/config/config.go (1 change: 1 addition & 0 deletions)
@@ -63,6 +63,7 @@ func setDefaults() {
// Hazelcast
viper.SetDefault("hazelcast.clusterName", "dev")
viper.SetDefault("hazelcast.serviceDNS", "localhost:5701")
viper.SetDefault("hazelcast.logLevel", "info")

// Kafka
viper.SetDefault("kafka.brokers", "localhost:9092")
internal/config/types.go (1 change: 1 addition & 0 deletions)
@@ -46,6 +46,7 @@ type Hazelcast struct {
ServiceDNS string `mapstructure:"serviceDNS"`
ClusterName string `mapstructure:"clusterName"`
Caches Caches `mapstructure:"caches"`
LogLevel string `mapstructure:"logLevel"`
}

type Caches struct {
internal/handler/delivering.go (15 changes: 11 additions & 4 deletions)
@@ -33,6 +33,13 @@ func CheckDeliveringEvents() {

upperThresholdTimestamp := time.Now().Add(-config.Current.Republishing.DeliveringStatesOffset)

picker, err := kafka.NewPicker()
if err != nil {
log.Error().Err(err).Msg("Could not initialize picker for handling events in state DELIVERING")
return
}
defer picker.Close()

for {
var lastCursor any

@@ -54,23 +61,23 @@
return
}

kafkaMessage, err := kafka.CurrentHandler.PickMessage(dbMessage)
message, err := picker.Pick(&dbMessage)
if err != nil {
log.Printf("Error while fetching message from kafka for subscriptionId %s: %v", dbMessage.SubscriptionId, err)
return
}

var b3Ctx = tracing.WithB3FromMessage(context.Background(), kafkaMessage)
var b3Ctx = tracing.WithB3FromMessage(context.Background(), message)
var traceCtx = tracing.NewTraceContext(b3Ctx, "golaris", config.Current.Tracing.DebugEnabled)

traceCtx.StartSpan("republish delivering message")
traceCtx.SetAttribute("component", "Horizon Golaris")
traceCtx.SetAttribute("eventId", dbMessage.Event.Id)
traceCtx.SetAttribute("eventType", dbMessage.Event.Type)
traceCtx.SetAttribute("subscriptionId", dbMessage.SubscriptionId)
traceCtx.SetAttribute("uuid", string(kafkaMessage.Key))
traceCtx.SetAttribute("uuid", string(message.Key))

err = kafka.CurrentHandler.RepublishMessage(traceCtx, kafkaMessage, "", "", false)
err = kafka.CurrentHandler.RepublishMessage(traceCtx, message, "", "", false)
if err != nil {
log.Printf("Error while republishing message for subscriptionId %s: %v", dbMessage.SubscriptionId, err)
return
internal/handler/delivering_test.go (12 changes: 9 additions & 3 deletions)
@@ -25,6 +25,9 @@ func TestCheckDeliveringEvents_Success(t *testing.T) {
mockKafka := new(test.MockKafkaHandler)
kafka.CurrentHandler = mockKafka

mockPicker := new(test.MockPicker)
test.InjectMockPicker(mockPicker)

deliveringHandler := new(test.DeliveringMockHandler)
cache.DeliveringHandler = deliveringHandler

@@ -71,7 +74,7 @@
Value: []byte(`{"uuid": "12345", "event": {"id": "67890"}}`),
}

mockKafka.On("PickMessage", mock.AnythingOfType("message.StatusMessage")).Return(expectedKafkaMessage, nil)
mockPicker.On("Pick", mock.AnythingOfType("*message.StatusMessage")).Return(expectedKafkaMessage, nil)
mockKafka.On("RepublishMessage", expectedKafkaMessage, "", "").Return(nil)

CheckDeliveringEvents()
@@ -80,8 +83,8 @@
mockMongo.AssertCalled(t, "FindDeliveringMessagesByDeliveryType", mock.Anything, mock.Anything)

mockKafka.AssertExpectations(t)
mockKafka.AssertCalled(t, "PickMessage", mock.AnythingOfType("message.StatusMessage"))
mockKafka.AssertCalled(t, "RepublishMessage", expectedKafkaMessage, "", "")
mockPicker.AssertCalled(t, "Pick", mock.AnythingOfType("*message.StatusMessage"))
}

func TestCheckDeliveringEvents_NoEvents(t *testing.T) {
@@ -94,6 +97,9 @@
deliveringHandler := new(test.DeliveringMockHandler)
cache.DeliveringHandler = deliveringHandler

mockPicker := new(test.MockPicker)
test.InjectMockPicker(mockPicker)

deliveringHandler.On("NewLockContext", mock.Anything).Return(context.Background())
deliveringHandler.On("TryLockWithTimeout", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
deliveringHandler.On("Unlock", mock.Anything, mock.Anything).Return(nil)
@@ -105,8 +111,8 @@

CheckDeliveringEvents()

mockKafka.AssertNotCalled(t, "PickMessage", mock.AnythingOfType("message.StatusMessage"))
mockKafka.AssertNotCalled(t, "RepublishMessage", mock.Anything, "", "")
mockPicker.AssertNotCalled(t, "Pick", mock.AnythingOfType("*message.StatusMessage"))

mockMongo.AssertExpectations(t)
mockMongo.AssertCalled(t, "FindDeliveringMessagesByDeliveryType", mock.Anything, mock.Anything)
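These tests replace the real picker through test.InjectMockPicker. Neither MockPicker nor the injection mechanism appears in this diff; one shape that would satisfy the mockPicker.On("Pick", ...) expectations is a testify mock plus a swappable constructor, sketched here entirely as an assumption (and matching the hypothetical MessagePicker/NewPicker names from the sketch at the top):

```go
package test

import (
	"github.com/IBM/sarama" // import path assumed
	"github.com/stretchr/testify/mock"
	"github.com/telekom/pubsub-horizon-go/message"

	"pubsub-horizon-golaris/internal/kafka"
)

// MockPicker satisfies the picker's interface via testify expectations.
type MockPicker struct {
	mock.Mock
}

func (m *MockPicker) Pick(status *message.StatusMessage) (*sarama.ConsumerMessage, error) {
	args := m.Called(status)
	msg, _ := args.Get(0).(*sarama.ConsumerMessage) // nil-safe type assertion
	return msg, args.Error(1)
}

func (m *MockPicker) Close() {}

// InjectMockPicker only works if kafka.NewPicker is a package-level
// function variable; that is an assumption, not something this diff shows.
func InjectMockPicker(p *MockPicker) {
	kafka.NewPicker = func() (kafka.MessagePicker, error) { return p, nil }
}
```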
internal/handler/failed.go (9 changes: 8 additions & 1 deletion)
@@ -35,6 +35,13 @@ func CheckFailedEvents() {
var dbMessages []message.StatusMessage
var err error

picker, err := kafka.NewPicker()
if err != nil {
log.Error().Err(err).Msg("Could not initialize picker for handling events in state DELIVERING")
return
}
defer picker.Close()

for {
var lastCursor any
dbMessages, _, err = mongo.CurrentConnection.FindFailedMessagesWithCallbackUrlNotFoundException(time.Now(), lastCursor)
@@ -65,7 +72,7 @@
return
}

kafkaMessage, err := kafka.CurrentHandler.PickMessage(dbMessage)
kafkaMessage, err := picker.Pick(&dbMessage)
if err != nil {
log.Printf("Error while fetching message from kafka for subscriptionId %s: %v", subscriptionId, err)
return
internal/handler/failed_test.go (7 changes: 5 additions & 2 deletions)
@@ -32,6 +32,9 @@ func TestCheckFailedEvents(t *testing.T) {
failedHandler := new(test.FailedMockHandler)
cache.FailedHandler = failedHandler

mockPicker := new(test.MockPicker)
test.InjectMockPicker(mockPicker)

failedHandler.On("NewLockContext", mock.Anything).Return(context.Background())
failedHandler.On("TryLockWithTimeout", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
failedHandler.On("Unlock", mock.Anything, mock.Anything).Return(nil)
@@ -76,7 +79,7 @@
Value: []byte(`{"uuid": "12345", "event": {"id": "67890"}}`),
}

mockKafka.On("PickMessage", mock.AnythingOfType("message.StatusMessage")).Return(expectedKafkaMessage, nil)
mockPicker.On("Pick", mock.AnythingOfType("*message.StatusMessage")).Return(expectedKafkaMessage, nil)
mockKafka.On("RepublishMessage", mock.Anything, "SERVER_SENT_EVENT", "").Return(nil)

CheckFailedEvents()
@@ -88,6 +91,6 @@
mockCache.AssertCalled(t, "Get", config.Current.Hazelcast.Caches.SubscriptionCache, "sub123")

mockKafka.AssertExpectations(t)
mockKafka.AssertCalled(t, "PickMessage", mock.AnythingOfType("message.StatusMessage"))
mockKafka.AssertCalled(t, "RepublishMessage", expectedKafkaMessage, "SERVER_SENT_EVENT", "")
mockPicker.AssertCalled(t, "Pick", mock.AnythingOfType("*message.StatusMessage"))
}
29 changes: 0 additions & 29 deletions internal/kafka/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/burdiyan/kafkautil"
"github.com/rs/zerolog/log"
"github.com/telekom/pubsub-horizon-go/enum"
"github.com/telekom/pubsub-horizon-go/message"
"github.com/telekom/pubsub-horizon-go/tracing"
"pubsub-horizon-golaris/internal/config"
)
@@ -31,13 +30,6 @@ func Initialize() {
func newKafkaHandler() (*Handler, error) {
kafkaConfig := sarama.NewConfig()

// Initialize the Kafka Consumer to read messages from Kafka
consumer, err := sarama.NewConsumer(config.Current.Kafka.Brokers, kafkaConfig)
if err != nil {
log.Error().Err(err).Msg("Could not create Kafka consumer")
return nil, err
}

// Initialize the Kafka Producer to send the updated messages back to Kafka (resetMessage)
kafkaConfig.Producer.Partitioner = kafkautil.NewJVMCompatiblePartitioner
kafkaConfig.Producer.Return.Successes = true
@@ -48,31 +40,10 @@
}

return &Handler{
Consumer: consumer,
Producer: producer,
}, nil
}

func (kafkaHandler Handler) PickMessage(message message.StatusMessage) (*sarama.ConsumerMessage, error) {
log.Debug().Msgf("Picking message at partition %d with offset %d", *message.Coordinates.Partition, *message.Coordinates.Offset)

consumer, err := kafkaHandler.Consumer.ConsumePartition(message.Topic, *message.Coordinates.Partition, *message.Coordinates.Offset)
if err != nil {
log.Debug().Msgf("KafkaPick for partition %d and topic %s and offset %d failed: %v", *message.Coordinates.Partition, message.Topic, *message.Coordinates.Offset, err)
return nil, err
}

defer func() {
err = consumer.Close()
if err != nil {
log.Error().Err(err).Msg("Could not close consumer")
}
}()

msg := <-consumer.Messages()
return msg, nil
}

func (kafkaHandler Handler) RepublishMessage(traceCtx *tracing.TraceContext, message *sarama.ConsumerMessage, newDeliveryType string, newCallbackUrl string, errorParams bool) error {
var kafkaMessages = make([]*sarama.ProducerMessage, 0)

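With this file's deletions, the shared Handler keeps only the producer: the Consumer field, its construction in newKafkaHandler, and the whole per-pick ConsumePartition routine in PickMessage are removed. Picking now runs exclusively on the short-lived consumers created via kafka.NewPicker in delivering.go and failed.go above, which — per the PR title — separates the consumers used for picking messages from the rest of the Kafka handling.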