diff --git a/go.mod b/go.mod index e0ac5e27..3a5d90df 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( k8s.io/klog/v2 v2.100.1 k8s.io/kubernetes v0.0.0-00010101000000-000000000000 k8s.io/utils v0.0.0-20230726121419-3b25d923346b - kusionstack.io/kube-api v0.6.5 + kusionstack.io/kube-api v0.6.6 kusionstack.io/resourceconsist v0.0.1 sigs.k8s.io/controller-runtime v0.15.1 ) diff --git a/go.sum b/go.sum index aca8f645..84c5352b 100644 --- a/go.sum +++ b/go.sum @@ -1139,8 +1139,8 @@ k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -kusionstack.io/kube-api v0.6.5 h1:aVF6YsejHd1ujpI5oW6rdu2bhsYzX0PM5OnoHvAtIzs= -kusionstack.io/kube-api v0.6.5/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y= +kusionstack.io/kube-api v0.6.6 h1:gMLUQL/eectQxkosnlv1m/R2xieY2crETliWRcxBICg= +kusionstack.io/kube-api v0.6.6/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y= kusionstack.io/resourceconsist v0.0.1 h1:+k/jriq5Ld7fQUYfWSMGynz/FesHtl3Rk2fmQPjBe0g= kusionstack.io/resourceconsist v0.0.1/go.mod h1:816xS/fY6EOUbPFjXIWW/TGs8/YE46qP4ElKeIiwFdU= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= diff --git a/pkg/controllers/collaset/pvccontrol/pvc_control.go b/pkg/controllers/collaset/pvccontrol/pvc_control.go index 0ddf422e..bd8bb64f 100644 --- a/pkg/controllers/collaset/pvccontrol/pvc_control.go +++ b/pkg/controllers/collaset/pvccontrol/pvc_control.go @@ -178,8 +178,15 @@ func (pc *RealPvcControl) DeletePodUnusedPvcs(ctx context.Context, cls *appsv1al return err } + mountedPvcNames := sets.String{} + for _, container := range pod.Spec.Containers { + for _, v := range container.VolumeMounts { + mountedPvcNames.Insert(v.Name) + } + } + //delete pvc which is not claimed in templates - if err := deleteUnclaimedPvcs(pc.client, ctx, cls, oldPvcs); err != nil { + if err := deleteUnclaimedPvcs(pc.client, ctx, cls, oldPvcs, mountedPvcNames); err != nil { return err } @@ -304,16 +311,20 @@ func IsPodPvcTmpChanged(cls *appsv1alpha1.CollaSet, pod *corev1.Pod, existingPvc return false, nil } -func deleteUnclaimedPvcs(c client.Client, ctx context.Context, cls *appsv1alpha1.CollaSet, oldPvcs *map[string]*corev1.PersistentVolumeClaim) error { +func deleteUnclaimedPvcs(c client.Client, ctx context.Context, cls *appsv1alpha1.CollaSet, oldPvcs *map[string]*corev1.PersistentVolumeClaim, mountedPvcNames sets.String) error { expectedNames := sets.String{} for _, pvcTmp := range cls.Spec.VolumeClaimTemplates { expectedNames.Insert(pvcTmp.Name) } for pvcTmpName, pvc := range *oldPvcs { + // if pvc is still mounted on pod, keep it + if mountedPvcNames.Has(pvcTmpName) { + continue + } + // if pvc is claimed in pvc templates, keep it if expectedNames.Has(pvcTmpName) { continue } - // if pvc is not claimed in pvc templates, delete it if err := c.Delete(ctx, pvc); err != nil { return err } else if err := collasetutils.ActiveExpectations.ExpectDelete(cls, expectations.Pvc, pvc.Name); err != nil { @@ -325,7 +336,6 @@ func deleteUnclaimedPvcs(c client.Client, ctx context.Context, cls *appsv1alpha1 func deleteOldPvcs(c client.Client, ctx context.Context, cls *appsv1alpha1.CollaSet, newPvcs, oldPvcs *map[string]*corev1.PersistentVolumeClaim) error { for pvcTmpName, pvc := range *oldPvcs { - // if new pvc is not ready, keep this pvc if _, newPvcExist := (*newPvcs)[pvcTmpName]; !newPvcExist { continue } diff --git a/pkg/controllers/collaset/synccontrol/scale.go b/pkg/controllers/collaset/synccontrol/scale.go index 62d46168..4c1aea5d 100644 --- a/pkg/controllers/collaset/synccontrol/scale.go +++ b/pkg/controllers/collaset/synccontrol/scale.go @@ -220,14 +220,16 @@ func (r *RealSyncControl) allowIncludeExcludePods(ctx context.Context, cls *apps // doIncludeExcludePods do real include and exclude for pods which are allowed to in/exclude func (r *RealSyncControl) doIncludeExcludePods(ctx context.Context, cls *appsv1alpha1.CollaSet, excludePods []string, includePods []string, availableContexts []*appsv1alpha1.ContextDetail) error { - _, exErr := controllerutils.SlowStartBatch(len(excludePods), controllerutils.SlowStartInitialBatchSize, false, func(idx int, _ error) error { + var excludeErrs, includeErrs []error + _, _ = controllerutils.SlowStartBatch(len(excludePods), controllerutils.SlowStartInitialBatchSize, false, func(idx int, _ error) (err error) { + defer func() { excludeErrs = append(excludeErrs, err) }() return r.excludePod(ctx, cls, excludePods[idx]) }) - - _, inErr := controllerutils.SlowStartBatch(len(includePods), controllerutils.SlowStartInitialBatchSize, false, func(idx int, _ error) error { + _, _ = controllerutils.SlowStartBatch(len(includePods), controllerutils.SlowStartInitialBatchSize, false, func(idx int, _ error) (err error) { + defer func() { includeErrs = append(includeErrs, err) }() return r.includePod(ctx, cls, includePods[idx], strconv.Itoa(availableContexts[idx].ID)) }) - return controllerutils.AggregateErrors([]error{exErr, inErr}) + return controllerutils.AggregateErrors(append(includeErrs, excludeErrs...)) } // excludePod try to exclude a pod from collaset diff --git a/pkg/controllers/collaset/utils/pvc.go b/pkg/controllers/collaset/utils/pvc.go index 87bff04d..076fa1ed 100644 --- a/pkg/controllers/collaset/utils/pvc.go +++ b/pkg/controllers/collaset/utils/pvc.go @@ -25,7 +25,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/rand" - appsv1alpha1 "kusionstack.io/kube-api/apps/v1alpha1" ) @@ -51,10 +50,14 @@ func BuildPvcWithHash(cls *appsv1alpha1.CollaSet, pvcTmp *corev1.PersistentVolum } claim.Labels[appsv1alpha1.PvcTemplateHashLabelKey] = hash claim.Labels[appsv1alpha1.PodInstanceIDLabelKey] = id + claim.Labels[appsv1alpha1.PvcTemplateLabelKey] = pvcTmp.Name return claim, nil } func ExtractPvcTmpName(cls *appsv1alpha1.CollaSet, pvc *corev1.PersistentVolumeClaim) (string, error) { + if pvcTmpName, exist := pvc.Labels[appsv1alpha1.PvcTemplateLabelKey]; exist { + return pvcTmpName, nil + } lastDashIndex := strings.LastIndex(pvc.Name, "-") if lastDashIndex == -1 { return "", fmt.Errorf("pvc %s has no postfix", pvc.Name) diff --git a/test/e2e/apps/collaset.go b/test/e2e/apps/collaset.go index 3f1f8fed..74b38aa6 100644 --- a/test/e2e/apps/collaset.go +++ b/test/e2e/apps/collaset.go @@ -744,6 +744,185 @@ var _ = SIGDescribe("CollaSet", func() { Expect(errors.IsNotFound(err)).To(Equal(true)) }) + framework.ConformanceIt("Include pod with different pvc template", func() { + cls1 := tester.NewCollaSet("collaset-inc-"+randStr, 1, appsv1alpha1.UpdateStrategy{RollingUpdate: &appsv1alpha1.RollingUpdateCollaSetStrategy{ByLabel: &appsv1alpha1.ByLabel{}}}) + cls1.Spec.VolumeClaimTemplates = []v1.PersistentVolumeClaim{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc-test1", + }, + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + "storage": resource.MustParse("100m"), + }, + }, + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + }, + }, + } + cls1.Spec.Template.Spec.Containers[0].VolumeMounts = []v1.VolumeMount{ + { + MountPath: "/path/to/mount", + Name: "pvc-test1", + }, + } + Expect(tester.CreateCollaSet(cls1)).NotTo(HaveOccurred()) + By("Wait for CollaSet1 status replicas satisfied") + Eventually(func() error { return tester.ExpectedStatusReplicas(cls1, 1, 1, 1, 1, 1) }, 30*time.Second, 3*time.Second).ShouldNot(HaveOccurred()) + + cls2 := tester.NewCollaSet("collaset-exc-"+randStr, 1, appsv1alpha1.UpdateStrategy{}) + cls2.Spec.VolumeClaimTemplates = []v1.PersistentVolumeClaim{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc-test2", + }, + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + "storage": resource.MustParse("100m"), + }, + }, + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + }, + }, + } + cls2.Spec.Template.Spec.Containers[0].VolumeMounts = []v1.VolumeMount{ + { + MountPath: "/path/to/mount", + Name: "pvc-test2", + }, + } + Expect(tester.CreateCollaSet(cls2)).NotTo(HaveOccurred()) + By("Wait for CollaSet2 with different pvc status replicas satisfied") + Eventually(func() error { return tester.ExpectedStatusReplicas(cls2, 1, 1, 1, 1, 1) }, 30*time.Second, 3*time.Second).ShouldNot(HaveOccurred()) + + By("exclude pod and scale in 1 replicas from CollaSet2") + pvcs, err := tester.ListPVCForCollaSet(cls2) + Expect(err).NotTo(HaveOccurred()) + pods, err := tester.ListPodsForCollaSet(cls2) + Expect(err).NotTo(HaveOccurred()) + PodToExclude := pods[0] + PvcToExclude := pvcs[0] + Expect(tester.UpdateCollaSet(cls2, func(cls *appsv1alpha1.CollaSet) { + cls.Spec.Replicas = int32Pointer(0) + cls.Spec.ScaleStrategy = appsv1alpha1.ScaleStrategy{ + PodToExclude: []string{PodToExclude.Name}, + } + })).NotTo(HaveOccurred()) + + By("Wait for CollaSet2 reconciled") + Eventually(func() bool { + if err := tester.GetCollaSet(cls2); err != nil { + return false + } + return cls2.Generation == cls2.Status.ObservedGeneration + }, 10*time.Second, 3*time.Second).Should(Equal(true)) + + By("Check pod is excluded") + excludedPodID := PodToExclude.Labels[appsv1alpha1.PodInstanceIDLabelKey] + Eventually(func() bool { + pods, err = tester.ListPodsForCollaSet(cls2) + Expect(err).Should(BeNil()) + for i := range pods { + pod := pods[i] + if pod.Name == PodToExclude.Name { + return false + } + } + return true + }, 10*time.Second, 1*time.Second).Should(BeTrue()) + + By("Check pvc is excluded") + Eventually(func() bool { + pvcs, err := tester.ListPVCForCollaSet(cls2) + Expect(err).Should(BeNil()) + for i := range pvcs { + pvc := pvcs[i] + if pvc.Labels[appsv1alpha1.PodInstanceIDLabelKey] == excludedPodID { + return false + } + } + return true + }, 10*time.Second, 1*time.Second).Should(BeTrue()) + + By("Wait for CollaSet2 reconciled") + Eventually(func() error { return tester.ExpectedStatusReplicas(cls2, 0, 0, 0, 0, 0) }, 30*time.Second, 3*time.Second).ShouldNot(HaveOccurred()) + + By("include pod and scale out 1 replicas in CollaSet1") + pods, err = tester.ListPodsForCollaSet(cls1) + Expect(err).NotTo(HaveOccurred()) + PodToInclude := PodToExclude + PvcToInclude := PvcToExclude + Expect(tester.UpdatePod(PodToInclude, func(pod *v1.Pod) { + pod.Labels["owner"] = cls1.Name + })).NotTo(HaveOccurred()) + Expect(tester.UpdatePvc(PvcToInclude, func(pvc *v1.PersistentVolumeClaim) { + pvc.Labels["owner"] = cls1.Name + })).NotTo(HaveOccurred()) + Expect(tester.UpdateCollaSet(cls1, func(cls *appsv1alpha1.CollaSet) { + cls.Spec.Replicas = int32Pointer(2) + cls.Spec.ScaleStrategy = appsv1alpha1.ScaleStrategy{ + PodToInclude: []string{PodToInclude.Name}, + } + })).NotTo(HaveOccurred()) + + By("Wait for CollaSet1 reconciled") + Eventually(func() bool { + if err := tester.GetCollaSet(cls1); err != nil { + return false + } + return cls1.Generation == cls1.Status.ObservedGeneration + }, 10*time.Second, 3*time.Second).Should(Equal(true)) + + By("Check pod is included") + Eventually(func() bool { + pods, err = tester.ListPodsForCollaSet(cls1) + Expect(err).Should(BeNil()) + for i := range pods { + pod := pods[i] + if pod.Name == PodToInclude.Name { + return true + } + } + return false + }, 30*time.Second, 1*time.Second).Should(BeTrue()) + + By("Check pvc is included") + Eventually(func() bool { + pvcs, err := tester.ListPVCForCollaSet(cls1) + Expect(err).Should(BeNil()) + for i := range pvcs { + pvc := pvcs[i] + if pvc.Labels[appsv1alpha1.PvcTemplateLabelKey] == cls2.Spec.VolumeClaimTemplates[0].Name { + return true + } + } + return false + }, 30*time.Second, 1*time.Second).Should(BeTrue()) + + By("Update included pod from CollaSet1") + Expect(tester.UpdateCollaSet(cls1, func(cls *appsv1alpha1.CollaSet) { + cls.Spec.UpdateStrategy.RollingUpdate = &appsv1alpha1.RollingUpdateCollaSetStrategy{} + })).NotTo(HaveOccurred()) + + By("Wait for CollaSet1 reconciled") + Eventually(func() error { return tester.ExpectedStatusReplicas(cls1, 2, 2, 2, 2, 2) }, 30*time.Second, 3*time.Second).ShouldNot(HaveOccurred()) + + By("Check pvc from CollaSet2 is deleted") + Eventually(func() bool { + pvcs, err := tester.ListPVCForCollaSet(cls1) + Expect(err).Should(BeNil()) + for i := range pvcs { + pvc := pvcs[i] + if pvc.Labels[appsv1alpha1.PvcTemplateLabelKey] == cls2.Spec.VolumeClaimTemplates[0].Name { + return false + } + } + return len(pvcs) == 2 + }, 1000*time.Second, 1*time.Second).Should(BeTrue()) + }) + framework.ConformanceIt("PVC retention policy with scale in pods", func() { cls := tester.NewCollaSet("collaset-"+randStr, 2, appsv1alpha1.UpdateStrategy{}) cls.Spec.ScaleStrategy.PersistentVolumeClaimRetentionPolicy = &appsv1alpha1.PersistentVolumeClaimRetentionPolicy{ diff --git a/test/e2e/framework/collaset_util.go b/test/e2e/framework/collaset_util.go index 126b58d5..20601255 100644 --- a/test/e2e/framework/collaset_util.go +++ b/test/e2e/framework/collaset_util.go @@ -174,6 +174,19 @@ func (t *CollaSetTester) UpdatePod(pod *v1.Pod, fn func(pod *v1.Pod)) error { }) } +func (t *CollaSetTester) UpdatePvc(pvc *v1.PersistentVolumeClaim, fn func(pvc *v1.PersistentVolumeClaim)) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + err := t.client.Get(context.TODO(), types.NamespacedName{Namespace: pvc.Namespace, Name: pvc.Name}, pvc) + if err != nil { + return err + } + + fn(pvc) + err = t.client.Update(context.TODO(), pvc) + return err + }) +} + func (t *CollaSetTester) ExpectedStatusReplicas(cls *appsv1alpha1.CollaSet, replicas, readyReplicas, availableReplicas, updatedReplicas, totalReplicas int32) error { if err := t.client.Get(context.TODO(), types.NamespacedName{Namespace: cls.Namespace, Name: cls.Name}, cls); err != nil { return err