Skip to content

Commit

Permalink
Merge pull request #10 from telekom/develop
Browse files Browse the repository at this point in the history
Changes for the next release
  • Loading branch information
Schnix84 authored Aug 15, 2024
2 parents 30b290d + 4c3aefe commit 6e5d25b
Show file tree
Hide file tree
Showing 16 changed files with 42 additions and 22 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.3
require (
github.com/1pkg/gohalt v0.10.0
github.com/IBM/sarama v1.43.2
github.com/burdiyan/kafkautil v0.0.0-20240215092415-7e6d3d0fc870
github.com/go-co-op/gocron v1.37.0
github.com/gofiber/fiber/v2 v2.52.5
github.com/hazelcast/hazelcast-go-client v1.4.1
Expand Down Expand Up @@ -33,7 +34,6 @@ require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/burdiyan/kafkautil v0.0.0-20240215092415-7e6d3d0fc870 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/continuity v0.4.3 // indirect
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hazelcast/hazelcast-go-client v1.4.1 h1:BSpJqqjbACI4MugfWXGxk+JdZR3JRELx0n769pa85kA=
github.com/hazelcast/hazelcast-go-client v1.4.1/go.mod h1:PJ38lqXJ18S0YpkrRznPDlUH8GnnMAQCx3jpQtBPZ6Q=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
Expand Down Expand Up @@ -178,7 +179,9 @@ github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
Expand Down Expand Up @@ -271,8 +274,6 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/telekom/pubsub-horizon-go v0.0.0-20240730110445-efd044ff0112 h1:/k7VRB+nCsJ1NtNmwA6o3EYDAAe3guZtralOTsT7X9Q=
github.com/telekom/pubsub-horizon-go v0.0.0-20240730110445-efd044ff0112/go.mod h1:HaSee6LL9z9yX6FxDkUFq+vOfAyAfe+5stk9MtFLUOw=
github.com/telekom/pubsub-horizon-go v0.0.0-20240731092116-af2ad8a86440 h1:2/6NueU4B7JIOUWKlRzEugPd4/A3eAJu56gPANJe1Dw=
github.com/telekom/pubsub-horizon-go v0.0.0-20240731092116-af2ad8a86440/go.mod h1:HaSee6LL9z9yX6FxDkUFq+vOfAyAfe+5stk9MtFLUOw=
github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek=
Expand Down Expand Up @@ -430,9 +431,11 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
1 change: 1 addition & 0 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type HazelcastMapInterface interface {
ContainsKey(ctx context.Context, key interface{}) (bool, error)
Clear(ctx context.Context) error
Lock(ctx context.Context, key interface{}) error
TryLockWithLeaseAndTimeout(ctx context.Context, key interface{}, lease time.Duration, timeout time.Duration) (bool, error)
}

var SubscriptionCache c.HazelcastBasedCache[resource.SubscriptionResource]
Expand Down
4 changes: 2 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ func setDefaults() {

// Processes
viper.SetDefault("circuitBreaker.openCheckInterval", "30s")
viper.SetDefault("circuitBreaker.openLoopDetectionPeriod", "300s")
viper.SetDefault("circuitBreaker.openLoopDetectionPeriod", "75m")
viper.SetDefault("circuitBreaker.exponentialBackoffBase", "1000ms")
viper.SetDefault("circuitBreaker.exponentialBackoffMax", "60m")
viper.SetDefault("healthCheck.successfulResponseCodes", []int{200, 201, 202, 204})
viper.SetDefault("healthCheck.coolDownTime", "30s")
viper.SetDefault("republishing.checkInterval", "30s")
viper.SetDefault("republishing.batchSize", 10)
viper.SetDefault("republishing.throttlingIntervalTime", "10s")
viper.SetDefault("republishing.deliveringStatesOffsetMins", 15)
viper.SetDefault("republishing.deliveringStatesOffset", "15m")

// Caches
viper.SetDefault("hazelcast.caches.subscriptionCache", "subscriptions.subscriber.horizon.telekom.de.v1")
Expand Down
8 changes: 4 additions & 4 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ type HealthCheck struct {
}

type Republishing struct {
CheckInterval time.Duration `mapstructure:"checkInterval"`
BatchSize int64 `mapstructure:"batchSize"`
ThrottlingIntervalTime time.Duration `mapstructure:"throttlingIntervalTime"`
DeliveringStatesOffsetMins int `mapstructure:"deliveringStatesOffsetMins"`
CheckInterval time.Duration `mapstructure:"checkInterval"`
BatchSize int64 `mapstructure:"batchSize"`
ThrottlingIntervalTime time.Duration `mapstructure:"throttlingIntervalTime"`
DeliveringStatesOffset time.Duration `mapstructure:"deliveringStatesOffset"`
}

type Hazelcast struct {
Expand Down
3 changes: 1 addition & 2 deletions internal/handler/delivering.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ func CheckDeliveringEvents() {

batchSize := config.Current.Republishing.BatchSize

deliveringStatesOffsetMins := config.Current.Republishing.DeliveringStatesOffsetMins
upperThresholdTimestamp := time.Now().Add(-time.Duration(deliveringStatesOffsetMins) * time.Minute)
upperThresholdTimestamp := time.Now().Add(-config.Current.Republishing.DeliveringStatesOffset)

for {
var lastCursor any
Expand Down
5 changes: 3 additions & 2 deletions internal/handler/delivering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"pubsub-horizon-golaris/internal/mongo"
"pubsub-horizon-golaris/internal/test"
"testing"
"time"
)

func TestCheckDeliveringEvents_Success(t *testing.T) {
Expand All @@ -32,7 +33,7 @@ func TestCheckDeliveringEvents_Success(t *testing.T) {
deliveringHandler.On("Unlock", mock.Anything, mock.Anything).Return(nil)

config.Current.Republishing.BatchSize = 5
config.Current.Republishing.DeliveringStatesOffsetMins = 30
config.Current.Republishing.DeliveringStatesOffset = 30 * time.Minute

partitionValue1 := int32(1)
offsetValue1 := int64(100)
Expand Down Expand Up @@ -98,7 +99,7 @@ func TestCheckDeliveringEvents_NoEvents(t *testing.T) {
deliveringHandler.On("Unlock", mock.Anything, mock.Anything).Return(nil)

config.Current.Republishing.BatchSize = 5
config.Current.Republishing.DeliveringStatesOffsetMins = 30
config.Current.Republishing.DeliveringStatesOffset = 30 * time.Minute

mockMongo.On("FindDeliveringMessagesByDeliveryType", mock.Anything, mock.Anything).Return([]message.StatusMessage{}, nil, nil)

Expand Down
3 changes: 2 additions & 1 deletion internal/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ func PrepareHealthCheck(subscription *resource.SubscriptionResource) (*PreparedH
}

// Attempt to acquire a lock for the health check key
isAcquired, _ := cache.HealthCheckCache.TryLockWithTimeout(ctx, healthCheckKey, 10*time.Millisecond)
//isAcquired, _ := cache.HealthCheckCache.TryLockWithTimeout(ctx, healthCheckKey, 10*time.Millisecond)
isAcquired, _ := cache.HealthCheckCache.TryLockWithLeaseAndTimeout(ctx, healthCheckKey, 30000*time.Millisecond, 10*time.Millisecond)

castedHealthCheckEntry := healthCheckEntry.(HealthCheckCacheEntry)
return &PreparedHealthCheckData{Ctx: ctx, HealthCheckKey: healthCheckKey, HealthCheckEntry: castedHealthCheckEntry, IsAcquired: isAcquired}, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/healthcheck/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func Test_ExecuteHealthRequestWithToken_ErrorCases(t *testing.T) {
expectedErrMsg := fmt.Sprintf("Failed to perform %s request to %s:", method, callbackUrl)
assertions.Contains(err.Error(), expectedErrMsg)

log.Info().Msgf("Error: %v", err)
log.Info().Err(err).Msgf("Error occured")
continue
}
}
Expand Down
8 changes: 4 additions & 4 deletions internal/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func Initialize() {
panic(err)
}

log.Info().Msgf("SubscriptionLister initialized")
log.Info().Msgf("SubscriptionListener initialized")
}

// OnAdd is not implemented for OnAdd event handling.
Expand Down Expand Up @@ -109,7 +109,7 @@ func handleDeliveryTypeChangeFromCallbackToSSE(obj resource.SubscriptionResource

republish.ForceDelete(context.Background(), obj.Spec.Subscription.SubscriptionId)

log.Info().Msgf("Waiting for 2 seconds before setting new entry to RepublishingCache for subscription %s", obj.Spec.Subscription.SubscriptionId)
log.Debug().Msgf("Waiting for 2 seconds before setting new entry to RepublishingCache for subscription %s", obj.Spec.Subscription.SubscriptionId)

// We need to wait for the goroutine to finish before setting a new entry in the republishing cache
time.Sleep(2 * time.Second)
Expand Down Expand Up @@ -151,7 +151,7 @@ func handleCallbackUrlChange(obj resource.SubscriptionResource, oldObj resource.
republish.ForceDelete(context.Background(), obj.Spec.Subscription.SubscriptionId)
log.Debug().Msgf("Successfully deleted RepublishingCache entry for subscriptionId %s", obj.Spec.Subscription.SubscriptionId)

log.Info().Msgf("Waiting for 2 seconds before setting new entry to RepublishingCache for subscription %s", obj.Spec.Subscription.SubscriptionId)
log.Debug().Msgf("Waiting for 2 seconds before setting new entry to RepublishingCache for subscription %s", obj.Spec.Subscription.SubscriptionId)

// We need to wait for the goroutine to finish before setting a new entry in the republishing cache
time.Sleep(2 * time.Second)
Expand Down Expand Up @@ -197,7 +197,7 @@ func handleRedeliveriesPerSecondChange(obj resource.SubscriptionResource, oldObj
republish.ForceDelete(context.Background(), obj.Spec.Subscription.SubscriptionId)
log.Debug().Msgf("Successfully deleted RepublishingCache entry for subscriptionId %s", obj.Spec.Subscription.SubscriptionId)

log.Info().Msgf("Waiting for 2 seconds before setting new entry to RepublishingCache for subscription %s", obj.Spec.Subscription.SubscriptionId)
log.Debug().Msgf("Waiting for 2 seconds before setting new entry to RepublishingCache for subscription %s", obj.Spec.Subscription.SubscriptionId)

// We need to wait for the goroutine to finish before setting a new entry in the republishing cache
time.Sleep(2 * time.Second)
Expand Down
2 changes: 1 addition & 1 deletion internal/mongo/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ func pingMongoNode(connection *mongo.Client) error {
return err
}

log.Info().Msg("Connected to MongoDB established")
log.Info().Msg("Connection to MongoDB established")
return nil
}
2 changes: 1 addition & 1 deletion internal/mongo/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (connection Connection) findMessagesByQuery(query bson.M, lastCursor any) (

if lastCursor != nil {
query["timestamp"] = bson.M{"$gt": lastCursor}
log.Info().Msgf("Querying for messages with timestamp > %v", lastCursor)
log.Debug().Msgf("Querying for messages with timestamp > %v", lastCursor)
}

collection := connection.Client.Database(connection.Config.Database).Collection(connection.Config.Collection)
Expand Down
2 changes: 1 addition & 1 deletion internal/republish/republish.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func RepublishPendingEvents(subscription *resource.SubscriptionResource, republi
log.Debug().Msgf("Found %d WAITING messages in MongoDb", len(dbMessages))
}

log.Info().Msgf("Last cursor: %v", lastCursor)
log.Debug().Msgf("Last cursor: %v", lastCursor)

if len(dbMessages) == 0 {
break
Expand Down
5 changes: 5 additions & 0 deletions internal/test/deliveringhandler_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func (d *DeliveringMockHandler) TryLockWithTimeout(ctx context.Context, key inte
return args.Bool(0), args.Error(1)
}

func (d *DeliveringMockHandler) TryLockWithLeaseAndTimeout(ctx context.Context, key interface{}, lease time.Duration, timeout time.Duration) (bool, error) {
args := d.Called(ctx, key, timeout)
return args.Bool(0), args.Error(1)
}

func (d *DeliveringMockHandler) GetEntrySet(ctx context.Context) ([]types.Entry, error) {
args := d.Called(ctx)
return args.Get(0).([]types.Entry), args.Error(1)
Expand Down
5 changes: 5 additions & 0 deletions internal/test/failedhandler_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func (f *FailedMockHandler) TryLockWithTimeout(ctx context.Context, key interfac
return args.Bool(0), args.Error(1)
}

func (d *FailedMockHandler) TryLockWithLeaseAndTimeout(ctx context.Context, key interface{}, lease time.Duration, timeout time.Duration) (bool, error) {
args := d.Called(ctx, key, timeout)
return args.Bool(0), args.Error(1)
}

func (f *FailedMockHandler) GetEntrySet(ctx context.Context) ([]types.Entry, error) {
args := f.Called(ctx)
return args.Get(0).([]types.Entry), args.Error(1)
Expand Down
5 changes: 5 additions & 0 deletions internal/test/republishing_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func (r *RepublishingMockMap) TryLockWithTimeout(ctx context.Context, key interf
return args.Bool(0), args.Error(1)
}

func (d *RepublishingMockMap) TryLockWithLeaseAndTimeout(ctx context.Context, key interface{}, lease time.Duration, timeout time.Duration) (bool, error) {
args := d.Called(ctx, key, timeout)
return args.Bool(0), args.Error(1)
}

func (r *RepublishingMockMap) GetEntrySet(ctx context.Context) ([]types.Entry, error) {
args := r.Called(ctx)
return args.Get(0).([]types.Entry), args.Error(1)
Expand Down

0 comments on commit 6e5d25b

Please sign in to comment.