Skip to content

Commit

Permalink
[controller] Ensure k8s statefulset statuses are fresh (#271)
Browse files Browse the repository at this point in the history
In debugging #268, I discovered that in all cases where we updated more
than 1 statefulset, it was because the observed generation on the set
was 1 less than the set's actual generation. This means that the
Kubernetes statefulset controller had not yet processed the update, and
that the information on `Status` that we check (ready replicas, current
replicas, etc.) was not up-to-date.

Also out of paranoia, I changed the update behavior to ensure that we
keep the old object meta fields around when we send the `Update` to
Kubernetes. I think if this isn't the case, we potentially overwrite a
set even if it had changed since we last processed it.

This PR ensures that we stop processing the cluster if any of the set
statuses are not fresh, and that we use familiar conflict methods on set
updates.

Closes #268.
  • Loading branch information
schallert authored Feb 11, 2021
1 parent 81aaae1 commit 81b6f3a
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cmd/m3db-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func main() {
os.Exit(1)
}()

if err := controller.Run(2, stopCh); err != nil {
if err := controller.Run(1, stopCh); err != nil {
logger.Fatal("error running controller", zap.Error(err))
}
}
Expand Down
73 changes: 62 additions & 11 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,40 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error
}
}

// For all statefulsets, ensure their observed generation is up to date.
// This means that the k8s statefulset controller has updated Status (and
// therefore ready replicas + updated replicas). If observed generation !=
// generation, it means Status will contain stale info.
for _, sts := range childrenSets {
if sts.Generation != sts.Status.ObservedGeneration {
c.logger.Warn("stateful set not up to date",
zap.String("namespace", sts.Namespace),
zap.String("name", sts.Name),
zap.Int32("readyReplicas", sts.Status.ReadyReplicas),
zap.Int32("updatedReplicas", sts.Status.UpdatedReplicas),
zap.String("currentRevision", sts.Status.CurrentRevision),
zap.String("updateRevision", sts.Status.UpdateRevision),
zap.Int64("generation", sts.Generation),
zap.Int64("observed", sts.Status.ObservedGeneration),
)
return fmt.Errorf("set %s generation is not up to date (current: %d, observed: %d)", sts.Name, sts.Generation, sts.Status.ObservedGeneration)
}
}

