From 7b08f59f67dd7d408507f9584ecd0ae7c4ebfe79 Mon Sep 17 00:00:00 2001 From: WeichengWang Date: Tue, 30 Jan 2024 20:24:46 +0800 Subject: [PATCH 1/2] feat: support multi cluster --- go.mod | 25 +- go.sum | 29 ++- pkg/frame/controller/consister.go | 246 +++++++++++++++--- .../controller/resourceconsist_controller.go | 93 ++++--- pkg/frame/controller/types.go | 16 ++ 5 files changed, 309 insertions(+), 100 deletions(-) diff --git a/go.mod b/go.mod index 451f14d..b87ce98 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,13 @@ require ( github.com/alibabacloud-go/tea-utils/v2 v2.0.4 github.com/go-logr/logr v1.2.4 github.com/onsi/ginkgo v1.16.5 - github.com/onsi/gomega v1.26.0 - github.com/stretchr/testify v1.8.1 - k8s.io/api v0.27.2 - k8s.io/apimachinery v0.27.2 - k8s.io/client-go v0.22.6 + github.com/onsi/gomega v1.27.6 + github.com/stretchr/testify v1.8.2 + k8s.io/api v0.28.4 + k8s.io/apimachinery v0.28.4 + k8s.io/client-go v0.28.4 kusionstack.io/kube-api v0.0.27 + kusionstack.io/kube-utils v0.1.9 sigs.k8s.io/controller-runtime v0.15.1 ) @@ -31,7 +32,7 @@ require ( github.com/clbanning/mxj/v2 v2.5.5 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/distribution v2.8.2+incompatible // indirect - github.com/evanphx/json-patch v4.11.0+incompatible // indirect + github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/zapr v1.2.4 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -59,22 +60,22 @@ require ( github.com/tjfoc/gmsm v1.3.2 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.25.0 // indirect - golang.org/x/net v0.17.0 // indirect + golang.org/x/net v0.18.0 // indirect golang.org/x/oauth2 v0.8.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/term v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/sys v0.14.0 // indirect + golang.org/x/term v0.14.0 // indirect + golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.3.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/protobuf v1.30.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.56.0 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.28.3 // indirect - k8s.io/component-base v0.28.3 // indirect + k8s.io/component-base v0.28.4 // indirect k8s.io/klog/v2 v2.100.1 // indirect k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect diff --git a/go.sum b/go.sum index 301f815..8204623 100644 --- a/go.sum +++ b/go.sum @@ -135,8 +135,9 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= -github.com/evanphx/json-patch v4.11.0+incompatible h1:glyUF9yIYtMHzn8xaKw5rMhdWcwsYV8dZHIq5567/xs= github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84= +github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= @@ -434,8 +435,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tjfoc/gmsm v1.3.2 h1:7JVkAn5bvUJ7HtU08iW6UiD+UTmJTIToHCfeFzkcCxM= github.com/tjfoc/gmsm v1.3.2/go.mod h1:HaUcFuY0auTiaHB9MHFGCPx5IaLhTUd2atbCFBQXn9w= @@ -570,8 +571,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= +golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -644,8 +645,8 @@ golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -653,8 +654,8 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo= -golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.14.0 h1:LGK9IlZ8T9jvdy6cTdfKUCltatMFOehAQo9SRC46UQ8= +golang.org/x/term v0.14.0/go.mod h1:TySc+nGkYR6qt8km8wUhuFRTVSMIX3XPR58y2lC8vww= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -666,8 +667,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -791,8 +792,8 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= -google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -856,6 +857,8 @@ k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSn k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= kusionstack.io/kube-api v0.0.27 h1:7umLyoMmOdse0A8nXGccNpOdYu7B5sNPLoPxc/hmBIY= kusionstack.io/kube-api v0.0.27/go.mod h1:QIQrH+MK9xuV+mXCAkk6DN8z6b8oyf4XN0VRccmHH/k= +kusionstack.io/kube-utils v0.1.9 h1:xZX0nwu9BJDXYxNOICP9/eLee3So7cpHWi55V0+8B1c= +kusionstack.io/kube-utils v0.1.9/go.mod h1:ktBPYyVZCLtSvzMLyfEcQ7VkLLstdazsvqt2gB+4hwE= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/pkg/frame/controller/consister.go b/pkg/frame/controller/consister.go index d47c6e3..3cca819 100644 --- a/pkg/frame/controller/consister.go +++ b/pkg/frame/controller/consister.go @@ -32,6 +32,7 @@ import ( "kusionstack.io/kube-api/apps/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/client" + "kusionstack.io/kube-utils/multicluster/clusterinfo" "kusionstack.io/resourceconsist/pkg/utils" ) @@ -252,7 +253,11 @@ func (r *Consist) syncEmployees(ctx context.Context, employer client.Object, exp } annos[lifecycleFinalizerRecordedAnnoKey] = strings.Join(toAddLifecycleFlzEmployees, ",") employer.SetAnnotations(annos) - err = r.Client.Patch(ctx, employer, patch) + if _, ok := r.adapter.(MultiClusterOptions); ok { + err = r.Client.Patch(clusterinfo.WithCluster(ctx, clusterinfo.Fed), employer, patch) + } else { + err = r.Client.Patch(ctx, employer, patch) + } if err != nil { return false, false, fmt.Errorf("patch lifecycleFinalizerRecordedAnno failed, err: %s", err.Error()) } @@ -323,6 +328,7 @@ func (r *Consist) ensureExpectedFinalizerNotNeedRecord(ctx context.Context, empl } func (r *Consist) ensureExpectedFinalizerNeedRecord(ctx context.Context, employer client.Object, selectedEmployeeNames []string) (bool, error) { + var err error var toAdd, toDelete []PodExpectedFinalizerOps addedExpectedFinalizerPodNames := strings.Split(employer.GetAnnotations()[expectedFinalizerAddedAnnoKey], ",") @@ -351,7 +357,12 @@ func (r *Consist) ensureExpectedFinalizerNeedRecord(ctx context.Context, employe } annos[expectedFinalizerAddedAnnoKey] = strings.Join(notDeletedPodNames, ",") employer.SetAnnotations(annos) - return len(notDeletedPodNames) == 0, r.Client.Patch(ctx, employer, patch) + if _, ok := r.adapter.(MultiClusterOptions); ok { + err = r.Client.Patch(clusterinfo.WithCluster(ctx, clusterinfo.Fed), employer, patch) + } else { + err = r.Client.Patch(ctx, employer, patch) + } + return len(notDeletedPodNames) == 0, err } selectedSet := sets.NewString(selectedEmployeeNames...) @@ -404,8 +415,13 @@ func (r *Consist) ensureExpectedFinalizerNeedRecord(ctx context.Context, employe } annos[expectedFinalizerAddedAnnoKey] = strings.Join(addedNames, ",") employer.SetAnnotations(annos) - errPatchEmployer := r.Client.Patch(ctx, employer, patch) - return len(addedNames) == 0, errors2.NewAggregate([]error{errPatchEmployees, errPatchEmployer}) + + if _, ok := r.adapter.(MultiClusterOptions); ok { + err = r.Client.Patch(clusterinfo.WithCluster(ctx, clusterinfo.Fed), employer, patch) + } else { + err = r.Client.Patch(ctx, employer, patch) + } + return len(addedNames) == 0, errors2.NewAggregate([]error{errPatchEmployees, err}) } func (r *Consist) patchPodExpectedFinalizer(ctx context.Context, employer client.Object, toAdd, toDelete []PodExpectedFinalizerOps) error { @@ -420,13 +436,40 @@ func (r *Consist) patchPodExpectedFinalizer(ctx context.Context, employer client func (r *Consist) patchAddPodExpectedFinalizer(ctx context.Context, employer client.Object, toAdd []PodExpectedFinalizerOps, expectedFlzKey, expectedFlz string) error { + var employeeUnderLocal bool + multiClusterOptions, multiClusterOptionsImplemented := r.adapter.(MultiClusterOptions) + if multiClusterOptionsImplemented { + employeeUnderLocal = !multiClusterOptions.EmployeeFed() + } + _, err := utils.SlowStartBatch(len(toAdd), 1, false, func(i int, _ error) error { podExpectedFinalizerOps := &toAdd[i] + var localCluster string + employeeName := podExpectedFinalizerOps.Name + if multiClusterOptionsImplemented && employeeUnderLocal { + // todo check avoid panic + employeeName = strings.Split(employeeName, "#")[0] + localCluster = strings.Split(employeeName, "#")[1] + } var pod corev1.Pod - err := r.Client.Get(ctx, types.NamespacedName{ - Namespace: employer.GetNamespace(), - Name: podExpectedFinalizerOps.Name, - }, &pod) + var err error + if multiClusterOptionsImplemented { + if employeeUnderLocal { + err = r.Client.Get(clusterinfo.WithCluster(ctx, localCluster), types.NamespacedName{ + Namespace: employer.GetNamespace(), + Name: employeeName, + }, &pod) + } + err = r.Client.Get(clusterinfo.WithCluster(ctx, clusterinfo.Fed), types.NamespacedName{ + Namespace: employer.GetNamespace(), + Name: employeeName, + }, &pod) + } else { + err = r.Client.Get(ctx, types.NamespacedName{ + Namespace: employer.GetNamespace(), + Name: employeeName, + }, &pod) + } if err != nil { if errors.IsNotFound(err) { return nil @@ -451,7 +494,16 @@ func (r *Consist) patchAddPodExpectedFinalizer(ctx context.Context, employer cli return errMarshal } pod.Annotations[v1alpha1.PodAvailableConditionsAnnotation] = string(annoAvailableExpectedFlzs) - errPatch := r.Client.Patch(ctx, &pod, patch) + var errPatch error + if multiClusterOptionsImplemented { + if employeeUnderLocal { + errPatch = r.Client.Patch(clusterinfo.WithCluster(ctx, localCluster), &pod, patch) + } else { + errPatch = r.Client.Patch(clusterinfo.WithCluster(ctx, clusterinfo.Fed), &pod, patch) + } + } else { + errPatch = r.Client.Patch(ctx, &pod, patch) + } if errPatch != nil { return errPatch } @@ -470,7 +522,16 @@ func (r *Consist) patchAddPodExpectedFinalizer(ctx context.Context, employer cli return errMarshal } pod.Annotations[v1alpha1.PodAvailableConditionsAnnotation] = string(annoAvailableExpectedFlzs) - errPatch := r.Client.Patch(ctx, &pod, patch) + var errPatch error + if multiClusterOptionsImplemented { + if employeeUnderLocal { + errPatch = r.Client.Patch(clusterinfo.WithCluster(ctx, localCluster), &pod, patch) + } else { + errPatch = r.Client.Patch(clusterinfo.WithCluster(ctx, clusterinfo.Fed), &pod, patch) + } + } else { + errPatch = r.Client.Patch(ctx, &pod, patch) + } if errPatch != nil { return errPatch } @@ -485,13 +546,44 @@ func (r *Consist) patchAddPodExpectedFinalizer(ctx context.Context, employer cli func (r *Consist) patchDeletePodExpectedFinalizer(ctx context.Context, employer client.Object, toDelete []PodExpectedFinalizerOps, expectedFlzKey string) error { + var employeeUnderLocal bool + multiClusterOptions, multiClusterOptionsImplemented := r.adapter.(MultiClusterOptions) + if multiClusterOptionsImplemented { + employeeUnderLocal = !multiClusterOptions.EmployeeFed() + } + _, err := utils.SlowStartBatch(len(toDelete), 1, false, func(i int, _ error) error { podExpectedFinalizerOps := &toDelete[i] + + var localCluster string + employeeName := podExpectedFinalizerOps.Name + if multiClusterOptionsImplemented && employeeUnderLocal { + // todo check avoid panic + employeeName = strings.Split(employeeName, "#")[0] + localCluster = strings.Split(employeeName, "#")[1] + } + var pod corev1.Pod - err := r.Client.Get(ctx, types.NamespacedName{ - Namespace: employer.GetNamespace(), - Name: podExpectedFinalizerOps.Name, - }, &pod) + + var err error + if multiClusterOptionsImplemented { + if employeeUnderLocal { + err = r.Client.Get(clusterinfo.WithCluster(ctx, localCluster), types.NamespacedName{ + Namespace: employer.GetNamespace(), + Name: employeeName, + }, &pod) + } + err = r.Client.Get(clusterinfo.WithCluster(ctx, clusterinfo.Fed), types.NamespacedName{ + Namespace: employer.GetNamespace(), + Name: employeeName, + }, &pod) + } else { + err = r.Client.Get(ctx, types.NamespacedName{ + Namespace: employer.GetNamespace(), + Name: employeeName, + }, &pod) + } + if err != nil { if errors.IsNotFound(err) { podExpectedFinalizerOps.Succeed = true @@ -519,7 +611,16 @@ func (r *Consist) patchDeletePodExpectedFinalizer(ctx context.Context, employer return errMarshal } pod.Annotations[v1alpha1.PodAvailableConditionsAnnotation] = string(annoAvailableExpectedFlzs) - errPatch := r.Client.Patch(ctx, &pod, patch) + var errPatch error + if multiClusterOptionsImplemented { + if employeeUnderLocal { + errPatch = r.Client.Patch(clusterinfo.WithCluster(ctx, localCluster), &pod, patch) + } else { + errPatch = r.Client.Patch(clusterinfo.WithCluster(ctx, clusterinfo.Fed), &pod, patch) + } + } else { + errPatch = r.Client.Patch(ctx, &pod, patch) + } if errPatch != nil { return errPatch } @@ -539,10 +640,19 @@ func (r *Consist) cleanEmployerCleanFinalizer(ctx context.Context, employer clie employerLatest = &corev1.Service{} } - err := r.Client.Get(ctx, types.NamespacedName{ - Namespace: employer.GetNamespace(), - Name: employer.GetName(), - }, employerLatest) + var err error + if _, ok := r.adapter.(MultiClusterOptions); ok { + err = r.Client.Get(clusterinfo.WithCluster(ctx, clusterinfo.Fed), types.NamespacedName{ + Namespace: employer.GetNamespace(), + Name: employer.GetName(), + }, employerLatest) + } else { + err = r.Client.Get(ctx, types.NamespacedName{ + Namespace: employer.GetNamespace(), + Name: employer.GetName(), + }, employerLatest) + } + if err != nil { if errors.IsNotFound(err) { return nil @@ -564,24 +674,49 @@ func (r *Consist) cleanEmployerCleanFinalizer(ctx context.Context, employer clie return nil } employerLatest.SetFinalizers(finalizers) + if _, ok := r.adapter.(MultiClusterOptions); ok { + return r.Client.Update(clusterinfo.WithCluster(ctx, clusterinfo.Fed), employerLatest) + } return r.Client.Update(ctx, employerLatest) } +// ensureLifecycleFinalizer add/delete lifecycle finalizer to pods +// if employee is not pod, or the adapter not follows PodOpsLifecycle, len of toAdd & toDelete would be 0 func (r *Consist) ensureLifecycleFinalizer(ctx context.Context, ns, lifecycleFlz string, toAdd, toDelete []string) error { - watchOptions, watchOptionsImplemented := r.adapter.(ReconcileWatchOptions) + var employeeUnderLocal bool + multiClusterOptions, multiClusterOptionsImplemented := r.adapter.(MultiClusterOptions) + if multiClusterOptionsImplemented { + employeeUnderLocal = !multiClusterOptions.EmployeeFed() + } _, err := utils.SlowStartBatch(len(toAdd), 1, false, func(i int, _ error) error { employeeName := toAdd[i] - var employee client.Object - if watchOptionsImplemented { - employee = watchOptions.NewEmployee() + var localCluster string + if multiClusterOptionsImplemented && employeeUnderLocal { + // todo check avoid panic + employeeName = strings.Split(employeeName, "#")[0] + localCluster = strings.Split(employeeName, "#")[1] + } + var employee = &corev1.Pod{} + var err error + if multiClusterOptionsImplemented { + if employeeUnderLocal { + err = r.Client.Get(clusterinfo.WithCluster(ctx, localCluster), types.NamespacedName{ + Namespace: ns, + Name: employeeName, + }, employee) + } + err = r.Client.Get(clusterinfo.WithCluster(ctx, clusterinfo.Fed), types.NamespacedName{ + Namespace: ns, + Name: employeeName, + }, employee) } else { - employee = &corev1.Pod{} + err = r.Client.Get(ctx, types.NamespacedName{ + Namespace: ns, + Name: employeeName, + }, employee) } - err := r.Client.Get(ctx, types.NamespacedName{ - Namespace: ns, - Name: employeeName, - }, employee) + if err != nil { if errors.IsNotFound(err) { return nil @@ -599,7 +734,16 @@ func (r *Consist) ensureLifecycleFinalizer(ctx context.Context, ns, lifecycleFlz return nil } employee.SetFinalizers(append(employee.GetFinalizers(), lifecycleFlz)) - return r.Client.Update(ctx, employee) + if multiClusterOptionsImplemented { + if employeeUnderLocal { + err = r.Client.Update(clusterinfo.WithCluster(ctx, localCluster), employee) + } else { + err = r.Client.Update(clusterinfo.WithCluster(ctx, clusterinfo.Fed), employee) + } + } else { + err = r.Client.Update(ctx, employee) + } + return err }) if err != nil { return err @@ -607,16 +751,30 @@ func (r *Consist) ensureLifecycleFinalizer(ctx context.Context, ns, lifecycleFlz _, err = utils.SlowStartBatch(len(toDelete), 1, false, func(i int, _ error) error { employeeName := toDelete[i] - var employee client.Object - if watchOptionsImplemented { - employee = watchOptions.NewEmployee() + var localCluster string + if multiClusterOptionsImplemented && employeeUnderLocal { + // todo check avoid panic + employeeName = strings.Split(employeeName, "#")[0] + localCluster = strings.Split(employeeName, "#")[1] + } + var employee = &corev1.Pod{} + if multiClusterOptionsImplemented { + if employeeUnderLocal { + err = r.Client.Get(clusterinfo.WithCluster(ctx, localCluster), types.NamespacedName{ + Namespace: ns, + Name: employeeName, + }, employee) + } + err = r.Client.Get(clusterinfo.WithCluster(ctx, clusterinfo.Fed), types.NamespacedName{ + Namespace: ns, + Name: employeeName, + }, employee) } else { - employee = &corev1.Pod{} + err = r.Client.Get(ctx, types.NamespacedName{ + Namespace: ns, + Name: employeeName, + }, employee) } - err := r.Client.Get(ctx, types.NamespacedName{ - Namespace: ns, - Name: employeeName, - }, employee) if err != nil { if errors.IsNotFound(err) { return nil @@ -636,7 +794,16 @@ func (r *Consist) ensureLifecycleFinalizer(ctx context.Context, ns, lifecycleFlz return nil } employee.SetFinalizers(finalizers) - return r.Client.Update(ctx, employee) + if multiClusterOptionsImplemented { + if employeeUnderLocal { + err = r.Client.Update(clusterinfo.WithCluster(ctx, localCluster), employee) + } else { + err = r.Client.Update(clusterinfo.WithCluster(ctx, clusterinfo.Fed), employee) + } + } else { + err = r.Client.Update(ctx, employee) + } + return err }) return err } @@ -704,5 +871,8 @@ func (r *Consist) ensureEmployerCleanFlz(ctx context.Context, employer client.Ob } } employer.SetFinalizers(append(employer.GetFinalizers(), cleanFinalizerPrefix+employer.GetName())) + if _, ok := r.adapter.(MultiClusterOptions); ok { + return true, r.Client.Update(clusterinfo.WithCluster(ctx, clusterinfo.Fed), employer) + } return true, r.Client.Update(ctx, employer) } diff --git a/pkg/frame/controller/resourceconsist_controller.go b/pkg/frame/controller/resourceconsist_controller.go index e40fac1..7b4a968 100644 --- a/pkg/frame/controller/resourceconsist_controller.go +++ b/pkg/frame/controller/resourceconsist_controller.go @@ -19,6 +19,7 @@ package controller import ( "context" "fmt" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -28,10 +29,15 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + + "kusionstack.io/kube-utils/multicluster" + "kusionstack.io/kube-utils/multicluster/clusterinfo" ) // AddToMgr creates a new Controller of specified reconcileAdapter and adds it to the Manager with default RBAC. @@ -54,49 +60,51 @@ func AddToMgr(mgr manager.Manager, adapter ReconcileAdapter) error { return err } - if watchOptions, ok := adapter.(ReconcileWatchOptions); ok { - // Watch for changes to EmployerResources - err = c.Watch(&source.Kind{ - Type: watchOptions.NewEmployer()}, - watchOptions.EmployerEventHandler(), - watchOptions.EmployerPredicates()) - if err != nil { - return err - } + return watch(c, mgr, adapter) +} - // Watch for changes to EmployeeResources - err = c.Watch(&source.Kind{ - Type: watchOptions.NewEmployee()}, - watchOptions.EmployeeEventHandler(), - watchOptions.EmployeePredicates()) - if err != nil { - return err - } +func watch(c controller.Controller, mgr manager.Manager, adapter ReconcileAdapter) error { + var employer, employee client.Object + var employerEventHandler, employeeEventHandler handler.EventHandler + var employerPredicateFuncs, employeePredicateFuncs predicate.Funcs + var employerSource, employeeSource source.Source - return nil + if watchOptions, ok := adapter.(ReconcileWatchOptions); ok { + employer = watchOptions.NewEmployer() + employee = watchOptions.NewEmployee() + employerEventHandler = watchOptions.EmployerEventHandler() + employerPredicateFuncs = watchOptions.EmployerPredicates() + employeeEventHandler = watchOptions.EmployeeEventHandler() + employeePredicateFuncs = watchOptions.EmployeePredicates() + } else { + employer = &corev1.Service{} + employee = &corev1.Pod{} + employerEventHandler = &EnqueueServiceWithRateLimit{} + employerPredicateFuncs = employerPredicates + employeeEventHandler = &EnqueueServiceByPod{ + c: mgr.GetClient(), + } + employeePredicateFuncs = employeePredicates } - // Watch for changes to EmployerResources - err = c.Watch(&source.Kind{ - Type: &corev1.Service{}}, - &EnqueueServiceWithRateLimit{}, - employerPredicates) - if err != nil { - return err + if multiClusterOptions, ok := adapter.(MultiClusterOptions); ok { + employerSource = multicluster.FedKind(&source.Kind{Type: employer}) + + employeeSource = multicluster.FedKind(&source.Kind{Type: employee}) + if !multiClusterOptions.EmployeeFed() { + employeeSource = multicluster.ClustersKind(&source.Kind{Type: employee}) + } + } else { + employerSource = &source.Kind{Type: employer} + employeeSource = &source.Kind{Type: employee} } - // Watch for changes to EmployeeResources - err = c.Watch(&source.Kind{ - Type: &corev1.Pod{}}, - &EnqueueServiceByPod{ - c: mgr.GetClient(), - }, - employeePredicates) + err := c.Watch(employerSource, employerEventHandler, employerPredicateFuncs) if err != nil { return err } - return nil + return c.Watch(employeeSource, employeeEventHandler, employeePredicateFuncs) } func NewReconcile(mgr manager.Manager, reconcileAdapter ReconcileAdapter) *Consist { @@ -120,15 +128,26 @@ type Consist struct { func (r *Consist) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { var employer client.Object + var err error + if watchOptions, ok := r.adapter.(ReconcileWatchOptions); ok { employer = watchOptions.NewEmployer() } else { employer = &corev1.Service{} } - err := r.Client.Get(ctx, types.NamespacedName{ - Namespace: request.Namespace, - Name: request.Name, - }, employer) + + if _, ok := r.adapter.(MultiClusterOptions); ok { + err = r.Client.Get(clusterinfo.WithCluster(ctx, clusterinfo.Fed), types.NamespacedName{ + Namespace: request.Namespace, + Name: request.Name, + }, employer) + } else { + err = r.Client.Get(ctx, types.NamespacedName{ + Namespace: request.Namespace, + Name: request.Name, + }, employer) + } + if err != nil { if errors.IsNotFound(err) { return reconcile.Result{}, nil diff --git a/pkg/frame/controller/types.go b/pkg/frame/controller/types.go index a566728..1da6452 100644 --- a/pkg/frame/controller/types.go +++ b/pkg/frame/controller/types.go @@ -48,6 +48,16 @@ type ReconcileWatchOptions interface { EmployeePredicates() predicate.Funcs } +// MultiClusterOptions defines whether employee is under fed cluster +// "kusionstack.io/kube-utils/multicluster" is the solution for multi cluster +// if MultiClusterOptions implemented, the cache and client of manager should be generated via "kusionstack.io/kube-utils/multicluster" +type MultiClusterOptions interface { + // Employer should be under fed, otherwise, just forget multi cluster :) + // EmployerFed() bool + + EmployeeFed() bool +} + type ExpectedFinalizerRecordOptions interface { // NeedRecordExpectedFinalizerCondition only needed for those adapters that follow PodOpsLifecycle, // in the case of employment relationship might change(like label/selector changes) and the compensation logic @@ -77,6 +87,9 @@ type ReconcileRequeueOptions interface { type ReconcileAdapter interface { GetControllerName() string + // GetSelectedEmployeeNames returns employees' names selected by employer + // note: in multi cluster case, if adapters deployed in fed and employees are under local, the format of employeeName + // should be "employeeName" + "#" + "clusterName" GetSelectedEmployeeNames(ctx context.Context, employer client.Object) ([]string, error) // GetExpectedEmployer and GetCurrentEmployer return expect/current status of employer from related backend provider @@ -106,6 +119,9 @@ type IEmployer interface { type IEmployee interface { GetEmployeeId() string + // GetEmployeeName returns employee's name + // note: in multi cluster case, if adapters deployed in fed and employees are under local, the format of employeeName + // should be "employeeName" + "#" + "clusterName" GetEmployeeName() string GetEmployeeStatuses() interface{} EmployeeEqual(employee IEmployee) (bool, error) From 1154b20534b91828acb814af12cbaf9577d48a90 Mon Sep 17 00:00:00 2001 From: WeichengWang Date: Wed, 31 Jan 2024 10:25:49 +0800 Subject: [PATCH 2/2] fix lint --- pkg/frame/controller/consister.go | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/pkg/frame/controller/consister.go b/pkg/frame/controller/consister.go index 3cca819..107f48a 100644 --- a/pkg/frame/controller/consister.go +++ b/pkg/frame/controller/consister.go @@ -459,11 +459,12 @@ func (r *Consist) patchAddPodExpectedFinalizer(ctx context.Context, employer cli Namespace: employer.GetNamespace(), Name: employeeName, }, &pod) + } else { + err = r.Client.Get(clusterinfo.WithCluster(ctx, clusterinfo.Fed), types.NamespacedName{ + Namespace: employer.GetNamespace(), + Name: employeeName, + }, &pod) } - err = r.Client.Get(clusterinfo.WithCluster(ctx, clusterinfo.Fed), types.NamespacedName{ - Namespace: employer.GetNamespace(), - Name: employeeName, - }, &pod) } else { err = r.Client.Get(ctx, types.NamespacedName{ Namespace: employer.GetNamespace(), @@ -572,11 +573,12 @@ func (r *Consist) patchDeletePodExpectedFinalizer(ctx context.Context, employer Namespace: employer.GetNamespace(), Name: employeeName, }, &pod) + } else { + err = r.Client.Get(clusterinfo.WithCluster(ctx, clusterinfo.Fed), types.NamespacedName{ + Namespace: employer.GetNamespace(), + Name: employeeName, + }, &pod) } - err = r.Client.Get(clusterinfo.WithCluster(ctx, clusterinfo.Fed), types.NamespacedName{ - Namespace: employer.GetNamespace(), - Name: employeeName, - }, &pod) } else { err = r.Client.Get(ctx, types.NamespacedName{ Namespace: employer.GetNamespace(), @@ -705,11 +707,12 @@ func (r *Consist) ensureLifecycleFinalizer(ctx context.Context, ns, lifecycleFlz Namespace: ns, Name: employeeName, }, employee) + } else { + err = r.Client.Get(clusterinfo.WithCluster(ctx, clusterinfo.Fed), types.NamespacedName{ + Namespace: ns, + Name: employeeName, + }, employee) } - err = r.Client.Get(clusterinfo.WithCluster(ctx, clusterinfo.Fed), types.NamespacedName{ - Namespace: ns, - Name: employeeName, - }, employee) } else { err = r.Client.Get(ctx, types.NamespacedName{ Namespace: ns,