Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: fix when scale replicant pod to 0, core pod can not ready
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
Rory-Z committed Jan 10, 2024
1 parent dd376af commit 0b33232
Showing 11 changed files with 190 additions and 99 deletions.
4 changes: 2 additions & 2 deletions apis/apps/v2beta1/status.go
Original file line number Diff line number Diff line change
@@ -31,8 +31,8 @@ type EMQXStatus struct {
CoreNodes []EMQXNode `json:"coreNodes,omitempty"`
CoreNodesStatus EMQXNodesStatus `json:"coreNodesStatus,omitempty"`

ReplicantNodes []EMQXNode `json:"replicantNodes,omitempty"`
ReplicantNodesStatus *EMQXNodesStatus `json:"replicantNodesStatus,omitempty"`
ReplicantNodes []EMQXNode `json:"replicantNodes,omitempty"`
ReplicantNodesStatus EMQXNodesStatus `json:"replicantNodesStatus,omitempty"`

NodeEvacuationsStatus []NodeEvacuationStatus `json:"nodEvacuationsStatus,omitempty"`
}
6 changes: 1 addition & 5 deletions apis/apps/v2beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion controllers/apps/v2beta1/add_emqx_repl_suite_test.go
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {
},
}
instance.Status = appsv2beta1.EMQXStatus{
ReplicantNodesStatus: &appsv2beta1.EMQXNodesStatus{
ReplicantNodesStatus: appsv2beta1.EMQXNodesStatus{
Replicas: 3,
},
Conditions: []metav1.Condition{
2 changes: 1 addition & 1 deletion controllers/apps/v2beta1/add_emqx_repl_test.go
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ func TestGetNewReplicaSet(t *testing.T) {
Replicas: pointer.Int32(3),
},
}
instance.Status.ReplicantNodesStatus = &appsv2beta1.EMQXNodesStatus{
instance.Status.ReplicantNodesStatus = appsv2beta1.EMQXNodesStatus{
CollisionCount: pointer.Int32(0),
}

4 changes: 2 additions & 2 deletions controllers/apps/v2beta1/status_machine.go
Original file line number Diff line number Diff line change
@@ -139,7 +139,7 @@ func (s *coreNodesProgressingStatus) nextStatus(ctx context.Context) {
emqx := s.emqxStatusMachine.GetEMQX()

updateSts, _, _ := getStateFulSetList(ctx, s.emqxStatusMachine.client, emqx)
if updateSts != nil && updateSts.Status.ReadyReplicas == emqx.Status.CoreNodesStatus.Replicas {
if updateSts != nil && updateSts.Status.ReadyReplicas != 0 && updateSts.Status.ReadyReplicas == emqx.Status.CoreNodesStatus.Replicas {
emqx.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.CoreNodesReady,
Status: metav1.ConditionTrue,
@@ -191,7 +191,7 @@ func (s *replicantNodesProgressingStatus) nextStatus(ctx context.Context) {
}

updateRs, _, _ := getReplicaSetList(ctx, s.emqxStatusMachine.client, emqx)
if updateRs != nil && updateRs.Status.ReadyReplicas == emqx.Status.ReplicantNodesStatus.Replicas {
if updateRs != nil && updateRs.Status.ReadyReplicas != 0 && updateRs.Status.ReadyReplicas == emqx.Status.ReplicantNodesStatus.Replicas {
emqx.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.ReplicantNodesReady,
Status: metav1.ConditionTrue,
1 change: 0 additions & 1 deletion controllers/apps/v2beta1/sync_pods.go
Original file line number Diff line number Diff line change
@@ -44,7 +44,6 @@ func (s *syncPods) reconcile(ctx context.Context, logger logr.Logger, instance *
}
}
}

} else {
if updateSts != nil {
for _, node := range instance.Status.CoreNodes {
4 changes: 2 additions & 2 deletions controllers/apps/v2beta1/sync_pods_suite_test.go
Original file line number Diff line number Diff line change
@@ -72,7 +72,7 @@ var _ = Describe("Check sync pods controller", Ordered, Label("node"), func() {
ReadyReplicas: 2,
Replicas: 1,
},
ReplicantNodesStatus: &appsv2beta1.EMQXNodesStatus{
ReplicantNodesStatus: appsv2beta1.EMQXNodesStatus{
UpdateRevision: "update",
UpdateReplicas: 1,
CurrentRevision: "current",
@@ -334,7 +334,7 @@ var _ = Describe("check can be scale down", func() {
Replicas: pointer.Int32Ptr(3),
},
}
instance.Status.ReplicantNodesStatus = &appsv2beta1.EMQXNodesStatus{
instance.Status.ReplicantNodesStatus = appsv2beta1.EMQXNodesStatus{
UpdateRevision: "update",
CurrentRevision: "current",
}
80 changes: 41 additions & 39 deletions controllers/apps/v2beta1/update_emqx_status.go
Original file line number Diff line number Diff line change
@@ -21,47 +21,53 @@ type updateStatus struct {
}

func (u *updateStatus) reconcile(ctx context.Context, logger logr.Logger, instance *appsv2beta1.EMQX, r innerReq.RequesterInterface) subResult {
if instance.Spec.ReplicantTemplate != nil && instance.Status.ReplicantNodesStatus == nil {
instance.Status.ReplicantNodesStatus = &appsv2beta1.EMQXNodesStatus{
Replicas: *instance.Spec.ReplicantTemplate.Spec.Replicas,
}
instance.Status.CoreNodesStatus.Replicas = *instance.Spec.CoreTemplate.Spec.Replicas
if instance.Spec.ReplicantTemplate != nil {
instance.Status.ReplicantNodesStatus.Replicas = *instance.Spec.ReplicantTemplate.Spec.Replicas
}

updateRs, currentRs, oldRsList := getReplicaSetList(ctx, u.Client, instance)
if updateRs != nil {
if currentRs == nil || currentRs.Status.Replicas == 0 {
if instance.Status.CoreNodesStatus.UpdateRevision != "" && instance.Status.CoreNodesStatus.CurrentRevision == "" {
instance.Status.CoreNodesStatus.CurrentRevision = instance.Status.CoreNodesStatus.UpdateRevision
}
if instance.Status.ReplicantNodesStatus.UpdateRevision != "" && instance.Status.ReplicantNodesStatus.CurrentRevision == "" {
instance.Status.ReplicantNodesStatus.CurrentRevision = instance.Status.ReplicantNodesStatus.UpdateRevision
}

updateSts, currentSts, oldStsList := getStateFulSetList(ctx, u.Client, instance)
if updateSts != nil && updateSts.UID != currentSts.UID {
if currentSts.Status.Replicas == 0 {
var i int
for i = 0; i < len(oldRsList); i++ {
if oldRsList[i].Status.Replicas > 0 {
currentRs = oldRsList[i]
for i = 0; i < len(oldStsList); i++ {
if oldStsList[i].Status.Replicas > 0 {
currentSts = oldStsList[i]

Check warning on line 42 in controllers/apps/v2beta1/update_emqx_status.go

Codecov / codecov/patch

controllers/apps/v2beta1/update_emqx_status.go#L42

Added line #L42 was not covered by tests
break
}
}
if i == len(oldRsList) {
currentRs = updateRs
if i == len(oldStsList) {
currentSts = updateSts
}
instance.Status.ReplicantNodesStatus.CurrentRevision = currentRs.Labels[appsv2beta1.LabelsPodTemplateHashKey]
instance.Status.CoreNodesStatus.CurrentRevision = currentSts.Labels[appsv2beta1.LabelsPodTemplateHashKey]
if err := u.Client.Status().Update(ctx, instance); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update status")}
}
return subResult{}
}
}

updateSts, currentSts, oldStsList := getStateFulSetList(ctx, u.Client, instance)
if updateSts != nil {
if currentSts == nil || currentSts.Status.Replicas == 0 {
updateRs, currentRs, oldRsList := getReplicaSetList(ctx, u.Client, instance)
if updateRs != nil && updateRs.UID != currentRs.UID {
if currentRs.Status.Replicas == 0 {
var i int
for i = 0; i < len(oldStsList); i++ {
if oldStsList[i].Status.Replicas > 0 {
currentSts = oldStsList[i]
for i = 0; i < len(oldRsList); i++ {
if oldRsList[i].Status.Replicas > 0 {
currentRs = oldRsList[i]

Check warning on line 63 in controllers/apps/v2beta1/update_emqx_status.go

Codecov / codecov/patch

controllers/apps/v2beta1/update_emqx_status.go#L62-L63

Added lines #L62 - L63 were not covered by tests
break
}
}
if i == len(oldStsList) {
currentSts = updateSts
if i == len(oldRsList) {
currentRs = updateRs
}
instance.Status.CoreNodesStatus.CurrentRevision = currentSts.Labels[appsv2beta1.LabelsPodTemplateHashKey]
instance.Status.ReplicantNodesStatus.CurrentRevision = currentRs.Labels[appsv2beta1.LabelsPodTemplateHashKey]
if err := u.Client.Status().Update(ctx, instance); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update status")}
}
@@ -80,7 +86,6 @@ func (u *updateStatus) reconcile(ctx context.Context, logger logr.Logger, instan
}

instance.Status.CoreNodes = coreNodes
instance.Status.CoreNodesStatus.Replicas = *instance.Spec.CoreTemplate.Spec.Replicas
instance.Status.CoreNodesStatus.ReadyReplicas = 0
instance.Status.CoreNodesStatus.CurrentReplicas = 0
instance.Status.CoreNodesStatus.UpdateReplicas = 0
@@ -96,22 +101,19 @@ func (u *updateStatus) reconcile(ctx context.Context, logger logr.Logger, instan
}
}

if len(replNodes) > 0 {
instance.Status.ReplicantNodes = replNodes
instance.Status.ReplicantNodesStatus.Replicas = *instance.Spec.ReplicantTemplate.Spec.Replicas
instance.Status.ReplicantNodesStatus.ReadyReplicas = 0
instance.Status.ReplicantNodesStatus.CurrentReplicas = 0
instance.Status.ReplicantNodesStatus.UpdateReplicas = 0
for _, node := range replNodes {
if node.NodeStatus == "running" {
instance.Status.ReplicantNodesStatus.ReadyReplicas++
}
if currentRs != nil && node.ControllerUID == currentRs.UID {
instance.Status.ReplicantNodesStatus.CurrentReplicas++
}
if updateRs != nil && node.ControllerUID == updateRs.UID {
instance.Status.ReplicantNodesStatus.UpdateReplicas++
}
instance.Status.ReplicantNodes = replNodes
instance.Status.ReplicantNodesStatus.ReadyReplicas = 0
instance.Status.ReplicantNodesStatus.CurrentReplicas = 0
instance.Status.ReplicantNodesStatus.UpdateReplicas = 0
for _, node := range replNodes {
if node.NodeStatus == "running" {
instance.Status.ReplicantNodesStatus.ReadyReplicas++
}
if currentRs != nil && node.ControllerUID == currentRs.UID {
instance.Status.ReplicantNodesStatus.CurrentReplicas++
}
if updateRs != nil && node.ControllerUID == updateRs.UID {
instance.Status.ReplicantNodesStatus.UpdateReplicas++
}
}

58 changes: 37 additions & 21 deletions controllers/apps/v2beta1/update_pod_conditions.go
Original file line number Diff line number Diff line change
@@ -21,8 +21,21 @@ type updatePodConditions struct {
}

func (u *updatePodConditions) reconcile(ctx context.Context, logger logr.Logger, instance *appsv2beta1.EMQX, r innerReq.RequesterInterface) subResult {
var updateStsUID, currentStsUID, updateRsUID, currentRsUID types.UID
updateRs, currentRs, _ := getReplicaSetList(ctx, u.Client, instance)
if updateRs != nil {
updateRsUID = updateRs.UID
}
if currentRs != nil {
currentRsUID = currentRs.UID
}
updateSts, currentSts, _ := getStateFulSetList(ctx, u.Client, instance)
if updateSts != nil {
updateStsUID = updateSts.UID
}
if currentSts != nil {
currentStsUID = currentSts.UID
}

pods := &corev1.PodList{}
_ = u.Client.List(ctx, pods,
@@ -38,40 +51,43 @@ func (u *updatePodConditions) reconcile(ctx context.Context, logger logr.Logger,
}

onServingCondition := corev1.PodCondition{
Type: appsv2beta1.PodOnServing,
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Type: appsv2beta1.PodOnServing,
}
for _, condition := range pod.Status.Conditions {
if condition.Type == appsv2beta1.PodOnServing {
onServingCondition.Status = condition.Status
onServingCondition.LastTransitionTime = condition.LastTransitionTime
}
}

if instance.Status.IsConditionTrue(appsv2beta1.Available) {
if (updateSts != nil && controllerRef.UID == updateSts.UID) ||
(updateRs != nil && controllerRef.UID == updateRs.UID) {
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.ContainersReady && condition.Status == corev1.ConditionTrue {
onServingCondition.Status = u.checkInCluster(instance, r, pod)
break
switch controllerRef.UID {
case updateStsUID, updateRsUID:
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.ContainersReady && condition.Status == corev1.ConditionTrue {
status := u.checkInCluster(instance, r, pod)
if status != onServingCondition.Status {
onServingCondition.Status = status
onServingCondition.LastTransitionTime = metav1.Now()
}
break
}
}
} else {
if (currentSts != nil && controllerRef.UID == currentSts.UID) ||
(currentRs != nil && controllerRef.UID == currentRs.UID) ||
(updateSts != nil && controllerRef.UID == updateSts.UID) ||
(updateRs != nil && controllerRef.UID == updateRs.UID) {
case currentStsUID, currentRsUID:
// When available condition is true, need clean currentSts / currentRs pod
if instance.Status.IsConditionTrue(appsv2beta1.Available) {
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.ContainersReady && condition.Status == corev1.ConditionTrue {
onServingCondition.Status = u.checkInCluster(instance, r, pod)
status := corev1.ConditionFalse
if status != onServingCondition.Status {
onServingCondition.Status = status
onServingCondition.LastTransitionTime = metav1.Now()
}
break
}
}
}
}

for _, condition := range pod.Status.Conditions {
if condition.Type == appsv2beta1.PodOnServing && condition.Status == onServingCondition.Status {
onServingCondition.LastTransitionTime = condition.LastTransitionTime
}
}
patchBytes, _ := json.Marshal(corev1.Pod{
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{onServingCondition},
7 changes: 0 additions & 7 deletions controllers/apps/v2beta1/util.go
Original file line number Diff line number Diff line change
@@ -93,13 +93,6 @@ func getReplicaSetList(ctx context.Context, k8sClient client.Client, instance *a
client.InNamespace(instance.Namespace),
client.MatchingLabels(labels),
)
if instance.Spec.ReplicantTemplate == nil {
for _, rs := range list.Items {
oldRsList = append(oldRsList, rs.DeepCopy())
}
sort.Sort(ReplicaSetsByCreationTimestamp(oldRsList))
return
}

for _, rs := range list.Items {
if hash, ok := rs.Labels[appsv2beta1.LabelsPodTemplateHashKey]; ok {
121 changes: 103 additions & 18 deletions e2e/v2beta1/e2e_test.go
Original file line number Diff line number Diff line change
@@ -72,17 +72,24 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
}, And(
HaveField("Replicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
HaveField("ReadyReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
HaveField("CurrentRevision", Not(BeEmpty())),
HaveField("CurrentRevision", Not(Equal(""))),
HaveField("CurrentReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
HaveField("UpdateRevision", Not(BeEmpty())),
HaveField("UpdateRevision", Not(Equal(""))),
HaveField("UpdateReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
)),
WithTransform(func(instance *appsv2beta1.EMQX) []appsv2beta1.EMQXNode {
return instance.Status.ReplicantNodes
}, BeNil()),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, BeNil()),
}, And(
HaveField("Replicas", Equal(int32(0))),
HaveField("ReadyReplicas", Equal(int32(0))),
HaveField("CurrentRevision", Equal("")),
HaveField("CurrentReplicas", Equal(int32(0))),
HaveField("UpdateRevision", Equal("")),
HaveField("UpdateReplicas", Equal(int32(0))),
)),
),
)

@@ -124,15 +131,22 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
HaveField("ReadyReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
HaveField("CurrentRevision", Equal(storage.Status.CoreNodesStatus.CurrentRevision)),
HaveField("CurrentReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
HaveField("UpdateRevision", Not(BeEmpty())),
HaveField("UpdateRevision", Not(Equal(""))),
HaveField("UpdateReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
)),
WithTransform(func(instance *appsv2beta1.EMQX) []appsv2beta1.EMQXNode {
return instance.Status.ReplicantNodes
}, BeNil()),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, BeNil()),
}, And(
HaveField("Replicas", Equal(int32(0))),
HaveField("ReadyReplicas", Equal(int32(0))),
HaveField("CurrentRevision", Equal("")),
HaveField("CurrentReplicas", Equal(int32(0))),
HaveField("UpdateRevision", Equal("")),
HaveField("UpdateReplicas", Equal(int32(0))),
)),
),
)

@@ -180,9 +194,16 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
WithTransform(func(instance *appsv2beta1.EMQX) []appsv2beta1.EMQXNode {
return instance.Status.ReplicantNodes
}, BeNil()),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, BeNil()),
}, And(
HaveField("Replicas", Equal(int32(0))),
HaveField("ReadyReplicas", Equal(int32(0))),
HaveField("CurrentRevision", Equal("")),
HaveField("CurrentReplicas", Equal(int32(0))),
HaveField("UpdateRevision", Equal("")),
HaveField("UpdateReplicas", Equal(int32(0))),
)),
),
)

@@ -220,9 +241,16 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
HaveField("CurrentRevision", Not(Equal(storage.Status.CoreNodesStatus.CurrentRevision))),
HaveField("UpdateRevision", Not(Equal(storage.Status.CoreNodesStatus.CurrentRevision))),
)),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, BeNil()),
}, And(
HaveField("Replicas", Equal(int32(0))),
HaveField("ReadyReplicas", Equal(int32(0))),
HaveField("CurrentRevision", Equal("")),
HaveField("CurrentReplicas", Equal(int32(0))),
HaveField("UpdateRevision", Equal("")),
HaveField("UpdateReplicas", Equal(int32(0))),
)),
),
)

@@ -327,19 +355,19 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
}, And(
HaveField("Replicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
HaveField("ReadyReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
HaveField("CurrentRevision", Not(BeEmpty())),
HaveField("CurrentRevision", Not(Equal(""))),
HaveField("CurrentReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
HaveField("UpdateRevision", Not(BeEmpty())),
HaveField("UpdateRevision", Not(Equal(""))),
HaveField("UpdateReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
)),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, And(
HaveField("Replicas", Equal(int32(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
HaveField("ReadyReplicas", Equal(int32(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
HaveField("CurrentRevision", Not(BeEmpty())),
HaveField("CurrentRevision", Not(Equal(""))),
HaveField("CurrentReplicas", Equal(int32(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
HaveField("UpdateRevision", Not(BeEmpty())),
HaveField("UpdateRevision", Not(Equal(""))),
HaveField("UpdateReplicas", Equal(int32(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
)),
),
@@ -389,7 +417,7 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
WithTransform(func(instance *appsv2beta1.EMQX) []appsv2beta1.EMQXNode {
return instance.Status.ReplicantNodes
}, HaveLen(int(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, And(
HaveField("Replicas", Equal(int32(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
@@ -411,6 +439,63 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
))
})

It("scale down EMQX replicant nodes to 0", func() {
var storage *appsv2beta1.EMQX
Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(instance), instance); err != nil {
return err
}
storage = instance.DeepCopy()
instance.Spec.ReplicantTemplate.Spec.Replicas = pointer.Int32Ptr(0)
return k8sClient.Update(ctx, instance)
})).Should(Succeed())

Eventually(func() *appsv2beta1.EMQX {
_ = k8sClient.Get(ctx, client.ObjectKeyFromObject(instance), instance)
return instance
}).WithTimeout(timeout).WithPolling(interval).Should(
And(
WithTransform(func(instance *appsv2beta1.EMQX) bool {
return instance.Status.IsConditionTrue(appsv2beta1.Ready)
}, BeTrue()),
WithTransform(func(instance *appsv2beta1.EMQX) []appsv2beta1.EMQXNode {
return instance.Status.CoreNodes
}, HaveLen(int(*instance.Spec.CoreTemplate.Spec.Replicas))),
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.CoreNodesStatus
}, And(
HaveField("Replicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
HaveField("ReadyReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
HaveField("CurrentRevision", Equal(storage.Status.CoreNodesStatus.CurrentRevision)),
HaveField("CurrentReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
HaveField("UpdateRevision", Equal(storage.Status.CoreNodesStatus.CurrentRevision)),
HaveField("UpdateReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))),
)),
WithTransform(func(instance *appsv2beta1.EMQX) []appsv2beta1.EMQXNode {
return instance.Status.ReplicantNodes
}, HaveLen(int(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, And(
HaveField("Replicas", Equal(int32(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
HaveField("ReadyReplicas", Equal(int32(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
HaveField("CurrentRevision", Equal(storage.Status.ReplicantNodesStatus.CurrentRevision)),
HaveField("CurrentReplicas", Equal(int32(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
HaveField("UpdateRevision", Equal(storage.Status.ReplicantNodesStatus.CurrentRevision)),
HaveField("UpdateReplicas", Equal(int32(*instance.Spec.ReplicantTemplate.Spec.Replicas))),
)),
),
)

checkServices(instance)
checkPods(instance)
checkEndpoints(instance, appsv2beta1.CloneAndAddLabel(
appsv2beta1.DefaultCoreLabels(instance),
appsv2beta1.LabelsPodTemplateHashKey,
instance.Status.CoreNodesStatus.CurrentRevision,
))
})

It("change EMQX image", func() {
var storage *appsv2beta1.EMQX
Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error {
@@ -436,7 +521,7 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
HaveField("CurrentRevision", Not(Equal(storage.Status.CoreNodesStatus.CurrentRevision))),
HaveField("UpdateRevision", Not(Equal(storage.Status.CoreNodesStatus.CurrentRevision))),
)),
WithTransform(func(instance *appsv2beta1.EMQX) *appsv2beta1.EMQXNodesStatus {
WithTransform(func(instance *appsv2beta1.EMQX) appsv2beta1.EMQXNodesStatus {
return instance.Status.ReplicantNodesStatus
}, And(
HaveField("CurrentRevision", Not(Equal(storage.Status.ReplicantNodesStatus.CurrentRevision))),

0 comments on commit 0b33232

Please sign in to comment.