Skip to content

Commit

Permalink
update eventing from main (#16122)
Browse files Browse the repository at this point in the history
* Change http response code based on backend error (#16115)

* introduce common errors to allow returning custom http responses

* bump image

* evaluate API Error returned by nats

* fix code review comments

* update stream configuration according to env vars (#16111)

* update stream configuration according to env vars

* fix tests

* disable strict shadowing linting, to allow shadowing of `err`

* parse units in maxbytes environment

* fix more tests

* set discardPolicy

* fix review comments

* bump images
  • Loading branch information
k15r authored Nov 18, 2022
1 parent 9c20fac commit 89d62b2
Show file tree
Hide file tree
Showing 23 changed files with 381 additions and 84 deletions.
15 changes: 13 additions & 2 deletions components/event-publisher-proxy/pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handler

import (
"context"
"errors"
"net/http"
"time"

Expand Down Expand Up @@ -112,7 +113,13 @@ func (h *Handler) publishLegacyEventsAsCE(writer http.ResponseWriter, request *h
result, err := h.sendEventAndRecordMetrics(ctx, event, h.Sender.URL(), request.Header)
if err != nil {
h.namedLogger().With().Error(err)
h.LegacyTransformer.TransformsCEResponseToLegacyResponse(writer, http.StatusInternalServerError, event, err.Error())
httpStatus := http.StatusInternalServerError
if errors.Is(err, sender.ErrInsufficientStorage) {
httpStatus = http.StatusInsufficientStorage
} else if errors.Is(err, sender.ErrBackendTargetNotFound) {
httpStatus = http.StatusBadGateway
}
h.LegacyTransformer.TransformsCEResponseToLegacyResponse(writer, httpStatus, event, err.Error())
return
}
h.namedLogger().With().Debug(result)
Expand Down Expand Up @@ -150,7 +157,11 @@ func (h *Handler) publishCloudEvents(writer http.ResponseWriter, request *http.R

result, err := h.sendEventAndRecordMetrics(ctx, event, h.Sender.URL(), request.Header)
if err != nil {
writer.WriteHeader(http.StatusInternalServerError)
httpStatus := http.StatusInternalServerError
if errors.Is(err, sender.ErrInsufficientStorage) {
httpStatus = http.StatusInsufficientStorage
}
writer.WriteHeader(httpStatus)
h.namedLogger().With().Error(err)
return
}
Expand Down
63 changes: 63 additions & 0 deletions components/event-publisher-proxy/pkg/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,25 @@ func TestHandler_publishCloudEvents(t *testing.T) {
eventing_epp_errors_total 1
`,
},
{
name: "Publish binary CloudEvent but backend is full",
fields: fields{
Sender: &GenericSenderStub{
Err: fmt.Errorf("oh no, i cannot send: %w", sender.ErrInsufficientStorage),
},
collector: metrics.NewCollector(latency),
eventTypeCleaner: &eventtypetest.CleanerStub{},
},
args: args{
request: CreateValidBinaryRequest(t),
},
wantStatus: 507,
wantTEF: `
# HELP eventing_epp_errors_total The total number of errors while sending events to the messaging server
# TYPE eventing_epp_errors_total counter
eventing_epp_errors_total 1
`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -429,6 +448,50 @@ func TestHandler_publishLegacyEventsAsCE(t *testing.T) {
eventing_epp_requests_total{destination_service="FOO",response_code="204"} 1
`,
},
{
name: "Send valid legacy event but cannot send to backend due to target not found (e.g. stream missing)",
fields: fields{
Sender: &GenericSenderStub{
Err: fmt.Errorf("oh no, i cannot send: %w", sender.ErrBackendTargetNotFound),
BackendURL: "FOO",
},
LegacyTransformer: legacy.NewTransformer("namespace", "im.a.prefix", NewApplicationListerOrDie(context.Background(), "testapp")),
collector: metrics.NewCollector(latency),
eventTypeCleaner: eventtypetest.CleanerStub{},
},
args: args{
request: legacytest.ValidLegacyRequestOrDie(t, "v1", "testapp", "object.created"),
},
wantStatus: http.StatusBadGateway,
wantOk: false,
wantTEF: `
# HELP eventing_epp_errors_total The total number of errors while sending events to the messaging server
# TYPE eventing_epp_errors_total counter
eventing_epp_errors_total 1
`,
},
{
name: "Send valid legacy event but cannot send to backend due to full storage",
fields: fields{
Sender: &GenericSenderStub{
Err: fmt.Errorf("oh no, i cannot send: %w", sender.ErrInsufficientStorage),
BackendURL: "FOO",
},
LegacyTransformer: legacy.NewTransformer("namespace", "im.a.prefix", NewApplicationListerOrDie(context.Background(), "testapp")),
collector: metrics.NewCollector(latency),
eventTypeCleaner: eventtypetest.CleanerStub{},
},
args: args{
request: legacytest.ValidLegacyRequestOrDie(t, "v1", "testapp", "object.created"),
},
wantStatus: 507,
wantOk: false,
wantTEF: `
# HELP eventing_epp_errors_total The total number of errors while sending events to the messaging server
# TYPE eventing_epp_errors_total counter
eventing_epp_errors_total 1
`,
},
{
name: "Send valid legacy event but cannot send to backend",
fields: fields{
Expand Down
14 changes: 11 additions & 3 deletions components/event-publisher-proxy/pkg/sender/jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

const (
JSStoreFailedCode = 10077
natsBackend = "nats"
jestreamHandlerName = "jetstream-handler"
noSpaceLeftErrMessage = "no space left on device"
Expand Down Expand Up @@ -81,12 +82,19 @@ func (s *Sender) Send(_ context.Context, event *event.Event) (sender.PublishResu
if err != nil {
s.namedLogger().Errorw("Cannot send event to backend", "error", err)
if errors.Is(err, nats.ErrNoStreamResponse) {
return nil, fmt.Errorf("%w : %v", ErrCannotSendToStream, err)
return nil, fmt.Errorf("%w : %v", sender.ErrBackendTargetNotFound, fmt.Errorf("%w, %v", ErrCannotSendToStream, err))
}

var apiErr nats.JetStreamError
if ok := errors.As(err, &apiErr); ok {
if apiErr.APIError().ErrorCode == JSStoreFailedCode {
return nil, fmt.Errorf("%w: %v", sender.ErrInsufficientStorage, err)
}
}
if strings.Contains(err.Error(), noSpaceLeftErrMessage) {
return nil, err
return nil, fmt.Errorf("%w: %v", sender.ErrInsufficientStorage, err)
}
return nil, fmt.Errorf("%w: %v", ErrCannotSendToStream, err)
return nil, fmt.Errorf("%w : %v", sender.ErrInternalBackendError, fmt.Errorf("%w, %v", ErrCannotSendToStream, err))
}
return beb.HTTPPublishResult{Status: http.StatusNoContent}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,44 @@ import (
"github.com/stretchr/testify/assert"

"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/env"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/sender"
testingutils "github.com/kyma-project/kyma/components/event-publisher-proxy/testing"
)

func TestJetStreamMessageSender(t *testing.T) {
testCases := []struct {
name string
givenStream bool
givenStreamMaxBytes int64
givenNATSConnectionClosed bool
wantError bool
wantErr error
wantStatusCode int
}{
{
name: "send in jetstream mode should not succeed if stream doesn't exist",
givenStream: false,
givenNATSConnectionClosed: false,
wantError: true,
wantStatusCode: http.StatusNotFound,
wantErr: sender.ErrBackendTargetNotFound,
},
{
name: "send in jetstream mode should not succeed if stream is full",
givenStream: true,
givenStreamMaxBytes: 1,
givenNATSConnectionClosed: false,
wantErr: sender.ErrInsufficientStorage,
},
{
name: "send in jetstream mode should succeed if NATS connection is open and the stream exists",
givenStream: true,
givenStreamMaxBytes: 5000,
givenNATSConnectionClosed: false,
wantError: false,
wantErr: nil,
wantStatusCode: http.StatusNoContent,
},
{
name: "send in jetstream mode should fail if NATS connection is not open",
givenNATSConnectionClosed: true,
wantError: true,
wantStatusCode: http.StatusBadGateway,
wantErr: ErrNotConnected,
},
}

Expand All @@ -66,7 +74,10 @@ func TestJetStreamMessageSender(t *testing.T) {
}()

if tc.givenStream {
addStream(t, connection, getStreamConfig())
sc := getStreamConfig(tc.givenStreamMaxBytes)
cc := getConsumerConfig()
addStream(t, connection, sc)
addConsumer(t, connection, sc, cc)
}

ce := createCloudEvent(t)
Expand All @@ -84,8 +95,8 @@ func TestJetStreamMessageSender(t *testing.T) {
testEnv.Logger.WithContext().Errorf("err: %v", err)

// assert
assert.Equal(t, tc.wantError, err != nil)
if !tc.wantError {
assert.ErrorIs(t, err, tc.wantErr)
if tc.wantErr == nil {
assert.Equal(t, tc.wantStatusCode, status.HTTPStatus())
}
})
Expand Down Expand Up @@ -151,20 +162,40 @@ func createCloudEvent(t *testing.T) *event.Event {
}

// getStreamConfig inits a testing stream config.
func getStreamConfig() *nats.StreamConfig {
func getStreamConfig(maxBytes int64) *nats.StreamConfig {
return &nats.StreamConfig{
Name: testingutils.StreamName,
Subjects: []string{fmt.Sprintf("%s.>", env.JetStreamSubjectPrefix)},
Storage: nats.MemoryStorage,
Retention: nats.InterestPolicy,
Discard: nats.DiscardNew,
MaxBytes: maxBytes,
}
}

func getConsumerConfig() *nats.ConsumerConfig {
return &nats.ConsumerConfig{
Durable: "test",
DeliverPolicy: nats.DeliverAllPolicy,
AckPolicy: nats.AckExplicitPolicy,
FilterSubject: fmt.Sprintf("%v.%v", env.JetStreamSubjectPrefix, testingutils.CloudEventTypeWithPrefix),
}
}

// addStream creates a stream for the test events.
func addStream(t *testing.T, connection *nats.Conn, config *nats.StreamConfig) {
js, err := connection.JetStream()
assert.NoError(t, err)
_, err = js.AddStream(config)
info, err := js.AddStream(config)
t.Logf("%+v", info)
assert.NoError(t, err)
}

func addConsumer(t *testing.T, connection *nats.Conn, sc *nats.StreamConfig, config *nats.ConsumerConfig) {
js, err := connection.JetStream()
assert.NoError(t, err)
info, err := js.AddConsumer(sc.Name, config)
t.Logf("%+v", info)
assert.NoError(t, err)
}

Expand Down
8 changes: 8 additions & 0 deletions components/event-publisher-proxy/pkg/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@ package sender

import (
"context"
"errors"

"github.com/cloudevents/sdk-go/v2/event"
)

var (
ErrInsufficientStorage = errors.New("insufficient storage on backend")
ErrBackendTargetNotFound = errors.New("publishing target on backend not found")
ErrNoConnection = errors.New("no connection to backend")
ErrInternalBackendError = errors.New("internal error on backend")
)

type GenericSender interface {
Send(context.Context, *event.Event) (PublishResult, error)
URL() string
Expand Down
2 changes: 1 addition & 1 deletion components/eventing-controller/.golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ linters-settings:
shadow:
# Whether to be strict about shadowing; can be noisy.
# Default: false
strict: true
strict: false

nakedret:
# Make an issue if func has more lines of code than this setting, and it has naked returns.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,9 +765,10 @@ func startReconciler(eventTypePrefix string, ens *jetStreamTestEnsemble) *jetStr
EventTypePrefix: eventTypePrefix,
JSStreamName: reconcilertesting.JSStreamName,
JSStreamStorageType: "memory",
JSStreamMaxBytes: -1,
JSStreamMaxBytes: "-1",
JSStreamMaxMessages: -1,
JSStreamRetentionPolicy: "interest",
JSStreamDiscardPolicy: "new",
}

// prepare application-lister
Expand Down
Loading

0 comments on commit 89d62b2

Please sign in to comment.