From c57a28ce700d617ef065d440301002b6ddce51d0 Mon Sep 17 00:00:00 2001 From: Saylor Berman Date: Fri, 31 Jan 2025 13:37:38 -0700 Subject: [PATCH] CP/DP Split: Fix empty plus file, blocking calls (#3078) Problem: The NGINX Plus API conf file was empty when sending using OSS, which caused an error applying config. This also revealed an issue where we received multiple messages from agent, causing some channel blocking. Solution: Don't send the empty NGINX conf file if not running N+. Ignore responses from agent about rollbacks, so we only ever process a single response as expected. --- internal/mode/static/nginx/agent/agent.go | 5 +-- internal/mode/static/nginx/agent/command.go | 31 +++++++++++++++---- internal/mode/static/nginx/agent/file.go | 2 ++ internal/mode/static/nginx/config/plus_api.go | 6 ++-- .../mode/static/nginx/config/plus_api_test.go | 18 ++--------- 5 files changed, 35 insertions(+), 27 deletions(-) diff --git a/internal/mode/static/nginx/agent/agent.go b/internal/mode/static/nginx/agent/agent.go index 28a20f1872..58fad509db 100644 --- a/internal/mode/static/nginx/agent/agent.go +++ b/internal/mode/static/nginx/agent/agent.go @@ -85,10 +85,11 @@ func (n *NginxUpdaterImpl) UpdateConfig( deployment *Deployment, files []File, ) bool { - n.logger.Info("Sending nginx configuration to agent") - msg := deployment.SetFiles(files) applied := deployment.GetBroadcaster().Send(msg) + if applied { + n.logger.Info("Sent nginx configuration to agent") + } deployment.SetLatestConfigError(deployment.GetConfigurationStatus()) diff --git a/internal/mode/static/nginx/agent/command.go b/internal/mode/static/nginx/agent/command.go index 04b482ffba..236a34f57d 100644 --- a/internal/mode/static/nginx/agent/command.go +++ b/internal/mode/static/nginx/agent/command.go @@ -119,6 +119,8 @@ func (cs *commandService) CreateConnection( // If any connection or unrecoverable errors occur, return and agent should re-establish a subscription. // If errors occur with applying the config, log and put those errors into the status queue to be written // to the Gateway status. +// +//nolint:gocyclo // could be room for improvement here func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error { ctx := in.Context() @@ -179,6 +181,7 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error panic(fmt.Sprintf("unknown request type %d", msg.Type)) } + cs.logger.V(1).Info("Sending configuration to agent", "requestType", msg.Type) if err := msgr.Send(ctx, req); err != nil { cs.logger.Error(err, "error sending request to agent") deployment.SetPodErrorStatus(conn.PodName, err) @@ -189,7 +192,10 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error case err = <-msgr.Errors(): cs.logger.Error(err, "connection error", "pod", conn.PodName) deployment.SetPodErrorStatus(conn.PodName, err) - channels.ResponseCh <- struct{}{} + select { + case channels.ResponseCh <- struct{}{}: + default: + } if errors.Is(err, io.EOF) { return grpcStatus.Error(codes.Aborted, err.Error()) @@ -198,7 +204,11 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error case msg := <-msgr.Messages(): res := msg.GetCommandResponse() if res.GetStatus() != pb.CommandResponse_COMMAND_STATUS_OK { - err := fmt.Errorf("bad response from agent: msg: %s; error: %s", res.GetMessage(), res.GetError()) + if isRollbackMessage(res.GetMessage()) { + // we don't care about these messages, so ignore them + continue + } + err := fmt.Errorf("msg: %s; error: %s", res.GetMessage(), res.GetError()) deployment.SetPodErrorStatus(conn.PodName, err) } else { deployment.SetPodErrorStatus(conn.PodName, nil) @@ -268,6 +278,8 @@ func (cs *commandService) setInitialConfig( for _, action := range deployment.GetNGINXPlusActions() { // retry the API update request because sometimes nginx isn't quite ready after the config apply reload timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + var overallUpstreamApplyErr error + if err := wait.PollUntilContextCancel( timeoutCtx, 500*time.Millisecond, @@ -287,13 +299,14 @@ func (cs *commandService) setInitialConfig( } if upstreamApplyErr != nil { - return false, nil //nolint:nilerr // this error is collected at the end + overallUpstreamApplyErr = errors.Join(overallUpstreamApplyErr, upstreamApplyErr) + return false, nil } return true, nil }, ); err != nil { - if strings.Contains(err.Error(), "bad response from agent") { - errs = append(errs, err) + if overallUpstreamApplyErr != nil { + errs = append(errs, overallUpstreamApplyErr) } else { cancel() return err @@ -330,7 +343,7 @@ func (cs *commandService) waitForInitialConfigApply( case msg := <-msgr.Messages(): res := msg.GetCommandResponse() if res.GetStatus() != pb.CommandResponse_COMMAND_STATUS_OK { - applyErr := fmt.Errorf("bad response from agent: msg: %s; error: %s", res.GetMessage(), res.GetError()) + applyErr := fmt.Errorf("msg: %s; error: %s", res.GetMessage(), res.GetError()) return applyErr, nil } @@ -379,6 +392,12 @@ func buildRequest(fileOverviews []*pb.File, instanceID, version string) *pb.Mana } } +func isRollbackMessage(msg string) bool { + msgToLower := strings.ToLower(msg) + return strings.Contains(msgToLower, "rollback successful") || + strings.Contains(msgToLower, "rollback failed") +} + func buildPlusAPIRequest(action *pb.NGINXPlusAction, instanceID string) *pb.ManagementPlaneRequest { return &pb.ManagementPlaneRequest{ MessageMeta: &pb.MessageMeta{ diff --git a/internal/mode/static/nginx/agent/file.go b/internal/mode/static/nginx/agent/file.go index a4163ea187..35f26b628c 100644 --- a/internal/mode/static/nginx/agent/file.go +++ b/internal/mode/static/nginx/agent/file.go @@ -75,6 +75,8 @@ func (fs *fileService) GetFile( return nil, status.Errorf(codes.NotFound, "file not found") } + fs.logger.V(1).Info("Getting file for agent", "file", filename) + return &pb.GetFileResponse{ Contents: &pb.FileContents{ Contents: contents, diff --git a/internal/mode/static/nginx/config/plus_api.go b/internal/mode/static/nginx/config/plus_api.go index 9b1894fe30..d4988bb838 100644 --- a/internal/mode/static/nginx/config/plus_api.go +++ b/internal/mode/static/nginx/config/plus_api.go @@ -10,15 +10,15 @@ import ( var plusAPITemplate = gotemplate.Must(gotemplate.New("plusAPI").Parse(plusAPITemplateText)) func executePlusAPI(conf dataplane.Configuration) []executeResult { - result := executeResult{ - dest: nginxPlusConfigFile, - } + var result executeResult // if AllowedAddresses is empty, it means that we are not running on nginx plus, and we don't want this generated if conf.NginxPlus.AllowedAddresses != nil { result = executeResult{ dest: nginxPlusConfigFile, data: helpers.MustExecuteTemplate(plusAPITemplate, conf.NginxPlus), } + } else { + return nil } return []executeResult{result} diff --git a/internal/mode/static/nginx/config/plus_api_test.go b/internal/mode/static/nginx/config/plus_api_test.go index 6afb79142a..f664143402 100644 --- a/internal/mode/static/nginx/config/plus_api_test.go +++ b/internal/mode/static/nginx/config/plus_api_test.go @@ -43,21 +43,7 @@ func TestExecutePlusAPI_EmptyNginxPlus(t *testing.T) { } g := NewWithT(t) - expSubStrings := map[string]int{ - "listen unix:/var/run/nginx/nginx-plus-api.sock;": 0, - "access_log off;": 0, - "api write=on;": 0, - "listen 8765;": 0, - "root /usr/share/nginx/html;": 0, - "allow 127.0.0.1;": 0, - "deny all;": 0, - "location = /dashboard.html {}": 0, - "api write=off;": 0, - } - for expSubStr, expCount := range expSubStrings { - res := executePlusAPI(conf) - g.Expect(res).To(HaveLen(1)) - g.Expect(expCount).To(Equal(strings.Count(string(res[0].data), expSubStr))) - } + res := executePlusAPI(conf) + g.Expect(res).To(BeNil()) }