Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve extract pvc template name from pvc #313

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
k8s.io/klog/v2 v2.100.1
k8s.io/kubernetes v0.0.0-00010101000000-000000000000
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
kusionstack.io/kube-api v0.6.5
kusionstack.io/kube-api v0.6.6
kusionstack.io/resourceconsist v0.0.1
sigs.k8s.io/controller-runtime v0.15.1
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1139,8 +1139,8 @@ k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
kusionstack.io/kube-api v0.6.5 h1:aVF6YsejHd1ujpI5oW6rdu2bhsYzX0PM5OnoHvAtIzs=
kusionstack.io/kube-api v0.6.5/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y=
kusionstack.io/kube-api v0.6.6 h1:gMLUQL/eectQxkosnlv1m/R2xieY2crETliWRcxBICg=
kusionstack.io/kube-api v0.6.6/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y=
kusionstack.io/resourceconsist v0.0.1 h1:+k/jriq5Ld7fQUYfWSMGynz/FesHtl3Rk2fmQPjBe0g=
kusionstack.io/resourceconsist v0.0.1/go.mod h1:816xS/fY6EOUbPFjXIWW/TGs8/YE46qP4ElKeIiwFdU=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
Expand Down
18 changes: 14 additions & 4 deletions pkg/controllers/collaset/pvccontrol/pvc_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,15 @@ func (pc *RealPvcControl) DeletePodUnusedPvcs(ctx context.Context, cls *appsv1al
return err
}

mountedPvcNames := sets.String{}
for _, container := range pod.Spec.Containers {
for _, v := range container.VolumeMounts {
mountedPvcNames.Insert(v.Name)
}
}