// If any of the statefulsets aren't ready, wait until they are as we'll get
// another event (ready == bootstrapped)
for _, sts := range childrenSets {
c.logger.Debug("processing set",
zap.String("namespace", sts.Namespace),
zap.String("name", sts.Name),
zap.Int32("readyReplicas", sts.Status.ReadyReplicas),
zap.Int32("updatedReplicas", sts.Status.UpdatedReplicas),
zap.String("currentRevision", sts.Status.CurrentRevision),
zap.String("updateRevision", sts.Status.UpdateRevision),
zap.Int64("generation", sts.Generation),
zap.Int64("observed", sts.Status.ObservedGeneration),
)

if sts.Spec.Replicas == nil {
c.logger.Warn("skip check for statefulset, replicas is nil",
zap.String("name", sts.Name),
Expand Down Expand Up @@ -539,7 +570,20 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error
return err
}

c.logger.Info("updated statefulset", zap.String("name", name))
c.logger.Info("updated statefulset",
zap.String("name", expected.Name),
zap.Int32("actual_readyReplicas", actual.Status.ReadyReplicas),
zap.Int32("actual_updatedReplicas", actual.Status.UpdatedReplicas),
zap.String("actual_currentRevision", actual.Status.CurrentRevision),
zap.String("actual_updateRevision", actual.Status.UpdateRevision),
zap.Int32("expected_readyReplicas", expected.Status.ReadyReplicas),
zap.Int32("expected_updatedReplicas", expected.Status.UpdatedReplicas),
zap.String("expected_currentRevision", expected.Status.CurrentRevision),
zap.String("expected_updateRevision", expected.Status.UpdateRevision),
zap.Int64("generation", expected.Generation),
zap.Int64("observed", expected.Status.ObservedGeneration),
)

return nil
}

Expand All @@ -564,10 +608,10 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error
// At this point we have the desired number of statefulsets, and every pod
// across those sets is bootstrapped. However some may be bootstrapped because
// they own no shards. Check to see that all pods are in the placement.
selector := klabels.SelectorFromSet(labels.BaseLabels(cluster))
pods, err := c.podLister.Pods(cluster.Namespace).List(selector)
clusterPodsSelector := klabels.SelectorFromSet(labels.BaseLabels(cluster))
pods, err := c.podLister.Pods(cluster.Namespace).List(clusterPodsSelector)
if err != nil {
return fmt.Errorf("error listing pods: %v", err)
return fmt.Errorf("error listing pods to construct placement: %v", err)
}

placement, err := c.adminClient.placementClientForCluster(cluster).Get()
Expand Down Expand Up @@ -867,7 +911,7 @@ func (c *M3DBController) handlePodEvent(key string) error {

func (c *M3DBController) handlePodUpdate(pod *corev1.Pod) error {
// We only process pods that are members of m3db clusters.
if _, found := getClusterValue(pod); !found {
if _, err := getClusterValue(pod); err != nil {
return nil
}

Expand Down Expand Up @@ -945,19 +989,19 @@ func (c *M3DBController) handlePodUpdate(pod *corev1.Pod) error {
return nil
}

func getClusterValue(pod *corev1.Pod) (string, bool) {
func getClusterValue(pod *corev1.Pod) (string, error) {
cluster, ok := pod.Labels[labels.Cluster]
if !ok {
return "", false
return "", errOrphanedPod
}

return cluster, true
return cluster, nil
}

func (c *M3DBController) getParentCluster(pod *corev1.Pod) (*myspec.M3DBCluster, error) {
clusterName, found := getClusterValue(pod)
if !found {
return nil, errOrphanedPod
clusterName, err := getClusterValue(pod)
if err != nil {
return nil, err
}

cluster, err := c.clusterLister.M3DBClusters(pod.Namespace).Get(clusterName)
Expand Down Expand Up @@ -1031,6 +1075,13 @@ func updatedStatefulSet(
return actual, true, nil
}

// Ensure we keep old object meta so that resource version info can be used by
// K8s API for conflict resolution.
expected.ObjectMeta = *actual.ObjectMeta.DeepCopy()
// Reset expected annotations since we ensure their final state below.
expected.ObjectMeta.Annotations = map[string]string{}
expected.Status = actual.DeepCopy().Status

copyAnnotations(expected, actual)
return expected, true, nil
}
Expand Down
66 changes: 54 additions & 12 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ func setupTestCluster(
}

type waitForStatefulSetsOptions struct {
setReadyReplicas bool
expectedStatefulSets []string
simulatePodsNotUpdated bool
setReadyReplicas bool
expectedStatefulSets []string
simulatePodsNotUpdated bool
simulateStaleGeneration bool
expectError string
}

type waitForStatefulSetsResult struct {
Expand Down Expand Up @@ -181,6 +183,10 @@ func waitForStatefulSets(
// testing for when not updating them.
sts.Status.UpdatedReplicas = *sts.Spec.Replicas
}
if opts.simulateStaleGeneration {
sts.Status.ObservedGeneration = 2
sts.ObjectMeta.Generation = 1
}
default:
t.Errorf("verb %s is not supported", verb)
}
Expand All @@ -204,11 +210,16 @@ func waitForStatefulSets(
// we see all stateful sets that we expect and also be able to catch any extra stateful
// sets that we don't.
var (
iters = math.Max(float64(2*len(opts.expectedStatefulSets)), 5)
iters = math.Max(float64(2*len(opts.expectedStatefulSets)), 5)
finalErr error
)
for i := 0; i < int(iters); i++ {
err := controller.handleClusterUpdate(cluster)
require.NoError(t, err)
if opts.expectError != "" {
finalErr = err
} else {
require.NoError(t, err)
}

mu.Lock()
var seen int
Expand Down Expand Up @@ -257,6 +268,10 @@ func waitForStatefulSets(
}
}

if opts.expectError != "" {
require.EqualError(t, finalErr, opts.expectError)
}

done := seen == len(opts.expectedStatefulSets)
return waitForStatefulSetsResult{
updatedStatefulSets: updated,
Expand Down Expand Up @@ -421,16 +436,16 @@ func TestInstancesInIsoGroup(t *testing.T) {

func TestGetClusterValue(t *testing.T) {
pod := &corev1.Pod{}
cluster, ok := getClusterValue(pod)
assert.False(t, ok)
cluster, err := getClusterValue(pod)
assert.Error(t, err)
assert.Equal(t, "", cluster)

pod.Labels = map[string]string{
"operator.m3db.io/cluster": "foo",
}

cluster, ok = getClusterValue(pod)
assert.True(t, ok)
cluster, err = getClusterValue(pod)
assert.NoError(t, err)
assert.Equal(t, "foo", cluster)
}

Expand Down Expand Up @@ -697,8 +712,10 @@ func TestHandleUpdateClusterUpdatesStatefulSets(t *testing.T) {
newConfigMap string
increaseReplicas bool
simulatePodsNotUpdated bool
simulateStaleGeneration bool
expUpdateStatefulSets []string
expFailedUpdateStatefulSets []string
expError string
expNotDone bool
}{
{
Expand Down Expand Up @@ -821,6 +838,29 @@ func TestHandleUpdateClusterUpdatesStatefulSets(t *testing.T) {
expFailedUpdateStatefulSets: []string{"cluster1-rep1", "cluster1-rep2"},
expNotDone: true,
},
{
name: "fails if generation is stale",
cluster: newMeta("cluster1", map[string]string{
"foo": "bar",
"operator.m3db.io/app": "m3db",
"operator.m3db.io/cluster": "cluster1",
}, nil),
sets: []*metav1.ObjectMeta{
newMeta("cluster1-rep0", nil, map[string]string{
annotations.Update: "enabled",
}),
newMeta("cluster1-rep1", nil, map[string]string{
annotations.Update: "enabled",
}),
newMeta("cluster1-rep2", nil, map[string]string{
annotations.Update: "enabled",
}),
},
newImage: "m3db:v2.0.0",
simulateStaleGeneration: true,
expNotDone: true,
expError: "set cluster1-rep0 generation is not up to date (current: 1, observed: 2)",
},
}

for _, test := range tests {
Expand Down Expand Up @@ -867,9 +907,11 @@ func TestHandleUpdateClusterUpdatesStatefulSets(t *testing.T) {
}

result, done := waitForStatefulSets(t, c, cluster, "update", waitForStatefulSetsOptions{
setReadyReplicas: true,
expectedStatefulSets: test.expUpdateStatefulSets,
simulatePodsNotUpdated: test.simulatePodsNotUpdated,
setReadyReplicas: true,
expectedStatefulSets: test.expUpdateStatefulSets,
simulatePodsNotUpdated: test.simulatePodsNotUpdated,
simulateStaleGeneration: test.simulateStaleGeneration,
expectError: test.expError,
})
if test.expNotDone {
assert.False(t, done, "expected not all sets to be updated")
Expand Down

0 comments on commit 81b6f3a

Please sign in to comment.