CP/DP split: Add leader election (#3092)
Add leader election so that data plane pods connect only to the leader NGF pod. If the control plane is scaled, only the leader is marked Ready and the backups stay Unready, so the data plane doesn't connect to them.

Problem: We want the NGF control plane to fail over to another pod when the current control plane pod goes down.

Solution: Only the leader pod is marked Ready by Kubernetes, so all data plane connections go to the leader pod.
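
For context, a minimal sketch of how leader election is typically switched on for a controller-runtime manager (the lock ID, namespace, and error handling below are illustrative assumptions, not code from this commit). Runnables that report NeedLeaderElection() == true are started only on the Pod that wins the election, which is what the rest of this change builds on.

package main

import (
    "log"

    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
    // Leader election: only one NGF control plane Pod holds the lease at a time.
    // The ID and namespace values are illustrative.
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), manager.Options{
        LeaderElection:          true,
        LeaderElectionID:        "ngf-leader-election",
        LeaderElectionNamespace: "nginx-gateway",
    })
    if err != nil {
        log.Fatalf("unable to create manager: %v", err)
    }

    // Runnables that report NeedLeaderElection() == true (such as the new
    // CallFunctionsAfterBecameLeader) are started by the manager only after
    // this Pod wins the election; they would be registered here via mgr.Add.

    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        log.Fatalf("manager exited: %v", err)
    }
}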
bjee19 authored Feb 10, 2025
1 parent c57a28c commit b1ba80c
Showing 8 changed files with 253 additions and 52 deletions.
Binary file modified docs/proposals/control-data-plane-split/graph-conns.png
28 changes: 16 additions & 12 deletions internal/framework/runnables/runnables.go
@@ -34,29 +34,33 @@ func (r *LeaderOrNonLeader) NeedLeaderElection() bool {
     return false
 }
 
-// EnableAfterBecameLeader is a Runnable that will call the enable function when the current instance becomes
+// CallFunctionsAfterBecameLeader is a Runnable that will call the given functions when the current instance becomes
 // the leader.
-type EnableAfterBecameLeader struct {
-    enable func(context.Context)
+type CallFunctionsAfterBecameLeader struct {
+    enableFunctions []func(context.Context)
 }
 
 var (
-    _ manager.LeaderElectionRunnable = &EnableAfterBecameLeader{}
-    _ manager.Runnable               = &EnableAfterBecameLeader{}
+    _ manager.LeaderElectionRunnable = &CallFunctionsAfterBecameLeader{}
+    _ manager.Runnable               = &CallFunctionsAfterBecameLeader{}
 )
 
-// NewEnableAfterBecameLeader creates a new EnableAfterBecameLeader Runnable.
-func NewEnableAfterBecameLeader(enable func(context.Context)) *EnableAfterBecameLeader {
-    return &EnableAfterBecameLeader{
-        enable: enable,
+// NewCallFunctionsAfterBecameLeader creates a new CallFunctionsAfterBecameLeader Runnable.
+func NewCallFunctionsAfterBecameLeader(
+    enableFunctions []func(context.Context),
+) *CallFunctionsAfterBecameLeader {
+    return &CallFunctionsAfterBecameLeader{
+        enableFunctions: enableFunctions,
     }
 }
 
-func (j *EnableAfterBecameLeader) Start(ctx context.Context) error {
-    j.enable(ctx)
+func (j *CallFunctionsAfterBecameLeader) Start(ctx context.Context) error {
+    for _, f := range j.enableFunctions {
+        f(ctx)
+    }
     return nil
 }
 
-func (j *EnableAfterBecameLeader) NeedLeaderElection() bool {
+func (j *CallFunctionsAfterBecameLeader) NeedLeaderElection() bool {
     return true
 }
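
A hedged sketch of how the renamed Runnable might be wired up; the helper, package name, and hook functions are illustrative, since the actual registration in NGF's manager setup is not part of this hunk. The hooks are bundled once and invoked only after the Pod acquires the leader lease, because NeedLeaderElection() returns true.

package setup // illustrative package, not part of this commit

import (
    "context"
    "fmt"

    "sigs.k8s.io/controller-runtime/pkg/manager"

    "github.com/nginx/nginx-gateway-fabric/internal/framework/runnables"
)

// registerLeaderOnlyHooks bundles the given "enable" hooks into a single
// CallFunctionsAfterBecameLeader Runnable. The manager calls its Start method,
// and therefore every hook, only after this Pod wins the leader election.
func registerLeaderOnlyHooks(mgr manager.Manager, hooks ...func(context.Context)) error {
    if err := mgr.Add(runnables.NewCallFunctionsAfterBecameLeader(hooks)); err != nil {
        return fmt.Errorf("cannot register leader-only runnable: %w", err)
    }
    return nil
}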
22 changes: 14 additions & 8 deletions internal/framework/runnables/runnables_test.go
@@ -23,19 +23,25 @@ func TestLeaderOrNonLeader(t *testing.T) {
     g.Expect(leaderOrNonLeader.NeedLeaderElection()).To(BeFalse())
 }
 
-func TestEnableAfterBecameLeader(t *testing.T) {
+func TestCallFunctionsAfterBecameLeader(t *testing.T) {
     t.Parallel()
-    enabled := false
-    enableAfterBecameLeader := NewEnableAfterBecameLeader(func(_ context.Context) {
-        enabled = true
+    statusUpdaterEnabled := false
+    healthCheckEnableLeader := false
+    eventHandlerEnabled := false
+
+    callFunctionsAfterBecameLeader := NewCallFunctionsAfterBecameLeader([]func(ctx context.Context){
+        func(_ context.Context) { statusUpdaterEnabled = true },
+        func(_ context.Context) { healthCheckEnableLeader = true },
+        func(_ context.Context) { eventHandlerEnabled = true },
     })
 
     g := NewWithT(t)
-    g.Expect(enableAfterBecameLeader.NeedLeaderElection()).To(BeTrue())
-    g.Expect(enabled).To(BeFalse())
+    g.Expect(callFunctionsAfterBecameLeader.NeedLeaderElection()).To(BeTrue())
 
-    err := enableAfterBecameLeader.Start(context.Background())
+    err := callFunctionsAfterBecameLeader.Start(context.Background())
     g.Expect(err).ToNot(HaveOccurred())
 
-    g.Expect(enabled).To(BeTrue())
+    g.Expect(statusUpdaterEnabled).To(BeTrue())
+    g.Expect(healthCheckEnableLeader).To(BeTrue())
+    g.Expect(eventHandlerEnabled).To(BeTrue())
 }
30 changes: 25 additions & 5 deletions internal/mode/static/handler.go
@@ -167,13 +167,33 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
 
     changeType, gr := h.cfg.processor.Process()
 
-    // Once we've processed resources on startup and built our first graph, mark the Pod as ready.
-    if !h.cfg.graphBuiltHealthChecker.ready {
-        h.cfg.graphBuiltHealthChecker.setAsReady()
+    // Once we've processed resources on startup and built our first graph, mark the Pod as having built the graph.
+    if !h.cfg.graphBuiltHealthChecker.graphBuilt {
+        h.cfg.graphBuiltHealthChecker.setGraphBuilt()
     }
 
-    // TODO(sberman): hardcode this deployment name until we support provisioning data planes
-    // If no deployments exist, we should just return without doing anything.
+    // if this Pod is not the leader or does not have the leader lease yet,
+    // the nginx conf should not be updated.
+    if !h.cfg.graphBuiltHealthChecker.leader {
+        return
+    }
+
+    h.sendNginxConfig(ctx, logger, gr, changeType)
+}
+
+func (h *eventHandlerImpl) eventHandlerEnable(ctx context.Context) {
+    // Latest graph is guaranteed to not be nil since the leader election process takes longer than
+    // the initial call to HandleEventBatch when NGF starts up. And GatewayClass will typically always exist which
+    // triggers an event.
+    h.sendNginxConfig(ctx, h.cfg.logger, h.cfg.processor.GetLatestGraph(), state.ClusterStateChange)
+}
+
+func (h *eventHandlerImpl) sendNginxConfig(
+    ctx context.Context,
+    logger logr.Logger,
+    gr *graph.Graph,
+    changeType state.ChangeType,
+) {
     deploymentName := types.NamespacedName{
         Name:      "tmp-nginx-deployment",
         Namespace: h.cfg.gatewayPodConfig.Namespace,
28 changes: 22 additions & 6 deletions internal/mode/static/handler_test.go
@@ -126,7 +126,9 @@ var _ = Describe("eventHandler", func() {
            metricsCollector:         collectors.NewControllerNoopCollector(),
            updateGatewayClassStatus: true,
        })
-       Expect(handler.cfg.graphBuiltHealthChecker.ready).To(BeFalse())
+       Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeFalse())
+
+       handler.cfg.graphBuiltHealthChecker.leader = true
    })
 
    AfterEach(func() {
@@ -161,7 +163,7 @@ var _ = Describe("eventHandler", func() {
    })
 
    AfterEach(func() {
-       Expect(handler.cfg.graphBuiltHealthChecker.ready).To(BeTrue())
+       Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeTrue())
    })
 
    When("a batch has one event", func() {
@@ -484,22 +486,36 @@ var _ = Describe("eventHandler", func() {
            Expect(gr.LatestReloadResult.Error.Error()).To(Equal("status error"))
        })
 
-       It("should set the health checker status properly", func() {
+       It("should update nginx conf only when leader", func() {
+           ctx := context.Background()
+           handler.cfg.graphBuiltHealthChecker.leader = false
+
            e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
            batch := []interface{}{e}
            readyChannel := handler.cfg.graphBuiltHealthChecker.getReadyCh()
 
            fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{})
 
-           Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed())
            handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
 
+           // graph is built, but since the graphBuiltHealthChecker.leader is false, configuration isn't created and
+           // the readyCheck fails
+           Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeTrue())
+           Expect(handler.GetLatestConfiguration()).To(BeNil())
+           Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed())
+           Expect(readyChannel).ShouldNot(BeClosed())
+
+           // Once the pod becomes leader, these two functions will be called through the runnables we set in the manager
+           handler.cfg.graphBuiltHealthChecker.setAsLeader(ctx)
+           handler.eventHandlerEnable(ctx)
+
+           // nginx conf has been set
            dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1)
            Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty())
 
-           Expect(readyChannel).To(BeClosed())
-
+           // ready check is also set
            Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).To(Succeed())
+           Expect(handler.cfg.graphBuiltHealthChecker.getReadyCh()).To(BeClosed())
        })
 
        It("should panic for an unknown event type", func() {
87 changes: 76 additions & 11 deletions internal/mode/static/health.go
@@ -1,9 +1,17 @@
 package static
 
 import (
+    "context"
     "errors"
+    "fmt"
+    "net"
     "net/http"
     "sync"
+    "time"
+
+    "sigs.k8s.io/controller-runtime/pkg/manager"
+
+    "github.com/nginx/nginx-gateway-fabric/internal/mode/static/config"
 )
 
 // newGraphBuiltHealthChecker creates a new graphBuiltHealthChecker.
@@ -13,37 +21,94 @@ func newGraphBuiltHealthChecker() *graphBuiltHealthChecker {
     }
 }
 
-// graphBuiltHealthChecker is used to check if the initial graph is built and the NGF Pod is ready.
+// graphBuiltHealthChecker is used to check if the NGF Pod is ready. The NGF Pod is ready if the initial graph has
+// been built and if it is leader.
 type graphBuiltHealthChecker struct {
     // readyCh is a channel that is initialized in newGraphBuiltHealthChecker and represents if the NGF Pod is ready.
-    readyCh chan struct{}
-    lock    sync.RWMutex
-    ready   bool
+    readyCh    chan struct{}
+    lock       sync.RWMutex
+    graphBuilt bool
+    leader     bool
 }
 
+// createHealthProbe creates a Server runnable to serve as our health and readiness checker.
+func createHealthProbe(cfg config.Config, healthChecker *graphBuiltHealthChecker) (manager.Server, error) {
+    // we chose to create our own health probe server instead of using the controller-runtime one because
+    // of repetitive log which would flood our logs on non-ready non-leader NGF Pods. This health probe is
+    // similar to the controller-runtime's health probe.
+
+    mux := http.NewServeMux()
+
+    // copy of controller-runtime sane defaults for new http.Server
+    s := &http.Server{
+        Handler:           mux,
+        MaxHeaderBytes:    1 << 20,
+        IdleTimeout:       90 * time.Second, // matches http.DefaultTransport keep-alive timeout
+        ReadHeaderTimeout: 32 * time.Second,
+    }
+
+    mux.HandleFunc(readinessEndpointName, healthChecker.readyHandler)
+
+    ln, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.HealthConfig.Port))
+    if err != nil {
+        return manager.Server{},
+            fmt.Errorf("error listening on %s: %w", fmt.Sprintf(":%d", cfg.HealthConfig.Port), err)
+    }
+
+    return manager.Server{
+        Name:     "health probe",
+        Server:   s,
+        Listener: ln,
+    }, nil
+}
+
+func (h *graphBuiltHealthChecker) readyHandler(resp http.ResponseWriter, req *http.Request) {
+    if err := h.readyCheck(req); err != nil {
+        resp.WriteHeader(http.StatusServiceUnavailable)
+    } else {
+        resp.WriteHeader(http.StatusOK)
+    }
+}
+
 // readyCheck returns the ready-state of the Pod. It satisfies the controller-runtime Checker type.
-// We are considered ready after the first graph is built.
+// We are considered ready after the first graph is built and if the NGF Pod is leader.
 func (h *graphBuiltHealthChecker) readyCheck(_ *http.Request) error {
     h.lock.RLock()
     defer h.lock.RUnlock()
 
-    if !h.ready {
-        return errors.New("control plane is not yet ready")
+    if !h.leader {
+        return errors.New("this Pod is not currently leader")
     }
 
+    if !h.graphBuilt {
+        return errors.New("control plane initial graph has not been built")
+    }
+
     return nil
 }
 
-// setAsReady marks the health check as ready.
-func (h *graphBuiltHealthChecker) setAsReady() {
+// setGraphBuilt marks the health check as having the initial graph built.
+func (h *graphBuiltHealthChecker) setGraphBuilt() {
     h.lock.Lock()
     defer h.lock.Unlock()
 
-    h.ready = true
-    close(h.readyCh)
+    h.graphBuilt = true
 }
 
 // getReadyCh returns a read-only channel, which determines if the NGF Pod is ready.
 func (h *graphBuiltHealthChecker) getReadyCh() <-chan struct{} {
     return h.readyCh
 }
+
+// setAsLeader marks the health check as leader.
+func (h *graphBuiltHealthChecker) setAsLeader(_ context.Context) {
+    h.lock.Lock()
+    defer h.lock.Unlock()
+
+    h.leader = true
+
+    // setGraphBuilt should already have been called when processing the resources on startup because the leader
+    // election process takes longer than the initial call to HandleEventBatch. Thus, the NGF Pod should be marked as
+    // ready and have this channel be closed.
+    close(h.readyCh)
+}
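
A hedged sketch of how the probe server could be handed to the manager (the helper below is illustrative; the real registration code is outside the hunks shown here). Because readyCheck fails until both setGraphBuilt and setAsLeader have run, a non-leader Pod keeps answering its readiness probe with 503, stays Unready, and is never selected by the data plane's Service.

package static // illustrative sketch alongside health.go, not part of this commit

import (
    "fmt"

    "sigs.k8s.io/controller-runtime/pkg/manager"

    "github.com/nginx/nginx-gateway-fabric/internal/mode/static/config"
)

// registerHealthProbe creates the custom probe server and adds it to the
// manager as a Runnable. Non-leader Pods answer the readiness endpoint with
// 503 until they acquire the lease.
func registerHealthProbe(mgr manager.Manager, cfg config.Config, hc *graphBuiltHealthChecker) error {
    probeServer, err := createHealthProbe(cfg, hc)
    if err != nil {
        return fmt.Errorf("error creating health probe: %w", err)
    }

    if err := mgr.Add(&probeServer); err != nil {
        return fmt.Errorf("cannot register health probe server: %w", err)
    }

    return nil
}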