//delete pvc which is not claimed in templates
if err := deleteUnclaimedPvcs(pc.client, ctx, cls, oldPvcs); err != nil {
if err := deleteUnclaimedPvcs(pc.client, ctx, cls, oldPvcs, mountedPvcNames); err != nil {
return err
}

Expand Down Expand Up @@ -304,16 +311,20 @@ func IsPodPvcTmpChanged(cls *appsv1alpha1.CollaSet, pod *corev1.Pod, existingPvc
return false, nil
}

func deleteUnclaimedPvcs(c client.Client, ctx context.Context, cls *appsv1alpha1.CollaSet, oldPvcs *map[string]*corev1.PersistentVolumeClaim) error {
func deleteUnclaimedPvcs(c client.Client, ctx context.Context, cls *appsv1alpha1.CollaSet, oldPvcs *map[string]*corev1.PersistentVolumeClaim, mountedPvcNames sets.String) error {
expectedNames := sets.String{}
for _, pvcTmp := range cls.Spec.VolumeClaimTemplates {
expectedNames.Insert(pvcTmp.Name)
}
for pvcTmpName, pvc := range *oldPvcs {
// if pvc is still mounted on pod, keep it
if mountedPvcNames.Has(pvcTmpName) {
continue
}
// if pvc is claimed in pvc templates, keep it
if expectedNames.Has(pvcTmpName) {
continue
}
// if pvc is not claimed in pvc templates, delete it
if err := c.Delete(ctx, pvc); err != nil {
return err
} else if err := collasetutils.ActiveExpectations.ExpectDelete(cls, expectations.Pvc, pvc.Name); err != nil {
Expand All @@ -325,7 +336,6 @@ func deleteUnclaimedPvcs(c client.Client, ctx context.Context, cls *appsv1alpha1

func deleteOldPvcs(c client.Client, ctx context.Context, cls *appsv1alpha1.CollaSet, newPvcs, oldPvcs *map[string]*corev1.PersistentVolumeClaim) error {
for pvcTmpName, pvc := range *oldPvcs {
// if new pvc is not ready, keep this pvc
if _, newPvcExist := (*newPvcs)[pvcTmpName]; !newPvcExist {
continue
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/controllers/collaset/synccontrol/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func getPodsToDelete(filteredPods []*collasetutils.PodWrapper, replaceMapping ma

if replacePairPod, exist := replaceMapping[pod.Name]; exist && replacePairPod != nil {
// don't selective scaleIn newPod (and its originPod) until replace finished
if replacePairPod.ToDelete {
if replacePairPod.ToDelete && !pod.ToDelete {
continue
}
// when scaleIn origin Pod, newPod should be deleted if not service available
Expand Down Expand Up @@ -220,14 +220,16 @@ func (r *RealSyncControl) allowIncludeExcludePods(ctx context.Context, cls *apps

// doIncludeExcludePods do real include and exclude for pods which are allowed to in/exclude
func (r *RealSyncControl) doIncludeExcludePods(ctx context.Context, cls *appsv1alpha1.CollaSet, excludePods []string, includePods []string, availableContexts []*appsv1alpha1.ContextDetail) error {
_, exErr := controllerutils.SlowStartBatch(len(excludePods), controllerutils.SlowStartInitialBatchSize, false, func(idx int, _ error) error {
var excludeErrs, includeErrs []error
_, _ = controllerutils.SlowStartBatch(len(excludePods), controllerutils.SlowStartInitialBatchSize, false, func(idx int, _ error) (err error) {
defer func() { excludeErrs = append(excludeErrs, err) }()
return r.excludePod(ctx, cls, excludePods[idx])
})

_, inErr := controllerutils.SlowStartBatch(len(includePods), controllerutils.SlowStartInitialBatchSize, false, func(idx int, _ error) error {
_, _ = controllerutils.SlowStartBatch(len(includePods), controllerutils.SlowStartInitialBatchSize, false, func(idx int, _ error) (err error) {
defer func() { includeErrs = append(includeErrs, err) }()
return r.includePod(ctx, cls, includePods[idx], strconv.Itoa(availableContexts[idx].ID))
})
return controllerutils.AggregateErrors([]error{exErr, inErr})
return controllerutils.AggregateErrors(append(includeErrs, excludeErrs...))
}

// excludePod try to exclude a pod from collaset
Expand Down
5 changes: 4 additions & 1 deletion pkg/controllers/collaset/utils/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"

appsv1alpha1 "kusionstack.io/kube-api/apps/v1alpha1"
)

Expand All @@ -51,10 +50,14 @@ func BuildPvcWithHash(cls *appsv1alpha1.CollaSet, pvcTmp *corev1.PersistentVolum
}
claim.Labels[appsv1alpha1.PvcTemplateHashLabelKey] = hash
claim.Labels[appsv1alpha1.PodInstanceIDLabelKey] = id
claim.Labels[appsv1alpha1.PvcTemplateLabelKey] = pvcTmp.Name
return claim, nil
}

func ExtractPvcTmpName(cls *appsv1alpha1.CollaSet, pvc *corev1.PersistentVolumeClaim) (string, error) {
if pvcTmpName, exist := pvc.Labels[appsv1alpha1.PvcTemplateLabelKey]; exist {
return pvcTmpName, nil
}
lastDashIndex := strings.LastIndex(pvc.Name, "-")
if lastDashIndex == -1 {
return "", fmt.Errorf("pvc %s has no postfix", pvc.Name)
Expand Down
235 changes: 235 additions & 0 deletions test/e2e/apps/collaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,185 @@ var _ = SIGDescribe("CollaSet", func() {
Expect(errors.IsNotFound(err)).To(Equal(true))
})

framework.ConformanceIt("Include pod with different pvc template", func() {
cls1 := tester.NewCollaSet("collaset-inc-"+randStr, 1, appsv1alpha1.UpdateStrategy{RollingUpdate: &appsv1alpha1.RollingUpdateCollaSetStrategy{ByLabel: &appsv1alpha1.ByLabel{}}})
cls1.Spec.VolumeClaimTemplates = []v1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc-test1",
},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"storage": resource.MustParse("100m"),
},
},
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
},
},
}
cls1.Spec.Template.Spec.Containers[0].VolumeMounts = []v1.VolumeMount{
{
MountPath: "/path/to/mount",
Name: "pvc-test1",
},
}
Expect(tester.CreateCollaSet(cls1)).NotTo(HaveOccurred())
By("Wait for CollaSet1 status replicas satisfied")
Eventually(func() error { return tester.ExpectedStatusReplicas(cls1, 1, 1, 1, 1, 1) }, 30*time.Second, 3*time.Second).ShouldNot(HaveOccurred())

cls2 := tester.NewCollaSet("collaset-exc-"+randStr, 1, appsv1alpha1.UpdateStrategy{})
cls2.Spec.VolumeClaimTemplates = []v1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc-test2",
},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"storage": resource.MustParse("100m"),
},
},
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
},
},
}
cls2.Spec.Template.Spec.Containers[0].VolumeMounts = []v1.VolumeMount{
{
MountPath: "/path/to/mount",
Name: "pvc-test2",
},
}
Expect(tester.CreateCollaSet(cls2)).NotTo(HaveOccurred())
By("Wait for CollaSet2 with different pvc status replicas satisfied")
Eventually(func() error { return tester.ExpectedStatusReplicas(cls2, 1, 1, 1, 1, 1) }, 30*time.Second, 3*time.Second).ShouldNot(HaveOccurred())

By("exclude pod and scale in 1 replicas from CollaSet2")
pvcs, err := tester.ListPVCForCollaSet(cls2)
Expect(err).NotTo(HaveOccurred())
pods, err := tester.ListPodsForCollaSet(cls2)
Expect(err).NotTo(HaveOccurred())
PodToExclude := pods[0]
PvcToExclude := pvcs[0]
Expect(tester.UpdateCollaSet(cls2, func(cls *appsv1alpha1.CollaSet) {
cls.Spec.Replicas = int32Pointer(0)
cls.Spec.ScaleStrategy = appsv1alpha1.ScaleStrategy{
PodToExclude: []string{PodToExclude.Name},
}
})).NotTo(HaveOccurred())

By("Wait for CollaSet2 reconciled")
Eventually(func() bool {
if err := tester.GetCollaSet(cls2); err != nil {
return false
}
return cls2.Generation == cls2.Status.ObservedGeneration
}, 10*time.Second, 3*time.Second).Should(Equal(true))

By("Check pod is excluded")
excludedPodID := PodToExclude.Labels[appsv1alpha1.PodInstanceIDLabelKey]
Eventually(func() bool {
pods, err = tester.ListPodsForCollaSet(cls2)
Expect(err).Should(BeNil())
for i := range pods {
pod := pods[i]
if pod.Name == PodToExclude.Name {
return false
}
}
return true
}, 10*time.Second, 1*time.Second).Should(BeTrue())

By("Check pvc is excluded")
Eventually(func() bool {
pvcs, err := tester.ListPVCForCollaSet(cls2)
Expect(err).Should(BeNil())
for i := range pvcs {
pvc := pvcs[i]
if pvc.Labels[appsv1alpha1.PodInstanceIDLabelKey] == excludedPodID {
return false
}
}
return true
}, 10*time.Second, 1*time.Second).Should(BeTrue())

By("Wait for CollaSet2 reconciled")
Eventually(func() error { return tester.ExpectedStatusReplicas(cls2, 0, 0, 0, 0, 0) }, 30*time.Second, 3*time.Second).ShouldNot(HaveOccurred())

By("include pod and scale out 1 replicas in CollaSet1")
pods, err = tester.ListPodsForCollaSet(cls1)
Expect(err).NotTo(HaveOccurred())
PodToInclude := PodToExclude
PvcToInclude := PvcToExclude
Expect(tester.UpdatePod(PodToInclude, func(pod *v1.Pod) {
pod.Labels["owner"] = cls1.Name
})).NotTo(HaveOccurred())
Expect(tester.UpdatePvc(PvcToInclude, func(pvc *v1.PersistentVolumeClaim) {
pvc.Labels["owner"] = cls1.Name
})).NotTo(HaveOccurred())
Expect(tester.UpdateCollaSet(cls1, func(cls *appsv1alpha1.CollaSet) {
cls.Spec.Replicas = int32Pointer(2)
cls.Spec.ScaleStrategy = appsv1alpha1.ScaleStrategy{
PodToInclude: []string{PodToInclude.Name},
}
})).NotTo(HaveOccurred())

By("Wait for CollaSet1 reconciled")
Eventually(func() bool {
if err := tester.GetCollaSet(cls1); err != nil {
return false
}
return cls1.Generation == cls1.Status.ObservedGeneration
}, 10*time.Second, 3*time.Second).Should(Equal(true))

By("Check pod is included")
Eventually(func() bool {
pods, err = tester.ListPodsForCollaSet(cls1)
Expect(err).Should(BeNil())
for i := range pods {
pod := pods[i]
if pod.Name == PodToInclude.Name {
return true
}
}
return false
}, 30*time.Second, 1*time.Second).Should(BeTrue())

By("Check pvc is included")
Eventually(func() bool {
pvcs, err := tester.ListPVCForCollaSet(cls1)
Expect(err).Should(BeNil())
for i := range pvcs {
pvc := pvcs[i]
if pvc.Labels[appsv1alpha1.PvcTemplateLabelKey] == cls2.Spec.VolumeClaimTemplates[0].Name {
return true
}
}
return false
}, 30*time.Second, 1*time.Second).Should(BeTrue())

By("Update included pod from CollaSet1")
Expect(tester.UpdateCollaSet(cls1, func(cls *appsv1alpha1.CollaSet) {
cls.Spec.UpdateStrategy.RollingUpdate = &appsv1alpha1.RollingUpdateCollaSetStrategy{}
})).NotTo(HaveOccurred())

By("Wait for CollaSet1 reconciled")
Eventually(func() error { return tester.ExpectedStatusReplicas(cls1, 2, 2, 2, 2, 2) }, 30*time.Second, 3*time.Second).ShouldNot(HaveOccurred())

By("Check pvc from CollaSet2 is deleted")
Eventually(func() bool {
pvcs, err := tester.ListPVCForCollaSet(cls1)
Expect(err).Should(BeNil())
for i := range pvcs {
pvc := pvcs[i]
if pvc.Labels[appsv1alpha1.PvcTemplateLabelKey] == cls2.Spec.VolumeClaimTemplates[0].Name {
return false
}
}
return len(pvcs) == 2
}, 1000*time.Second, 1*time.Second).Should(BeTrue())
})

framework.ConformanceIt("PVC retention policy with scale in pods", func() {
cls := tester.NewCollaSet("collaset-"+randStr, 2, appsv1alpha1.UpdateStrategy{})
cls.Spec.ScaleStrategy.PersistentVolumeClaimRetentionPolicy = &appsv1alpha1.PersistentVolumeClaimRetentionPolicy{
Expand Down Expand Up @@ -1831,6 +2010,62 @@ var _ = SIGDescribe("CollaSet", func() {
}, 30*time.Second, 3*time.Second).Should(BeTrue())
})

framework.ConformanceIt("scaleIn origin and new pod", func() {
cls := tester.NewCollaSet("collaset-"+randStr, 1, appsv1alpha1.UpdateStrategy{})
// use bad image to mock new replace pod unavailable
cls.Spec.Template.Spec.Containers[0].Image = "nginx:non-exist"
Expect(tester.CreateCollaSet(cls)).NotTo(HaveOccurred())

By("Wait for status replicas satisfied")
Eventually(func() error { return tester.ExpectedStatusReplicas(cls, 1, 0, 0, 1, 1) }, 30*time.Second, 3*time.Second).ShouldNot(HaveOccurred())

By("Replace pod by label")
pods, err := tester.ListPodsForCollaSet(cls)
Expect(err).NotTo(HaveOccurred())
podToReplace := pods[0]
Expect(tester.UpdatePod(podToReplace, func(pod *v1.Pod) {
pod.Labels[appsv1alpha1.PodReplaceIndicationLabelKey] = "true"
})).NotTo(HaveOccurred())

By("Wait for replace new pod created")
Eventually(func() error { return tester.ExpectedStatusReplicas(cls, 2, 0, 0, 2, 2) }, 30*time.Second, 3*time.Second).ShouldNot(HaveOccurred())

By("Selective scaleIn new pod")
pods, err = tester.ListPodsForCollaSet(cls)
Expect(err).NotTo(HaveOccurred())
var newPod *v1.Pod
for _, pod := range pods {
if pod.Name != podToReplace.Name {
newPod = pod
}
}
Expect(tester.UpdateCollaSet(cls, func(cls *appsv1alpha1.CollaSet) {
cls.Spec.Replicas = int32Pointer(0)
cls.Spec.ScaleStrategy = appsv1alpha1.ScaleStrategy{
PodToDelete: []string{newPod.Name, podToReplace.Name},
}
})).NotTo(HaveOccurred())

By("Wait for CollaSet reconciled")
Eventually(func() bool {
if err := tester.GetCollaSet(cls); err != nil {
return false
}
return cls.Generation == cls.Status.ObservedGeneration
}, 10*time.Second, 3*time.Second).Should(Equal(true))

By("Wait for pods are deleted")
Eventually(func() error { return tester.ExpectedStatusReplicas(cls, 0, 0, 0, 0, 0) }, 30*time.Second, 3*time.Second).ShouldNot(HaveOccurred())

By("Check resourceContext")
var currResourceContexts []*appsv1alpha1.ResourceContext
Eventually(func() bool {
currResourceContexts, err = tester.ListResourceContextsForCollaSet(cls)
Expect(err).Should(BeNil())
return len(currResourceContexts) == 0
}, 30*time.Second, 3*time.Second).Should(BeTrue())
})

framework.ConformanceIt("scaleIn new pod", func() {
cls := tester.NewCollaSet("collaset-"+randStr, 1, appsv1alpha1.UpdateStrategy{})
// use bad image to mock new replace pod unavailable
Expand Down
13 changes: 13 additions & 0 deletions test/e2e/framework/collaset_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,19 @@ func (t *CollaSetTester) UpdatePod(pod *v1.Pod, fn func(pod *v1.Pod)) error {
})
}

func (t *CollaSetTester) UpdatePvc(pvc *v1.PersistentVolumeClaim, fn func(pvc *v1.PersistentVolumeClaim)) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
err := t.client.Get(context.TODO(), types.NamespacedName{Namespace: pvc.Namespace, Name: pvc.Name}, pvc)
if err != nil {
return err
}

fn(pvc)
err = t.client.Update(context.TODO(), pvc)
return err
})
}

func (t *CollaSetTester) ExpectedStatusReplicas(cls *appsv1alpha1.CollaSet, replicas, readyReplicas, availableReplicas, updatedReplicas, totalReplicas int32) error {
if err := t.client.Get(context.TODO(), types.NamespacedName{Namespace: cls.Namespace, Name: cls.Name}, cls); err != nil {
return err
Expand Down
Loading