Skip to content

Commit

Permalink
CP/DP Split: Fix empty plus file, blocking calls (#3078)
Browse files Browse the repository at this point in the history
Problem: The NGINX Plus API conf file was empty when sending using OSS, which caused an error applying config. This also revealed an issue where we received multiple messages from agent, causing some channel blocking.

Solution: Don't send the empty NGINX conf file if not running N+. Ignore responses from agent about rollbacks, so we only ever process a single response as expected.
  • Loading branch information
sjberman authored Jan 31, 2025
1 parent 5ccb941 commit c57a28c
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 27 deletions.
5 changes: 3 additions & 2 deletions internal/mode/static/nginx/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ func (n *NginxUpdaterImpl) UpdateConfig(
deployment *Deployment,
files []File,
) bool {
n.logger.Info("Sending nginx configuration to agent")

msg := deployment.SetFiles(files)
applied := deployment.GetBroadcaster().Send(msg)
if applied {
n.logger.Info("Sent nginx configuration to agent")
}

deployment.SetLatestConfigError(deployment.GetConfigurationStatus())

Expand Down
31 changes: 25 additions & 6 deletions internal/mode/static/nginx/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ func (cs *commandService) CreateConnection(
// If any connection or unrecoverable errors occur, return and agent should re-establish a subscription.
// If errors occur with applying the config, log and put those errors into the status queue to be written
// to the Gateway status.
//
//nolint:gocyclo // could be room for improvement here
func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error {
ctx := in.Context()

Expand Down Expand Up @@ -179,6 +181,7 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
panic(fmt.Sprintf("unknown request type %d", msg.Type))
}

cs.logger.V(1).Info("Sending configuration to agent", "requestType", msg.Type)
if err := msgr.Send(ctx, req); err != nil {
cs.logger.Error(err, "error sending request to agent")
deployment.SetPodErrorStatus(conn.PodName, err)
Expand All @@ -189,7 +192,10 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
case err = <-msgr.Errors():
cs.logger.Error(err, "connection error", "pod", conn.PodName)
deployment.SetPodErrorStatus(conn.PodName, err)
channels.ResponseCh <- struct{}{}
select {
case channels.ResponseCh <- struct{}{}:
default:
}

if errors.Is(err, io.EOF) {
return grpcStatus.Error(codes.Aborted, err.Error())
Expand All @@ -198,7 +204,11 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
case msg := <-msgr.Messages():
res := msg.GetCommandResponse()
if res.GetStatus() != pb.CommandResponse_COMMAND_STATUS_OK {
err := fmt.Errorf("bad response from agent: msg: %s; error: %s", res.GetMessage(), res.GetError())
if isRollbackMessage(res.GetMessage()) {
// we don't care about these messages, so ignore them
continue
}
err := fmt.Errorf("msg: %s; error: %s", res.GetMessage(), res.GetError())
deployment.SetPodErrorStatus(conn.PodName, err)
} else {
deployment.SetPodErrorStatus(conn.PodName, nil)
Expand Down Expand Up @@ -268,6 +278,8 @@ func (cs *commandService) setInitialConfig(
for _, action := range deployment.GetNGINXPlusActions() {
// retry the API update request because sometimes nginx isn't quite ready after the config apply reload
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
var overallUpstreamApplyErr error

if err := wait.PollUntilContextCancel(
timeoutCtx,
500*time.Millisecond,
Expand All @@ -287,13 +299,14 @@ func (cs *commandService) setInitialConfig(
}

if upstreamApplyErr != nil {
return false, nil //nolint:nilerr // this error is collected at the end
overallUpstreamApplyErr = errors.Join(overallUpstreamApplyErr, upstreamApplyErr)
return false, nil
}
return true, nil
},
); err != nil {
if strings.Contains(err.Error(), "bad response from agent") {
errs = append(errs, err)
if overallUpstreamApplyErr != nil {
errs = append(errs, overallUpstreamApplyErr)
} else {
cancel()
return err
Expand Down Expand Up @@ -330,7 +343,7 @@ func (cs *commandService) waitForInitialConfigApply(
case msg := <-msgr.Messages():
res := msg.GetCommandResponse()
if res.GetStatus() != pb.CommandResponse_COMMAND_STATUS_OK {
applyErr := fmt.Errorf("bad response from agent: msg: %s; error: %s", res.GetMessage(), res.GetError())
applyErr := fmt.Errorf("msg: %s; error: %s", res.GetMessage(), res.GetError())
return applyErr, nil
}

Expand Down Expand Up @@ -379,6 +392,12 @@ func buildRequest(fileOverviews []*pb.File, instanceID, version string) *pb.Mana
}
}

func isRollbackMessage(msg string) bool {
msgToLower := strings.ToLower(msg)
return strings.Contains(msgToLower, "rollback successful") ||
strings.Contains(msgToLower, "rollback failed")
}

func buildPlusAPIRequest(action *pb.NGINXPlusAction, instanceID string) *pb.ManagementPlaneRequest {
return &pb.ManagementPlaneRequest{
MessageMeta: &pb.MessageMeta{
Expand Down
2 changes: 2 additions & 0 deletions internal/mode/static/nginx/agent/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func (fs *fileService) GetFile(
return nil, status.Errorf(codes.NotFound, "file not found")
}

fs.logger.V(1).Info("Getting file for agent", "file", filename)

return &pb.GetFileResponse{
Contents: &pb.FileContents{
Contents: contents,
Expand Down
6 changes: 3 additions & 3 deletions internal/mode/static/nginx/config/plus_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
var plusAPITemplate = gotemplate.Must(gotemplate.New("plusAPI").Parse(plusAPITemplateText))

func executePlusAPI(conf dataplane.Configuration) []executeResult {
result := executeResult{
dest: nginxPlusConfigFile,
}
var result executeResult
// if AllowedAddresses is empty, it means that we are not running on nginx plus, and we don't want this generated
if conf.NginxPlus.AllowedAddresses != nil {
result = executeResult{
dest: nginxPlusConfigFile,
data: helpers.MustExecuteTemplate(plusAPITemplate, conf.NginxPlus),
}
} else {
return nil
}

return []executeResult{result}
Expand Down
18 changes: 2 additions & 16 deletions internal/mode/static/nginx/config/plus_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,7 @@ func TestExecutePlusAPI_EmptyNginxPlus(t *testing.T) {
}

g := NewWithT(t)
expSubStrings := map[string]int{
"listen unix:/var/run/nginx/nginx-plus-api.sock;": 0,
"access_log off;": 0,
"api write=on;": 0,
"listen 8765;": 0,
"root /usr/share/nginx/html;": 0,
"allow 127.0.0.1;": 0,
"deny all;": 0,
"location = /dashboard.html {}": 0,
"api write=off;": 0,
}

for expSubStr, expCount := range expSubStrings {
res := executePlusAPI(conf)
g.Expect(res).To(HaveLen(1))
g.Expect(expCount).To(Equal(strings.Count(string(res[0].data), expSubStr)))
}
res := executePlusAPI(conf)
g.Expect(res).To(BeNil())
}

0 comments on commit c57a28c

Please sign in to comment.