From 89d62b244d463cbeb0b7b6187121d13da3cff1b4 Mon Sep 17 00:00:00 2001 From: Korbinian Stoemmer Date: Fri, 18 Nov 2022 09:25:23 +0100 Subject: [PATCH] update eventing from main (#16122) * Change http response code based on backend error (#16115) * introduce common errors to allow returning custom http responses * bump image * evaluate API Error returned by nats * fix code review comments * update stream configuration according to env vars (#16111) * update stream configuration according to env vars * fix tests * disable strict shadowing linting, to allow shadowing of `err` * parse units in maxbytes environment * fix more tests * set discardPolicy * fix review comments * bump images --- .../pkg/handler/handler.go | 15 +++- .../pkg/handler/handler_test.go | 63 +++++++++++++++++ .../pkg/sender/jetstream/jetstream.go | 14 +++- .../pkg/sender/jetstream/jetstream_test.go | 53 +++++++++++--- .../pkg/sender/sender.go | 8 +++ components/eventing-controller/.golangci.yaml | 2 +- .../reconciler_internal_integration_test.go | 3 +- .../pkg/backend/jetstreamv2/jetstream.go | 58 +++++++++++----- .../jetstreamv2/jetstream_integration_test.go | 6 +- .../pkg/backend/jetstreamv2/utils.go | 42 +++++++++-- .../backend/jetstreamv2/utils_unit_test.go | 62 +++++++++++++++-- .../pkg/backend/nats/jetstream/jetstream.go | 69 ++++++++++++++----- .../jetstream/jetstream_integration_test.go | 6 +- .../backend/nats/jetstream/jetstream_util.go | 19 ++++- .../jetstream_v1v2_integration_test.go | 5 +- .../pkg/env/nats_config.go | 6 +- .../pkg/env/nats_config_test.go | 7 +- .../jetstream/jetstream_test.go | 1 + .../controller/templates/deployment.yaml | 4 +- .../charts/nats/templates/statefulset.yaml | 2 +- resources/eventing/profile-evaluation.yaml | 6 +- resources/eventing/profile-production.yaml | 6 +- resources/eventing/values.yaml | 8 ++- 23 files changed, 381 insertions(+), 84 deletions(-) diff --git a/components/event-publisher-proxy/pkg/handler/handler.go 
b/components/event-publisher-proxy/pkg/handler/handler.go index 414d5f21bbda..0cdd91a88e5d 100644 --- a/components/event-publisher-proxy/pkg/handler/handler.go +++ b/components/event-publisher-proxy/pkg/handler/handler.go @@ -2,6 +2,7 @@ package handler import ( "context" + "errors" "net/http" "time" @@ -112,7 +113,13 @@ func (h *Handler) publishLegacyEventsAsCE(writer http.ResponseWriter, request *h result, err := h.sendEventAndRecordMetrics(ctx, event, h.Sender.URL(), request.Header) if err != nil { h.namedLogger().With().Error(err) - h.LegacyTransformer.TransformsCEResponseToLegacyResponse(writer, http.StatusInternalServerError, event, err.Error()) + httpStatus := http.StatusInternalServerError + if errors.Is(err, sender.ErrInsufficientStorage) { + httpStatus = http.StatusInsufficientStorage + } else if errors.Is(err, sender.ErrBackendTargetNotFound) { + httpStatus = http.StatusBadGateway + } + h.LegacyTransformer.TransformsCEResponseToLegacyResponse(writer, httpStatus, event, err.Error()) return } h.namedLogger().With().Debug(result) @@ -150,7 +157,11 @@ func (h *Handler) publishCloudEvents(writer http.ResponseWriter, request *http.R result, err := h.sendEventAndRecordMetrics(ctx, event, h.Sender.URL(), request.Header) if err != nil { - writer.WriteHeader(http.StatusInternalServerError) + httpStatus := http.StatusInternalServerError + if errors.Is(err, sender.ErrInsufficientStorage) { + httpStatus = http.StatusInsufficientStorage + } + writer.WriteHeader(httpStatus) h.namedLogger().With().Error(err) return } diff --git a/components/event-publisher-proxy/pkg/handler/handler_test.go b/components/event-publisher-proxy/pkg/handler/handler_test.go index 1b37fce656e8..94257dcfb087 100644 --- a/components/event-publisher-proxy/pkg/handler/handler_test.go +++ b/components/event-publisher-proxy/pkg/handler/handler_test.go @@ -329,6 +329,25 @@ func TestHandler_publishCloudEvents(t *testing.T) { eventing_epp_errors_total 1 `, }, + { + name: "Publish binary CloudEvent but 
backend is full", + fields: fields{ + Sender: &GenericSenderStub{ + Err: fmt.Errorf("oh no, i cannot send: %w", sender.ErrInsufficientStorage), + }, + collector: metrics.NewCollector(latency), + eventTypeCleaner: &eventtypetest.CleanerStub{}, + }, + args: args{ + request: CreateValidBinaryRequest(t), + }, + wantStatus: 507, + wantTEF: ` + # HELP eventing_epp_errors_total The total number of errors while sending events to the messaging server + # TYPE eventing_epp_errors_total counter + eventing_epp_errors_total 1 + `, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -429,6 +448,50 @@ func TestHandler_publishLegacyEventsAsCE(t *testing.T) { eventing_epp_requests_total{destination_service="FOO",response_code="204"} 1 `, }, + { + name: "Send valid legacy event but cannot send to backend due to target not found (e.g. stream missing)", + fields: fields{ + Sender: &GenericSenderStub{ + Err: fmt.Errorf("oh no, i cannot send: %w", sender.ErrBackendTargetNotFound), + BackendURL: "FOO", + }, + LegacyTransformer: legacy.NewTransformer("namespace", "im.a.prefix", NewApplicationListerOrDie(context.Background(), "testapp")), + collector: metrics.NewCollector(latency), + eventTypeCleaner: eventtypetest.CleanerStub{}, + }, + args: args{ + request: legacytest.ValidLegacyRequestOrDie(t, "v1", "testapp", "object.created"), + }, + wantStatus: http.StatusBadGateway, + wantOk: false, + wantTEF: ` + # HELP eventing_epp_errors_total The total number of errors while sending events to the messaging server + # TYPE eventing_epp_errors_total counter + eventing_epp_errors_total 1 + `, + }, + { + name: "Send valid legacy event but cannot send to backend due to full storage", + fields: fields{ + Sender: &GenericSenderStub{ + Err: fmt.Errorf("oh no, i cannot send: %w", sender.ErrInsufficientStorage), + BackendURL: "FOO", + }, + LegacyTransformer: legacy.NewTransformer("namespace", "im.a.prefix", NewApplicationListerOrDie(context.Background(), "testapp")), + collector: 
metrics.NewCollector(latency), + eventTypeCleaner: eventtypetest.CleanerStub{}, + }, + args: args{ + request: legacytest.ValidLegacyRequestOrDie(t, "v1", "testapp", "object.created"), + }, + wantStatus: 507, + wantOk: false, + wantTEF: ` + # HELP eventing_epp_errors_total The total number of errors while sending events to the messaging server + # TYPE eventing_epp_errors_total counter + eventing_epp_errors_total 1 + `, + }, { name: "Send valid legacy event but cannot send to backend", fields: fields{ diff --git a/components/event-publisher-proxy/pkg/sender/jetstream/jetstream.go b/components/event-publisher-proxy/pkg/sender/jetstream/jetstream.go index 1c103e30578a..1f4aa1c89d1e 100644 --- a/components/event-publisher-proxy/pkg/sender/jetstream/jetstream.go +++ b/components/event-publisher-proxy/pkg/sender/jetstream/jetstream.go @@ -23,6 +23,7 @@ import ( ) const ( + JSStoreFailedCode = 10077 natsBackend = "nats" jestreamHandlerName = "jetstream-handler" noSpaceLeftErrMessage = "no space left on device" @@ -81,12 +82,19 @@ func (s *Sender) Send(_ context.Context, event *event.Event) (sender.PublishResu if err != nil { s.namedLogger().Errorw("Cannot send event to backend", "error", err) if errors.Is(err, nats.ErrNoStreamResponse) { - return nil, fmt.Errorf("%w : %v", ErrCannotSendToStream, err) + return nil, fmt.Errorf("%w : %v", sender.ErrBackendTargetNotFound, fmt.Errorf("%w, %v", ErrCannotSendToStream, err)) + } + + var apiErr nats.JetStreamError + if ok := errors.As(err, &apiErr); ok { + if apiErr.APIError().ErrorCode == JSStoreFailedCode { + return nil, fmt.Errorf("%w: %v", sender.ErrInsufficientStorage, err) + } } if strings.Contains(err.Error(), noSpaceLeftErrMessage) { - return nil, err + return nil, fmt.Errorf("%w: %v", sender.ErrInsufficientStorage, err) } - return nil, fmt.Errorf("%w: %v", ErrCannotSendToStream, err) + return nil, fmt.Errorf("%w : %v", sender.ErrInternalBackendError, fmt.Errorf("%w, %v", ErrCannotSendToStream, err)) } return 
beb.HTTPPublishResult{Status: http.StatusNoContent}, nil } diff --git a/components/event-publisher-proxy/pkg/sender/jetstream/jetstream_test.go b/components/event-publisher-proxy/pkg/sender/jetstream/jetstream_test.go index 712d90fe2dfe..e8cfe18cf4d3 100644 --- a/components/event-publisher-proxy/pkg/sender/jetstream/jetstream_test.go +++ b/components/event-publisher-proxy/pkg/sender/jetstream/jetstream_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/env" + "github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/sender" testingutils "github.com/kyma-project/kyma/components/event-publisher-proxy/testing" ) @@ -27,29 +28,36 @@ func TestJetStreamMessageSender(t *testing.T) { testCases := []struct { name string givenStream bool + givenStreamMaxBytes int64 givenNATSConnectionClosed bool - wantError bool + wantErr error wantStatusCode int }{ { name: "send in jetstream mode should not succeed if stream doesn't exist", givenStream: false, givenNATSConnectionClosed: false, - wantError: true, - wantStatusCode: http.StatusNotFound, + wantErr: sender.ErrBackendTargetNotFound, + }, + { + name: "send in jetstream mode should not succeed if stream is full", + givenStream: true, + givenStreamMaxBytes: 1, + givenNATSConnectionClosed: false, + wantErr: sender.ErrInsufficientStorage, }, { name: "send in jetstream mode should succeed if NATS connection is open and the stream exists", givenStream: true, + givenStreamMaxBytes: 5000, givenNATSConnectionClosed: false, - wantError: false, + wantErr: nil, wantStatusCode: http.StatusNoContent, }, { name: "send in jetstream mode should fail if NATS connection is not open", givenNATSConnectionClosed: true, - wantError: true, - wantStatusCode: http.StatusBadGateway, + wantErr: ErrNotConnected, }, } @@ -66,7 +74,10 @@ func TestJetStreamMessageSender(t *testing.T) { }() if tc.givenStream { - addStream(t, connection, getStreamConfig()) + sc := 
getStreamConfig(tc.givenStreamMaxBytes) + cc := getConsumerConfig() + addStream(t, connection, sc) + addConsumer(t, connection, sc, cc) } ce := createCloudEvent(t) @@ -84,8 +95,8 @@ func TestJetStreamMessageSender(t *testing.T) { testEnv.Logger.WithContext().Errorf("err: %v", err) // assert - assert.Equal(t, tc.wantError, err != nil) - if !tc.wantError { + assert.ErrorIs(t, err, tc.wantErr) + if tc.wantErr == nil { assert.Equal(t, tc.wantStatusCode, status.HTTPStatus()) } }) @@ -151,12 +162,23 @@ func createCloudEvent(t *testing.T) *event.Event { } // getStreamConfig inits a testing stream config. -func getStreamConfig() *nats.StreamConfig { +func getStreamConfig(maxBytes int64) *nats.StreamConfig { return &nats.StreamConfig{ Name: testingutils.StreamName, Subjects: []string{fmt.Sprintf("%s.>", env.JetStreamSubjectPrefix)}, Storage: nats.MemoryStorage, Retention: nats.InterestPolicy, + Discard: nats.DiscardNew, + MaxBytes: maxBytes, + } +} + +func getConsumerConfig() *nats.ConsumerConfig { + return &nats.ConsumerConfig{ + Durable: "test", + DeliverPolicy: nats.DeliverAllPolicy, + AckPolicy: nats.AckExplicitPolicy, + FilterSubject: fmt.Sprintf("%v.%v", env.JetStreamSubjectPrefix, testingutils.CloudEventTypeWithPrefix), } } @@ -164,7 +186,16 @@ func getStreamConfig() *nats.StreamConfig { func addStream(t *testing.T, connection *nats.Conn, config *nats.StreamConfig) { js, err := connection.JetStream() assert.NoError(t, err) - _, err = js.AddStream(config) + info, err := js.AddStream(config) + t.Logf("%+v", info) + assert.NoError(t, err) +} + +func addConsumer(t *testing.T, connection *nats.Conn, sc *nats.StreamConfig, config *nats.ConsumerConfig) { + js, err := connection.JetStream() + assert.NoError(t, err) + info, err := js.AddConsumer(sc.Name, config) + t.Logf("%+v", info) assert.NoError(t, err) } diff --git a/components/event-publisher-proxy/pkg/sender/sender.go b/components/event-publisher-proxy/pkg/sender/sender.go index 173a2c877049..3a86d8ab6ee0 100644 --- 
a/components/event-publisher-proxy/pkg/sender/sender.go +++ b/components/event-publisher-proxy/pkg/sender/sender.go @@ -2,10 +2,18 @@ package sender import ( "context" + "errors" "github.com/cloudevents/sdk-go/v2/event" ) +var ( + ErrInsufficientStorage = errors.New("insufficient storage on backend") + ErrBackendTargetNotFound = errors.New("publishing target on backend not found") + ErrNoConnection = errors.New("no connection to backend") + ErrInternalBackendError = errors.New("internal error on backend") +) + type GenericSender interface { Send(context.Context, *event.Event) (PublishResult, error) URL() string diff --git a/components/eventing-controller/.golangci.yaml b/components/eventing-controller/.golangci.yaml index 028693821bcf..be97aef15a68 100644 --- a/components/eventing-controller/.golangci.yaml +++ b/components/eventing-controller/.golangci.yaml @@ -111,7 +111,7 @@ linters-settings: shadow: # Whether to be strict about shadowing; can be noisy. # Default: false - strict: true + strict: false nakedret: # Make an issue if func has more lines of code than this setting, and it has naked returns. 
diff --git a/components/eventing-controller/controllers/subscription/jetstream/reconciler_internal_integration_test.go b/components/eventing-controller/controllers/subscription/jetstream/reconciler_internal_integration_test.go index f159506a4deb..81695b8e565c 100644 --- a/components/eventing-controller/controllers/subscription/jetstream/reconciler_internal_integration_test.go +++ b/components/eventing-controller/controllers/subscription/jetstream/reconciler_internal_integration_test.go @@ -765,9 +765,10 @@ func startReconciler(eventTypePrefix string, ens *jetStreamTestEnsemble) *jetStr EventTypePrefix: eventTypePrefix, JSStreamName: reconcilertesting.JSStreamName, JSStreamStorageType: "memory", - JSStreamMaxBytes: -1, + JSStreamMaxBytes: "-1", JSStreamMaxMessages: -1, JSStreamRetentionPolicy: "interest", + JSStreamDiscardPolicy: "new", } // prepare application-lister diff --git a/components/eventing-controller/pkg/backend/jetstreamv2/jetstream.go b/components/eventing-controller/pkg/backend/jetstreamv2/jetstream.go index 6451adacd725..da9735dc9835 100644 --- a/components/eventing-controller/pkg/backend/jetstreamv2/jetstream.go +++ b/components/eventing-controller/pkg/backend/jetstreamv2/jetstream.go @@ -4,12 +4,17 @@ import ( "context" "fmt" "net/http" + "reflect" "time" backendutils "github.com/kyma-project/kyma/components/eventing-controller/pkg/backend/utils" cev2 "github.com/cloudevents/sdk-go/v2" cev2protocol "github.com/cloudevents/sdk-go/v2/protocol" + "github.com/nats-io/nats.go" + "github.com/pkg/errors" + "go.uber.org/zap" + eventingv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" "github.com/kyma-project/kyma/components/eventing-controller/logger" "github.com/kyma-project/kyma/components/eventing-controller/pkg/backend/cleaner" @@ -18,9 +23,6 @@ import ( "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" "github.com/kyma-project/kyma/components/eventing-controller/pkg/tracing" 
"github.com/kyma-project/kyma/components/eventing-controller/utils" - "github.com/nats-io/nats.go" - "github.com/pkg/errors" - "go.uber.org/zap" ) var _ Backend = &JetStream{} @@ -58,7 +60,7 @@ func (js *JetStream) Initialize(connCloseHandler backendutilsv2.ConnClosedHandle if err := js.initCloudEventClient(js.Config); err != nil { return err } - return js.ensureStreamExists() + return js.ensureStreamExistsAndIsConfiguredCorrectly() } func (js *JetStream) SyncSubscription(subscription *eventingv1alpha2.Subscription) error { @@ -153,6 +155,9 @@ func (js *JetStream) validateConfig() error { if _, err := toJetStreamRetentionPolicy(js.Config.JSStreamRetentionPolicy); err != nil { return err } + if _, err := toJetStreamDiscardPolicy(js.Config.JSStreamDiscardPolicy); err != nil { + return err + } return nil } @@ -179,30 +184,45 @@ func (js *JetStream) initNATSConn(connCloseHandler backendutilsv2.ConnClosedHand func (js *JetStream) handleReconnect(_ *nats.Conn) { js.namedLogger().Infow("Called reconnect handler for JetStream") - if err := js.ensureStreamExists(); err != nil { + if err := js.ensureStreamExistsAndIsConfiguredCorrectly(); err != nil { js.namedLogger().Errorw("Failed to ensure the stream exists", "error", err) } } -func (js *JetStream) ensureStreamExists() error { - if info, err := js.jsCtx.StreamInfo(js.Config.JSStreamName); err == nil { - // TODO: in case the stream exists, should we make sure all of its configs matches the stream config in the chart? 
- js.namedLogger().Infow("Reusing existing Stream", "stream-info", info) - return nil - // if nats server was restarted, the stream needs to be recreated for memory storage type - // and hence we get ErrConnectionClosed for the lost stream - } else if !errors.Is(err, nats.ErrStreamNotFound) { - js.namedLogger().Debugw("The connection to NATS server is not established!") +func (js *JetStream) ensureStreamExistsAndIsConfiguredCorrectly() error { + streamConfig, err := getStreamConfig(js.Config) + if err != nil { return err } - streamConfig, err := getStreamConfig(js.Config) + info, err := js.jsCtx.StreamInfo(js.Config.JSStreamName) + if errors.Is(err, nats.ErrStreamNotFound) { + info, err = js.jsCtx.AddStream(streamConfig) + if err != nil { + return err + } + js.namedLogger().Infow("Stream not found, created a new Stream", + "stream-info", info) + return nil + } if err != nil { return err } - js.namedLogger().Infow("Stream not found, creating a new Stream", - "streamName", js.Config.JSStreamName, "streamStorageType", streamConfig.Storage, "subjects", streamConfig.Subjects) - _, err = js.jsCtx.AddStream(streamConfig) - return err + + if !streamIsConfiguredCorrectly(info.Config, *streamConfig) { + newInfo, err := js.jsCtx.UpdateStream(streamConfig) + if err != nil { + return err + } + js.namedLogger().Infow("Updated existing Stream:", "stream-info", newInfo) + return nil + } + + js.namedLogger().Infow("Reusing existing Stream", "stream-info", info) + return nil +} + +func streamIsConfiguredCorrectly(got nats.StreamConfig, want nats.StreamConfig) bool { + return reflect.DeepEqual(got, want) } func (js *JetStream) initJSContext() error { diff --git a/components/eventing-controller/pkg/backend/jetstreamv2/jetstream_integration_test.go b/components/eventing-controller/pkg/backend/jetstreamv2/jetstream_integration_test.go index 6bd7d22f7134..2c79de3770c5 100644 --- a/components/eventing-controller/pkg/backend/jetstreamv2/jetstream_integration_test.go +++ 
b/components/eventing-controller/pkg/backend/jetstreamv2/jetstream_integration_test.go @@ -8,6 +8,9 @@ import ( "github.com/stretchr/testify/assert" kymalogger "github.com/kyma-project/kyma/common/logging/logger" + "github.com/nats-io/nats.go" + "github.com/stretchr/testify/require" + eventingv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" "github.com/kyma-project/kyma/components/eventing-controller/logger" "github.com/kyma-project/kyma/components/eventing-controller/pkg/backend/cleaner" @@ -17,8 +20,6 @@ import ( "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" evtesting "github.com/kyma-project/kyma/components/eventing-controller/testing" evtestingv2 "github.com/kyma-project/kyma/components/eventing-controller/testing/v2" - "github.com/nats-io/nats.go" - "github.com/stretchr/testify/require" ) // TestJetStream_SubscriptionDeletion tests the creation and deletion @@ -1017,6 +1018,7 @@ func defaultNatsConfig(url string) env.NatsConfig { JSStreamName: DefaultStreamName, JSStreamStorageType: StorageTypeMemory, JSStreamRetentionPolicy: RetentionPolicyInterest, + JSStreamDiscardPolicy: DiscardPolicyNew, } } diff --git a/components/eventing-controller/pkg/backend/jetstreamv2/utils.go b/components/eventing-controller/pkg/backend/jetstreamv2/utils.go index a29ce3d5ebe7..6bff74cd7926 100644 --- a/components/eventing-controller/pkg/backend/jetstreamv2/utils.go +++ b/components/eventing-controller/pkg/backend/jetstreamv2/utils.go @@ -6,11 +6,13 @@ import ( "fmt" "strings" + "github.com/nats-io/nats.go" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" + eventingv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" "github.com/kyma-project/kyma/components/eventing-controller/pkg/backend/cleaner" "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" - "github.com/nats-io/nats.go" - "k8s.io/apimachinery/pkg/types" ) const ( @@ -20,6 +22,9 @@ const ( 
RetentionPolicyLimits = "limits" RetentionPolicyInterest = "interest" + DiscardPolicyNew = "new" + DiscardPolicyOld = "old" + ConsumerDeliverPolicyAll = "all" ConsumerDeliverPolicyLast = "last" ConsumerDeliverPolicyLastPerSubject = "last_per_subject" @@ -64,6 +69,16 @@ func toJetStreamRetentionPolicy(s string) (nats.RetentionPolicy, error) { return nats.LimitsPolicy, fmt.Errorf("invalid stream retention policy %q", s) } +func toJetStreamDiscardPolicy(s string) (nats.DiscardPolicy, error) { + switch s { + case DiscardPolicyOld: + return nats.DiscardOld, nil + case DiscardPolicyNew: + return nats.DiscardNew, nil + } + return nats.DiscardNew, fmt.Errorf("invalid stream discard policy %q", s) +} + // toJetStreamConsumerDeliverPolicyOpt returns a nats.DeliverPolicy opt based on the given deliver policy string value. // It returns "DeliverNew" as the default nats.DeliverPolicy opt, if the given deliver policy value is not supported. // Supported deliver policy values are ("all", "last", "last_per_subject" and "new"). @@ -103,17 +118,34 @@ func getStreamConfig(natsConfig env.NatsConfig) (*nats.StreamConfig, error) { if err != nil { return nil, err } + retentionPolicy, err := toJetStreamRetentionPolicy(natsConfig.JSStreamRetentionPolicy) if err != nil { return nil, err } + + // Quantities must not be empty. 
So we default here to "-1" + if natsConfig.JSStreamMaxBytes == "" { + natsConfig.JSStreamMaxBytes = "-1" + } + maxBytes, err := resource.ParseQuantity(natsConfig.JSStreamMaxBytes) + if err != nil { + return nil, err + } + + discardPolicy, err := toJetStreamDiscardPolicy(natsConfig.JSStreamDiscardPolicy) + if err != nil { + return nil, err + } + streamConfig := &nats.StreamConfig{ Name: natsConfig.JSStreamName, Storage: storage, Replicas: natsConfig.JSStreamReplicas, Retention: retentionPolicy, MaxMsgs: natsConfig.JSStreamMaxMessages, - MaxBytes: natsConfig.JSStreamMaxBytes, + MaxBytes: maxBytes.Value(), + Discard: discardPolicy, // Since one stream is used to store events of all types, the stream has to match all event types, and therefore // we use the wildcard char >. However, to avoid matching internal JetStream and non-Kyma-related subjects, we // use a prefix. This prefix is handled only on the JetStream level (i.e. JetStream handler @@ -212,9 +244,9 @@ func isJsSubAssociatedWithKymaSub(jsSubKey SubscriptionSubjectIdentifier, return createKeyPrefix(subscription) == jsSubKey.NamespacedName() } -//---------------------------------------- +// ---------------------------------------- // SubscriptionSubjectIdentifier utils -//---------------------------------------- +// ---------------------------------------- // NamespacedName returns the Kubernetes namespaced name. 
func (s SubscriptionSubjectIdentifier) NamespacedName() string { diff --git a/components/eventing-controller/pkg/backend/jetstreamv2/utils_unit_test.go b/components/eventing-controller/pkg/backend/jetstreamv2/utils_unit_test.go index 6177097779cf..b90dcd901c2e 100644 --- a/components/eventing-controller/pkg/backend/jetstreamv2/utils_unit_test.go +++ b/components/eventing-controller/pkg/backend/jetstreamv2/utils_unit_test.go @@ -5,15 +5,17 @@ import ( "reflect" "testing" - "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" "github.com/nats-io/nats.go" + "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" + kymalogger "github.com/kyma-project/kyma/common/logging/logger" + "github.com/stretchr/testify/require" + eventingv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" "github.com/kyma-project/kyma/components/eventing-controller/logger" "github.com/kyma-project/kyma/components/eventing-controller/pkg/backend/cleaner" evtestingv2 "github.com/kyma-project/kyma/components/eventing-controller/testing/v2" - "github.com/stretchr/testify/require" ) // maxJetStreamConsumerNameLength is the maximum preferred length for the JetStream consumer names @@ -132,10 +134,12 @@ func TestGetStreamConfig(t *testing.T) { JSStreamRetentionPolicy: RetentionPolicyLimits, JSStreamReplicas: 3, JSStreamMaxMessages: -1, - JSStreamMaxBytes: -1, + JSStreamMaxBytes: "-1", + JSStreamDiscardPolicy: DiscardPolicyNew, }, wantStreamConfig: &nats.StreamConfig{ Name: DefaultStreamName, + Discard: nats.DiscardNew, Storage: nats.MemoryStorage, Replicas: 3, Retention: nats.LimitsPolicy, @@ -145,13 +149,63 @@ func TestGetStreamConfig(t *testing.T) { }, wantError: false, }, + { + name: "Should parse MaxBytes correctly without unit", + givenNatsConfig: env.NatsConfig{ + JSStreamName: DefaultStreamName, + JSStreamStorageType: StorageTypeMemory, + JSStreamRetentionPolicy: RetentionPolicyLimits, + JSStreamDiscardPolicy: DiscardPolicyNew, + 
JSStreamReplicas: 3, + JSStreamMaxMessages: -1, + JSStreamMaxBytes: "10485760", + }, + wantStreamConfig: &nats.StreamConfig{ + Name: DefaultStreamName, + Discard: nats.DiscardNew, + Storage: nats.MemoryStorage, + Replicas: 3, + Retention: nats.LimitsPolicy, + MaxMsgs: -1, + MaxBytes: 10485760, + Subjects: []string{fmt.Sprintf("%s.>", env.JetStreamSubjectPrefix)}, + }, + wantError: false, + }, + { + name: "Should parse MaxBytes correctly with unit", + givenNatsConfig: env.NatsConfig{ + JSStreamName: DefaultStreamName, + JSStreamStorageType: StorageTypeMemory, + JSStreamDiscardPolicy: DiscardPolicyNew, + JSStreamRetentionPolicy: RetentionPolicyLimits, + JSStreamReplicas: 3, + JSStreamMaxMessages: -1, + JSStreamMaxBytes: "10Mi", + }, + wantStreamConfig: &nats.StreamConfig{ + Name: DefaultStreamName, + Discard: nats.DiscardNew, + Storage: nats.MemoryStorage, + Replicas: 3, + Retention: nats.LimitsPolicy, + MaxMsgs: -1, + MaxBytes: 10485760, + Subjects: []string{fmt.Sprintf("%s.>", env.JetStreamSubjectPrefix)}, + }, + wantError: false, + }, } for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { t.Parallel() streamConfig, err := getStreamConfig(tc.givenNatsConfig) - require.Equal(t, tc.wantError, err != nil) + if tc.wantError { + require.Error(t, err) + } else { + require.NoError(t, err) + } require.Equal(t, tc.wantStreamConfig, streamConfig) }) } diff --git a/components/eventing-controller/pkg/backend/nats/jetstream/jetstream.go b/components/eventing-controller/pkg/backend/nats/jetstream/jetstream.go index 1ccabd9d3f7f..4846decdac07 100644 --- a/components/eventing-controller/pkg/backend/nats/jetstream/jetstream.go +++ b/components/eventing-controller/pkg/backend/nats/jetstream/jetstream.go @@ -6,10 +6,13 @@ import ( "encoding/hex" "fmt" "net/http" + "reflect" "strings" "sync" "time" + "k8s.io/apimachinery/pkg/api/resource" + backendutils "github.com/kyma-project/kyma/components/eventing-controller/pkg/backend/utils" cev2 
"github.com/cloudevents/sdk-go/v2" @@ -147,7 +150,7 @@ func (js *JetStream) Initialize(connCloseHandler backendnats.ConnClosedHandler) if err := js.initCloudEventClient(js.Config); err != nil { return err } - return js.ensureStreamExists() + return js.ensureStreamExistsAndIsConfiguredCorrectly() } func (js *JetStream) SyncSubscription(subscription *eventingv1alpha1.Subscription) error { @@ -286,12 +289,15 @@ func (js *JetStream) validateConfig() error { if _, err := toJetStreamRetentionPolicy(js.Config.JSStreamRetentionPolicy); err != nil { return err } + if _, err := toJetStreamDiscardPolicy(js.Config.JSStreamDiscardPolicy); err != nil { + return err + } return nil } func (js *JetStream) handleReconnect(_ *nats.Conn) { js.namedLogger().Infow("Called reconnect handler for JetStream") - if err := js.ensureStreamExists(); err != nil { + if err := js.ensureStreamExistsAndIsConfiguredCorrectly(); err != nil { js.namedLogger().Errorw("Failed to ensure the stream exists", "error", err) } } @@ -326,25 +332,40 @@ func (js *JetStream) initJSContext() error { return nil } -func (js *JetStream) ensureStreamExists() error { - if info, err := js.jsCtx.StreamInfo(js.Config.JSStreamName); err == nil { - // TODO: in case the stream exists, should we make sure all of its configs matches the stream config in the chart? 
- js.namedLogger().Infow("Reusing existing Stream", "stream-info", info) - return nil - // if nats server was restarted, the stream needs to be recreated for memory storage type - // and hence we get ErrConnectionClosed for the lost stream - } else if err != nats.ErrStreamNotFound { - js.namedLogger().Debugw("The connection to NATS server is not established!") +func (js *JetStream) ensureStreamExistsAndIsConfiguredCorrectly() error { + streamConfig, err := getStreamConfig(js.Config) + if err != nil { return err } - streamConfig, err := getStreamConfig(js.Config) + info, err := js.jsCtx.StreamInfo(js.Config.JSStreamName) + if errors.Is(err, nats.ErrStreamNotFound) { + info, err = js.jsCtx.AddStream(streamConfig) + if err != nil { + return err + } + js.namedLogger().Infow("Stream not found, created a new Stream", + "stream-info", info) + return nil + } if err != nil { return err } - js.namedLogger().Infow("Stream not found, creating a new Stream", - "streamName", js.Config.JSStreamName, "streamStorageType", streamConfig.Storage, "subjects", streamConfig.Subjects) - _, err = js.jsCtx.AddStream(streamConfig) - return err + + if !streamIsConfiguredCorrectly(info.Config, *streamConfig) { + newInfo, err := js.jsCtx.UpdateStream(streamConfig) + if err != nil { + return err + } + js.namedLogger().Infow("Updated existing Stream:", "stream-info", newInfo) + return nil + } + + js.namedLogger().Infow("Reusing existing Stream", "stream-info", info) + return nil +} + +func streamIsConfiguredCorrectly(got nats.StreamConfig, want nats.StreamConfig) bool { + return reflect.DeepEqual(got, want) } func getStreamConfig(natsConfig env.NatsConfig) (*nats.StreamConfig, error) { @@ -356,13 +377,27 @@ func getStreamConfig(natsConfig env.NatsConfig) (*nats.StreamConfig, error) { if err != nil { return nil, err } + + // Quantities must not be empty. 
So we default here to "-1" + if natsConfig.JSStreamMaxBytes == "" { + natsConfig.JSStreamMaxBytes = "-1" + } + maxBytes, err := resource.ParseQuantity(natsConfig.JSStreamMaxBytes) + if err != nil { + return nil, err + } + discardPolicy, err := toJetStreamDiscardPolicy(natsConfig.JSStreamDiscardPolicy) + if err != nil { + return nil, err + } streamConfig := &nats.StreamConfig{ Name: natsConfig.JSStreamName, Storage: storage, Replicas: natsConfig.JSStreamReplicas, Retention: retentionPolicy, MaxMsgs: natsConfig.JSStreamMaxMessages, - MaxBytes: natsConfig.JSStreamMaxBytes, + MaxBytes: maxBytes.Value(), + Discard: discardPolicy, // Since one stream is used to store events of all types, the stream has to match all event types, and therefore // we use the wildcard char >. However, to avoid matching internal JetStream and non-Kyma-related subjects, we // use a prefix. This prefix is handled only on the JetStream level (i.e. JetStream handler diff --git a/components/eventing-controller/pkg/backend/nats/jetstream/jetstream_integration_test.go b/components/eventing-controller/pkg/backend/nats/jetstream/jetstream_integration_test.go index 9579d913f3bd..aa827b71e83f 100644 --- a/components/eventing-controller/pkg/backend/nats/jetstream/jetstream_integration_test.go +++ b/components/eventing-controller/pkg/backend/nats/jetstream/jetstream_integration_test.go @@ -8,13 +8,14 @@ import ( "time" kymalogger "github.com/kyma-project/kyma/common/logging/logger" - cleanerv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/pkg/backend/cleaner" - "github.com/kyma-project/kyma/components/eventing-controller/pkg/backend/jetstreamv2" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + cleanerv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/pkg/backend/cleaner" + "github.com/kyma-project/kyma/components/eventing-controller/pkg/backend/jetstreamv2" + 
eventingv1alpha1 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha1" "github.com/kyma-project/kyma/components/eventing-controller/logger" "github.com/kyma-project/kyma/components/eventing-controller/pkg/backend/eventtype" @@ -1004,6 +1005,7 @@ func defaultNatsConfig(url string) env.NatsConfig { JSStreamName: defaultStreamName, JSStreamStorageType: StorageTypeMemory, JSStreamRetentionPolicy: RetentionPolicyInterest, + JSStreamDiscardPolicy: DiscardPolicyNew, } } diff --git a/components/eventing-controller/pkg/backend/nats/jetstream/jetstream_util.go b/components/eventing-controller/pkg/backend/nats/jetstream/jetstream_util.go index 83d3e7822995..391bc452333e 100644 --- a/components/eventing-controller/pkg/backend/nats/jetstream/jetstream_util.go +++ b/components/eventing-controller/pkg/backend/nats/jetstream/jetstream_util.go @@ -13,12 +13,17 @@ const ( RetentionPolicyLimits = "limits" RetentionPolicyInterest = "interest" + DiscardPolicyNew = "new" + DiscardPolicyOld = "old" + ConsumerDeliverPolicyAll = "all" ConsumerDeliverPolicyLast = "last" ConsumerDeliverPolicyLastPerSubject = "last_per_subject" ConsumerDeliverPolicyNew = "new" ) +var ErrInvalidConfiguration = fmt.Errorf("invalid jetstream configuration") + func toJetStreamStorageType(s string) (nats.StorageType, error) { switch s { case StorageTypeMemory: @@ -26,7 +31,7 @@ func toJetStreamStorageType(s string) (nats.StorageType, error) { case StorageTypeFile: return nats.FileStorage, nil } - return nats.MemoryStorage, fmt.Errorf("invalid stream storage type %q", s) + return nats.MemoryStorage, fmt.Errorf("%w: storage type: %q", ErrInvalidConfiguration, s) } func toJetStreamRetentionPolicy(s string) (nats.RetentionPolicy, error) { @@ -36,7 +41,17 @@ func toJetStreamRetentionPolicy(s string) (nats.RetentionPolicy, error) { case RetentionPolicyInterest: return nats.InterestPolicy, nil } - return nats.LimitsPolicy, fmt.Errorf("invalid stream retention policy %q", s) + return 
nats.LimitsPolicy, fmt.Errorf("%w: stream retention policy: %q", ErrInvalidConfiguration, s) +} + +func toJetStreamDiscardPolicy(s string) (nats.DiscardPolicy, error) { + switch s { + case DiscardPolicyOld: + return nats.DiscardOld, nil + case DiscardPolicyNew: + return nats.DiscardNew, nil + } + return nats.DiscardNew, fmt.Errorf("%w: stream discard policy: %q", ErrInvalidConfiguration, s) } // toJetStreamConsumerDeliverPolicyOpt returns a nats.DeliverPolicy opt based on the given deliver policy string value. diff --git a/components/eventing-controller/pkg/backend/nats/jetstream/jetstream_v1v2_integration_test.go b/components/eventing-controller/pkg/backend/nats/jetstream/jetstream_v1v2_integration_test.go index b4c0330794f1..2a4bda9cfec2 100644 --- a/components/eventing-controller/pkg/backend/nats/jetstream/jetstream_v1v2_integration_test.go +++ b/components/eventing-controller/pkg/backend/nats/jetstream/jetstream_v1v2_integration_test.go @@ -63,8 +63,9 @@ func TestJetStreamInitialize_StreamExists(t *testing.T) { // A stream already exists createdStreamInfo, err := jsClient.AddStream(&nats.StreamConfig{ - Name: natsConfig.JSStreamName, - Storage: nats.MemoryStorage, + Name: natsConfig.JSStreamName, + Storage: nats.MemoryStorage, + Retention: nats.InterestPolicy, // retention policies cannot be changed on an existing stream }) require.NotNil(t, createdStreamInfo) require.NoError(t, err) diff --git a/components/eventing-controller/pkg/env/nats_config.go b/components/eventing-controller/pkg/env/nats_config.go index 022cadc5d4fc..3a187c6c5bc6 100644 --- a/components/eventing-controller/pkg/env/nats_config.go +++ b/components/eventing-controller/pkg/env/nats_config.go @@ -40,7 +40,11 @@ type NatsConfig struct { // configured via JSStreamMaxMessages and JSStreamMaxBytes. 
JSStreamRetentionPolicy string `envconfig:"JS_STREAM_RETENTION_POLICY" default:"interest"` JSStreamMaxMessages int64 `envconfig:"JS_STREAM_MAX_MSGS" default:"-1"` - JSStreamMaxBytes int64 `envconfig:"JS_STREAM_MAX_BYTES" default:"-1"` + JSStreamMaxBytes string `envconfig:"JS_STREAM_MAX_BYTES" default:"-1"` + // JSStreamDiscardPolicy specifies which events to discard from the stream in case limits are reached + // new: reject new messages for the stream + // old: discard old messages from the stream to make room for new messages + JSStreamDiscardPolicy string `envconfig:"JS_STREAM_DISCARD_POLICY" default:"new"` // Deliver Policy determines for a consumer where in the stream it starts receiving messages // (more info https://docs.nats.io/nats-concepts/jetstream/consumers#deliverpolicy-optstartseq-optstarttime): // - all: The consumer starts receiving from the earliest available message. diff --git a/components/eventing-controller/pkg/env/nats_config_test.go b/components/eventing-controller/pkg/env/nats_config_test.go index 5fab9c707d1a..0ea0adfb656c 100644 --- a/components/eventing-controller/pkg/env/nats_config_test.go +++ b/components/eventing-controller/pkg/env/nats_config_test.go @@ -47,8 +47,9 @@ func TestGetNatsConfig(t *testing.T) { JSStreamReplicas: 1, JSStreamRetentionPolicy: "interest", JSStreamMaxMessages: -1, - JSStreamMaxBytes: -1, + JSStreamMaxBytes: "-1", JSConsumerDeliverPolicy: "new", + JSStreamDiscardPolicy: "new", EnableNewCRDVersion: false, }, wantErr: false, @@ -70,6 +71,7 @@ func TestGetNatsConfig(t *testing.T) { "JS_STREAM_MAX_BYTES": "6", "JS_CONSUMER_DELIVER_POLICY": "jcdp", "ENABLE_NEW_CRD_VERSION": "true", + "JS_STREAM_DISCARD_POLICY": "jsdp", }, maxReconnects: 1, reconnectWait: 1 * time.Second, @@ -88,9 +90,10 @@ func TestGetNatsConfig(t *testing.T) { JSStreamReplicas: 4, JSStreamRetentionPolicy: "jsrp", JSStreamMaxMessages: 5, - JSStreamMaxBytes: 6, + JSStreamMaxBytes: "6", JSConsumerDeliverPolicy: "jcdp", EnableNewCRDVersion: true, +
JSStreamDiscardPolicy: "jsdp", }, wantErr: false, }, diff --git a/components/eventing-controller/pkg/subscriptionmanager/jetstream/jetstream_test.go b/components/eventing-controller/pkg/subscriptionmanager/jetstream/jetstream_test.go index d6599191ada2..cbdafb0d8cb8 100644 --- a/components/eventing-controller/pkg/subscriptionmanager/jetstream/jetstream_test.go +++ b/components/eventing-controller/pkg/subscriptionmanager/jetstream/jetstream_test.go @@ -91,6 +91,7 @@ func getNATSConf(natsURL string) env.NatsConfig { JSStreamName: controllertesting.EventTypePrefix, JSStreamStorageType: backendjetstream.StorageTypeMemory, JSStreamRetentionPolicy: backendjetstream.RetentionPolicyInterest, + JSStreamDiscardPolicy: backendjetstream.DiscardPolicyNew, } } diff --git a/resources/eventing/charts/controller/templates/deployment.yaml b/resources/eventing/charts/controller/templates/deployment.yaml index 2be2cd3a9009..46b33c027386 100644 --- a/resources/eventing/charts/controller/templates/deployment.yaml +++ b/resources/eventing/charts/controller/templates/deployment.yaml @@ -72,6 +72,8 @@ spec: value: {{ .Values.global.jetstream.storage | quote }} - name: JS_STREAM_REPLICAS value: {{ .Values.jetstream.streamReplicas | quote }} + - name: JS_STREAM_DISCARD_POLICY + value: {{ .Values.global.jetstream.discardPolicy | quote }} - name: JS_STREAM_RETENTION_POLICY value: {{ .Values.jetstream.retentionPolicy | quote }} - name: JS_CONSUMER_DELIVER_POLICY @@ -79,7 +81,7 @@ spec: - name: JS_STREAM_MAX_MSGS value: {{ .Values.jetstream.maxMessages | quote }} - name: JS_STREAM_MAX_BYTES - value: {{ .Values.jetstream.maxBytes | quote }} + value: {{ .Values.global.jetstream.maxBytes | quote }} - name: WEBHOOK_SECRET_NAME value: {{ .Values.webhook.secretName | quote }} - name: MUTATING_WEBHOOK_NAME diff --git a/resources/eventing/charts/nats/templates/statefulset.yaml b/resources/eventing/charts/nats/templates/statefulset.yaml index 00f81b3639b4..5bd235d804b6 100644 --- 
a/resources/eventing/charts/nats/templates/statefulset.yaml +++ b/resources/eventing/charts/nats/templates/statefulset.yaml @@ -285,7 +285,7 @@ spec: {{- toYaml .Values.nats.jetstream.fileStorage.accessModes | nindent 10 }} resources: requests: - storage: {{ .Values.nats.jetstream.fileStorage.size }} + storage: {{ .Values.global.jetstream.fileStorage.size }} {{- if .Values.nats.jetstream.fileStorage.storageClassName }} storageClassName: {{ .Values.nats.jetstream.fileStorage.storageClassName | quote }} {{- end }} diff --git a/resources/eventing/profile-evaluation.yaml b/resources/eventing/profile-evaluation.yaml index 71de46d5c06e..eaa014c4c3e1 100644 --- a/resources/eventing/profile-evaluation.yaml +++ b/resources/eventing/profile-evaluation.yaml @@ -1,6 +1,9 @@ global: jetstream: storage: file + maxBytes: 900Mi + fileStorage: + size: 1Gi controller: jetstream: @@ -8,7 +11,6 @@ controller: streamReplicas: 1 consumerDeliverPolicy: new maxMessages: -1 - maxBytes: -1 resources: limits: cpu: 20m @@ -36,8 +38,6 @@ nats: memStorage: enabled: true size: 1Gi - fileStorage: - size: 1Gi resources: limits: cpu: 20m diff --git a/resources/eventing/profile-production.yaml b/resources/eventing/profile-production.yaml index d29bb45dce2e..db7f1ae51da1 100644 --- a/resources/eventing/profile-production.yaml +++ b/resources/eventing/profile-production.yaml @@ -1,6 +1,9 @@ global: jetstream: storage: file + maxBytes: 900Mi + fileStorage: + size: 1Gi controller: jetstream: @@ -8,7 +11,6 @@ controller: streamReplicas: 3 consumerDeliverPolicy: new maxMessages: -1 - maxBytes: -1 resources: limits: cpu: 500m @@ -36,8 +38,6 @@ nats: memStorage: enabled: true size: 1Gi - fileStorage: - size: 1Gi resources: limits: cpu: 500m diff --git a/resources/eventing/values.yaml b/resources/eventing/values.yaml index 34320839e16e..5bcdfe0c02da 100644 --- a/resources/eventing/values.yaml +++ b/resources/eventing/values.yaml @@ -5,11 +5,11 @@ global: images: eventing_controller: name: eventing-controller 
- version: v20221111-830eca7c + version: v20221117-fb645c67 pullPolicy: "IfNotPresent" publisher_proxy: name: event-publisher-proxy - version: v20221108-efd0d663 + version: v20221117-ae62e1ed nats: name: nats version: 2.9.6-alpine3.16 @@ -26,6 +26,10 @@ global: jetstream: # Storage type of the stream, memory or file. storage: file + fileStorage: + size: 1Gi + maxBytes: 900Mi + discardPolicy: new # secretName defines optionally another name than the default secret name secretName: ""