diff --git a/go.mod b/go.mod
index fb17897c3..9fb6c4c18 100644
--- a/go.mod
+++ b/go.mod
@@ -36,7 +36,7 @@ require (
     k8s.io/utils v0.0.0-20240310230437-4693a0247e57
     open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556
     open-cluster-management.io/api v0.13.1-0.20240521030453-9d94703b9eba
-    open-cluster-management.io/sdk-go v0.13.1-0.20240520073308-f18d198a844d
+    open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523
     sigs.k8s.io/controller-runtime v0.17.3
     sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96
 )
diff --git a/go.sum b/go.sum
index e26752d4a..33b3364b7 100644
--- a/go.sum
+++ b/go.sum
@@ -469,8 +469,8 @@ open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556
 open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556/go.mod h1:HayKCznnlyW+0dUJQGj5sNR6i3tvylSySD3YnvZkBtY=
 open-cluster-management.io/api v0.13.1-0.20240521030453-9d94703b9eba h1:UsXnD4/N7pxYupPgoLvTq8wO73V72vD2D2ZkDd4iws0=
 open-cluster-management.io/api v0.13.1-0.20240521030453-9d94703b9eba/go.mod h1:yrNuMMpciXjXPnj2yznb6LTyrGliiTrFZAJDp/Ck3c4=
-open-cluster-management.io/sdk-go v0.13.1-0.20240520073308-f18d198a844d h1:5lcrL1DsQdNtDQU6U2oXwLAN0EBczcvI421YNgEzL/4=
-open-cluster-management.io/sdk-go v0.13.1-0.20240520073308-f18d198a844d/go.mod h1:XBrldz+AqVBy9miOVNIu+6l8JXS18i795XbTqIqURJU=
+open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523 h1:XMtsnv0zT8UvXcH4JbqquzhK4BK/XrCg81pCmp50VDs=
+open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
 sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I=
 sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y=
 sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk=
diff --git a/pkg/work/hub/controllers/manifestworkreplicasetcontroller/manifestworkreplicaset_controller.go b/pkg/work/hub/controllers/manifestworkreplicasetcontroller/manifestworkreplicaset_controller.go
index cb03d8cd0..1b43c73c5 100644
--- a/pkg/work/hub/controllers/manifestworkreplicasetcontroller/manifestworkreplicaset_controller.go
+++ b/pkg/work/hub/controllers/manifestworkreplicasetcontroller/manifestworkreplicaset_controller.go
@@ -80,10 +80,18 @@ func NewManifestWorkReplicaSetController(
     manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
     manifestWorkInformer workinformerv1.ManifestWorkInformer,
     placementInformer clusterinformerv1beta1.PlacementInformer,
-    placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) factory.Controller {
-
+    placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer,
+    cloudEventsUsed bool,
+) factory.Controller {
     controller := newController(
-        workClient, workApplier, manifestWorkReplicaSetInformer, manifestWorkInformer, placementInformer, placeDecisionInformer)
+        workClient,
+        workApplier,
+        manifestWorkReplicaSetInformer,
+        manifestWorkInformer,
+        placementInformer,
+        placeDecisionInformer,
+        cloudEventsUsed,
+    )
 
     err := manifestWorkReplicaSetInformer.Informer().AddIndexers(
         cache.Indexers{
@@ -114,12 +122,15 @@ func NewManifestWorkReplicaSetController(
         WithSync(controller.sync).ToController("ManifestWorkReplicaSetController", recorder)
 }
 
-func newController(workClient workclientset.Interface,
+func newController(
+    workClient workclientset.Interface,
     workApplier *workapplier.WorkApplier,
     manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
     manifestWorkInformer workinformerv1.ManifestWorkInformer,
     placementInformer clusterinformerv1beta1.PlacementInformer,
-    placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) *ManifestWorkReplicaSetController {
+    placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer,
+    cloudEventsUsed bool,
+) *ManifestWorkReplicaSetController {
     return &ManifestWorkReplicaSetController{
         workClient:                   workClient,
         manifestWorkReplicaSetLister: manifestWorkReplicaSetInformer.Lister(),
@@ -139,6 +150,7 @@ func newController(workClient workclientset.Interface,
                 manifestWorkLister:  manifestWorkInformer.Lister(),
                 placementLister:     placementInformer.Lister(),
                 placeDecisionLister: placeDecisionInformer.Lister(),
+                cloudEventsUsed:     cloudEventsUsed,
             },
             &statusReconciler{manifestWorkLister: manifestWorkInformer.Lister()},
         },
diff --git a/pkg/work/hub/controllers/manifestworkreplicasetcontroller/manifestworkreplicaset_controllers_test.go b/pkg/work/hub/controllers/manifestworkreplicasetcontroller/manifestworkreplicaset_controllers_test.go
index 98d3f2de5..a040a4b50 100644
--- a/pkg/work/hub/controllers/manifestworkreplicasetcontroller/manifestworkreplicaset_controllers_test.go
+++ b/pkg/work/hub/controllers/manifestworkreplicasetcontroller/manifestworkreplicaset_controllers_test.go
@@ -243,6 +243,7 @@ func TestManifestWorkReplicaSetControllerPatchStatus(t *testing.T) {
                 workInformers.Work().V1().ManifestWorks(),
                 clusterInformers.Cluster().V1beta1().Placements(),
                 clusterInformers.Cluster().V1beta1().PlacementDecisions(),
+                false,
             )
 
             controllerContext := testingcommon.NewFakeSyncContext(t, c.mwrSet.Namespace+"/"+c.mwrSet.Name)
diff --git a/pkg/work/hub/controllers/manifestworkreplicasetcontroller/manifestworkreplicaset_deploy_reconcile.go b/pkg/work/hub/controllers/manifestworkreplicasetcontroller/manifestworkreplicaset_deploy_reconcile.go
index af0e14e13..6a2775e12 100644
--- a/pkg/work/hub/controllers/manifestworkreplicasetcontroller/manifestworkreplicaset_deploy_reconcile.go
+++ b/pkg/work/hub/controllers/manifestworkreplicasetcontroller/manifestworkreplicaset_deploy_reconcile.go
@@ -29,6 +29,7 @@ type deployReconciler struct {
     manifestWorkLister  worklisterv1.ManifestWorkLister
     placeDecisionLister clusterlister.PlacementDecisionLister
     placementLister     clusterlister.PlacementLister
+    cloudEventsUsed     bool
 }
 
 func (d *deployReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alpha1.ManifestWorkReplicaSet,
@@ -115,6 +116,19 @@ func (d *deployReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alpha
             continue
         }
 
+        if d.cloudEventsUsed {
+            // Add cloudevents annotations to support cloudevents integration.
+            mw.Annotations = map[string]string{
+                // CloudEventsDataTypeAnnotationKey tells the cloudevents work client to use ManifestBundle
+                // as its payload format.
+                common.CloudEventsDataTypeAnnotationKey: payload.ManifestBundleEventDataType.String(),
+                // CloudEventsResourceVersionAnnotationKey is used by the cloudevents work client to trace
+                // ManifestWork spec changes; here we use the generation of the ManifestWorkReplicaSet to
+                // trace the spec changes of its ManifestWorks.
+                common.CloudEventsResourceVersionAnnotationKey: fmt.Sprintf("%d", mwrSet.Generation),
+            }
+        }
+
         _, err = d.workApplier.Apply(ctx, mw)
         if err != nil {
             fmt.Printf("err is %v\n", err)
@@ -264,7 +278,11 @@ func getCondition(conditionType string, reason string, message string, status me
     }
 }
 
-func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterNS string, placementRefName string) (*workv1.ManifestWork, error) {
+func CreateManifestWork(
+    mwrSet *workapiv1alpha1.ManifestWorkReplicaSet,
+    clusterNS string,
+    placementRefName string,
+) (*workv1.ManifestWork, error) {
     if clusterNS == "" {
         return nil, fmt.Errorf("invalid cluster namespace")
     }
@@ -277,12 +295,9 @@ func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterN
                 ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
                 ManifestWorkReplicaSetPlacementNameLabelKey:  placementRefName,
             },
-            Annotations: map[string]string{
-                common.CloudEventsDataTypeAnnotationKey:   payload.ManifestBundleEventDataType.String(),
-                common.CloudEventsGenerationAnnotationKey: fmt.Sprintf("%d", mwrSet.Generation),
-            },
         },
-        Spec: mwrSet.Spec.ManifestWorkTemplate}, nil
+        Spec: mwrSet.Spec.ManifestWorkTemplate,
+    }, nil
 }
 
 func getAvailableDecisionGroupProgressMessage(groupNum int, existingClsCount int, totalCls int32) string {
diff --git a/pkg/work/hub/manager.go b/pkg/work/hub/manager.go
index 511384bab..d53960f9d 100644
--- a/pkg/work/hub/manager.go
+++ b/pkg/work/hub/manager.go
@@ -6,15 +6,18 @@ import (
 
     "github.com/openshift/library-go/pkg/controller/controllercmd"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/tools/clientcmd"
 
     clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
     clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
     workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
     workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
+    workv1informer "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
     workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"
     "open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
-    cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work"
+    "open-cluster-management.io/sdk-go/pkg/cloudevents/work"
     "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
+    "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
 
     "open-cluster-management.io/ocm/pkg/work/hub/controllers/manifestworkreplicasetcontroller"
 )
@@ -48,17 +51,6 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
         return err
     }
 
-    // To support sending ManifestWorks to different drivers (like the Kubernetes apiserver or MQTT broker), we build
-    // ManifestWork client that implements the ManifestWorkInterface and ManifestWork informer based on different
-    // driver configuration.
-    // Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
-    _, config, err := generic.NewConfigLoader(c.workOptions.WorkDriver, c.workOptions.WorkDriverConfig).
-        WithKubeConfig(controllerContext.KubeConfig).
-        LoadConfig()
-    if err != nil {
-        return err
-    }
-
     // we need a separated filtered manifestwork informers so we only watch the manifestworks that manifestworkreplicaset cares.
     // This could reduce a lot of memory consumptions
     workInformOption := workinformers.WithTweakListOptions(
@@ -75,44 +67,97 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
         },
     )
 
-    clientHolder, err := cloudeventswork.NewClientHolderBuilder(config).
-        WithClientID(c.workOptions.CloudEventsClientID).
-        WithSourceID(sourceID).
-        WithInformerConfig(30*time.Minute, workInformOption).
-        WithCodecs(codec.NewManifestBundleCodec()).
-        NewSourceClientHolder(ctx)
-    if err != nil {
-        return err
+    cloudEventsUsed := false
+
+    var workClient workclientset.Interface
+    var watcherStore *store.InformerWatcherStore
+
+    if c.workOptions.WorkDriver == "kube" {
+        config := controllerContext.KubeConfig
+        if c.workOptions.WorkDriverConfig != "" {
+            config, err = clientcmd.BuildConfigFromFlags("", c.workOptions.WorkDriverConfig)
+            if err != nil {
+                return err
+            }
+        }
+
+        workClient, err = workclientset.NewForConfig(config)
+        if err != nil {
+            return err
+        }
+    } else {
+        // For cloudevents drivers, we build a ManifestWork client that implements the
+        // ManifestWorkInterface and a ManifestWork informer based on the driver configuration.
+        // Refer to the Event Based ManifestWork proposal in the enhancements repo for more details.
+
+        watcherStore = store.NewInformerWatcherStore(ctx)
+
+        _, config, err := generic.NewConfigLoader(c.workOptions.WorkDriver, c.workOptions.WorkDriverConfig).
+            LoadConfig()
+        if err != nil {
+            return err
+        }
+
+        clientHolder, err := work.NewClientHolderBuilder(config).
+            WithClientID(c.workOptions.CloudEventsClientID).
+            WithSourceID(sourceID).
+            WithCodecs(codec.NewManifestBundleCodec()).
+            WithWorkClientWatcherStore(watcherStore).
+            NewSourceClientHolder(ctx)
+        if err != nil {
+            return err
+        }
+
+        workClient = clientHolder.WorkInterface()
+        cloudEventsUsed = true
+    }
+
+    factory := workinformers.NewSharedInformerFactoryWithOptions(workClient, 30*time.Minute, workInformOption)
+    informer := factory.Work().V1().ManifestWorks()
+
+    // For the cloudevents work client, we use the informer store as the client store.
+    if cloudEventsUsed {
+        watcherStore.SetStore(informer.Informer().GetStore())
     }
 
-    return RunControllerManagerWithInformers(ctx, controllerContext, replicaSetsClient, clientHolder, clusterInformerFactory)
+    return RunControllerManagerWithInformers(
+        ctx,
+        controllerContext,
+        replicaSetsClient,
+        workClient,
+        informer,
+        clusterInformerFactory,
+        cloudEventsUsed,
+    )
 }
 
 func RunControllerManagerWithInformers(
     ctx context.Context,
     controllerContext *controllercmd.ControllerContext,
     replicaSetClient workclientset.Interface,
-    hubWorkClientHolder *cloudeventswork.ClientHolder,
+    workClient workclientset.Interface,
+    workInformer workv1informer.ManifestWorkInformer,
     clusterInformers clusterinformers.SharedInformerFactory,
+    cloudEventsUsed bool,
 ) error {
     replicaSetInformerFactory := workinformers.NewSharedInformerFactory(replicaSetClient, 30*time.Minute)
-    hubWorkInformer := hubWorkClientHolder.ManifestWorkInformer()
 
     manifestWorkReplicaSetController := manifestworkreplicasetcontroller.NewManifestWorkReplicaSetController(
         controllerContext.EventRecorder,
         replicaSetClient,
-        workapplier.NewWorkApplierWithTypedClient(hubWorkClientHolder.WorkInterface(), hubWorkInformer.Lister()),
+        workapplier.NewWorkApplierWithTypedClient(workClient, workInformer.Lister()),
         replicaSetInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets(),
-        hubWorkInformer,
+        workInformer,
         clusterInformers.Cluster().V1beta1().Placements(),
         clusterInformers.Cluster().V1beta1().PlacementDecisions(),
+        cloudEventsUsed,
     )
 
     go clusterInformers.Start(ctx.Done())
     go replicaSetInformerFactory.Start(ctx.Done())
 
     go manifestWorkReplicaSetController.Run(ctx, 5)
-    go hubWorkInformer.Informer().Run(ctx.Done())
+    go workInformer.Informer().Run(ctx.Done())
 
     <-ctx.Done()
     return nil
diff --git a/pkg/work/spoke/options.go b/pkg/work/spoke/options.go
index e6452d029..4434ae16e 100644
--- a/pkg/work/spoke/options.go
+++ b/pkg/work/spoke/options.go
@@ -4,8 +4,6 @@ import (
     "time"
 
     "github.com/spf13/pflag"
-
-    cloudeventsconstants "open-cluster-management.io/sdk-go/pkg/cloudevents/constants"
 )
 
 const (
@@ -30,7 +28,7 @@ func NewWorkloadAgentOptions() *WorkloadAgentOptions {
         MaxJSONRawLength:                       1024,
         StatusSyncInterval:                     10 * time.Second,
         AppliedManifestWorkEvictionGracePeriod: 60 * time.Minute,
-        WorkloadSourceDriver:                   cloudeventsconstants.ConfigTypeKube,
+        WorkloadSourceDriver:                   "kube",
         WorkloadSourceConfig:                   "/spoke/hub-kubeconfig/kubeconfig",
     }
 }
diff --git a/pkg/work/spoke/spokeagent.go b/pkg/work/spoke/spokeagent.go
index 5026124d3..32d522d42 100644
--- a/pkg/work/spoke/spokeagent.go
+++ b/pkg/work/spoke/spokeagent.go
@@ -10,10 +10,13 @@ import (
     "k8s.io/client-go/dynamic"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
+    "k8s.io/client-go/tools/clientcmd"
     "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
 
     workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
+    workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
     workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
+    workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
     ocmfeature "open-cluster-management.io/api/feature"
     workv1 "open-cluster-management.io/api/work/v1"
     "open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
@@ -91,22 +94,7 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
         return err
     }
 
-    // To support consuming ManifestWorks from different drivers (like the Kubernetes apiserver or MQTT broker), we build
-    // ManifestWork client that implements the ManifestWorkInterface and ManifestWork informer based on different
-    // driver configuration.
-    // Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
-    hubHost, config, err := generic.NewConfigLoader(o.workOptions.WorkloadSourceDriver, o.workOptions.WorkloadSourceConfig).
-        LoadConfig()
-    if err != nil {
-        return err
-    }
-
-    clientHolder, err := cloudeventswork.NewClientHolderBuilder(config).
-        WithClientID(o.workOptions.CloudEventsClientID).
-        WithInformerConfig(5*time.Minute, workinformers.WithNamespace(o.agentOptions.SpokeClusterName)).
-        WithClusterName(o.agentOptions.SpokeClusterName).
-        WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...).
-        NewAgentClientHolder(ctx)
+    hubHost, hubWorkClient, hubWorkInformer, err := o.newHubWorkClientAndInformer(ctx, restMapper)
     if err != nil {
         return err
     }
@@ -117,9 +105,6 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
         agentID = hubHash
     }
 
-    hubWorkClient := clientHolder.ManifestWorks(o.agentOptions.SpokeClusterName)
-    hubWorkInformer := clientHolder.ManifestWorkInformer()
-
     // create controllers
     validator := auth.NewFactory(
         spokeRestConfig,
@@ -223,3 +208,50 @@ func buildCodecs(codecNames []string, restMapper meta.RESTMapper) []generic.Code
     }
     return codecs
 }
+
+func (o *WorkAgentConfig) newHubWorkClientAndInformer(
+    ctx context.Context,
+    restMapper meta.RESTMapper,
+) (string, workv1client.ManifestWorkInterface, workv1informers.ManifestWorkInformer, error) {
+    if o.workOptions.WorkloadSourceDriver == "kube" {
+        config, err := clientcmd.BuildConfigFromFlags("", o.workOptions.WorkloadSourceConfig)
+        if err != nil {
+            return "", nil, nil, err
+        }
+
+        kubeWorkClientSet, err := workclientset.NewForConfig(config)
+        if err != nil {
+            return "", nil, nil, err
+        }
+
+        factory := workinformers.NewSharedInformerFactoryWithOptions(
+            kubeWorkClientSet,
+            5*time.Minute,
+            workinformers.WithNamespace(o.agentOptions.SpokeClusterName),
+        )
+        informer := factory.Work().V1().ManifestWorks()
+
+        return config.Host, kubeWorkClientSet.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName), informer, nil
+    }
+
+    // For cloudevents drivers, we build a ManifestWork client that implements the
+    // ManifestWorkInterface and a ManifestWork informer based on the driver configuration.
+    // Refer to the Event Based ManifestWork proposal in the enhancements repo for more details.
+    hubHost, config, err := generic.NewConfigLoader(o.workOptions.WorkloadSourceDriver, o.workOptions.WorkloadSourceConfig).
+        LoadConfig()
+    if err != nil {
+        return "", nil, nil, err
+    }
+
+    clientHolder, informer, err := cloudeventswork.NewClientHolderBuilder(config).
+        WithClientID(o.workOptions.CloudEventsClientID).
+        WithInformerConfig(5*time.Minute, workinformers.WithNamespace(o.agentOptions.SpokeClusterName)).
+        WithClusterName(o.agentOptions.SpokeClusterName).
+        WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...).
+        NewAgentClientHolderWithInformer(ctx)
+    if err != nil {
+        return "", nil, nil, err
+    }
+
+    return hubHost, clientHolder.ManifestWorks(o.agentOptions.SpokeClusterName), informer, err
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 189154208..251b56dd8 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1589,7 +1589,7 @@ open-cluster-management.io/api/utils/work/v1/workapplier
 open-cluster-management.io/api/utils/work/v1/workvalidator
 open-cluster-management.io/api/work/v1
 open-cluster-management.io/api/work/v1alpha1
-# open-cluster-management.io/sdk-go v0.13.1-0.20240520073308-f18d198a844d
+# open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523
 ## explicit; go 1.21
 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1
 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1
@@ -1617,7 +1617,8 @@ open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal
 open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload
 open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client
 open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/handler
+open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister
+open-cluster-management.io/sdk-go/pkg/cloudevents/work/store
 open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils
 open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher
 open-cluster-management.io/sdk-go/pkg/patcher
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/constants/constants.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/constants/constants.go
index 6f2708557..28c0a6a6c 100644
--- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/constants/constants.go
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/constants/constants.go
@@ -1,7 +1,6 @@
 package constants
 
 const (
-    ConfigTypeKube  = "kube"
     ConfigTypeMQTT  = "mqtt"
     ConfigTypeGRPC  = "grpc"
     ConfigTypeKafka = "kafka"
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/optionsbuilder.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/optionsbuilder.go
index 9ba94126a..b839c86af 100644
--- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/optionsbuilder.go
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/optionsbuilder.go
@@ -3,9 +3,6 @@ package generic
 import (
     "fmt"
 
-    "k8s.io/client-go/rest"
-    "k8s.io/client-go/tools/clientcmd"
-
     "open-cluster-management.io/sdk-go/pkg/cloudevents/constants"
     "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
     "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
@@ -17,14 +14,11 @@ import (
 type ConfigLoader struct {
     configType string
     configPath string
-
-    kubeConfig *rest.Config
 }
 
 // NewConfigLoader returns a ConfigLoader with the given configuration type and configuration file path.
 //
 // Available configuration types:
-//   - kube
 //   - mqtt
 //   - grpc
 //   - kafka
@@ -35,31 +29,9 @@ func NewConfigLoader(configType, configPath string) *ConfigLoader {
     }
 }
 
-// WithKubeConfig sets a kube config, this config will be used when configuration type is kube and the kube
-// configuration file path not set.
-func (l *ConfigLoader) WithKubeConfig(kubeConfig *rest.Config) *ConfigLoader {
-    l.kubeConfig = kubeConfig
-    return l
-}
-
 // TODO using a specified config instead of any
 func (l *ConfigLoader) LoadConfig() (string, any, error) {
     switch l.configType {
-    case constants.ConfigTypeKube:
-        if l.configPath == "" {
-            if l.kubeConfig == nil {
-                return "", nil, fmt.Errorf("neither the kube config path nor kube config object was specified")
-            }
-
-            return l.kubeConfig.Host, l.kubeConfig, nil
-        }
-
-        kubeConfig, err := clientcmd.BuildConfigFromFlags("", l.configPath)
-        if err != nil {
-            return "", nil, err
-        }
-
-        return kubeConfig.Host, kubeConfig, nil
     case constants.ConfigTypeMQTT:
         mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(l.configPath)
         if err != nil {
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/sourceclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/sourceclient.go
index 1dc5bc191..b27c1adf7 100644
--- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/sourceclient.go
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/sourceclient.go
@@ -176,31 +176,14 @@ func (c *CloudEventSourceClient[T]) receive(ctx context.Context, evt cloudevents
         return
     }
 
-    clusterName, err := evt.Context.GetExtension(types.ExtensionClusterName)
-    if err != nil {
-        klog.Errorf("failed to find cluster name, %v", err)
-        return
-    }
-
     obj, err := codec.Decode(&evt)
     if err != nil {
         klog.Errorf("failed to decode status, %v", err)
         return
     }
 
-    action, err := c.statusAction(fmt.Sprintf("%s", clusterName), obj)
-    if err != nil {
-        klog.Errorf("failed to generate status event %s, %v", evt, err)
-        return
-    }
-
-    if len(action) == 0 {
-        // no action is required, ignore
-        return
-    }
-
     for _, handler := range handlers {
-        if err := handler(action, obj); err != nil {
+        if err := handler(types.StatusModified, obj); err != nil {
             klog.Errorf("failed to handle status event %s, %v", evt, err)
         }
     }
@@ -238,14 +221,32 @@ func (c *CloudEventSourceClient[T]) respondResyncSpecRequest(
         return err
     }
 
+    // TODO we cannot list objs now, the lister may not be ready; we may need to add HasSynced
+    // for the lister
+    if len(objs) == 0 {
+        klog.V(4).Infof("there are no objs from the list, do nothing")
+        return nil
+    }
+
     for _, obj := range objs {
+        // respond with the deleting resource regardless of the resource version
+        if !obj.GetDeletionTimestamp().IsZero() {
+            if err := c.Publish(ctx, eventType, obj); err != nil {
+                return err
+            }
+            continue
+        }
+
         lastResourceVersion := findResourceVersion(string(obj.GetUID()), resourceVersions.Versions)
         currentResourceVersion, err := strconv.ParseInt(obj.GetResourceVersion(), 10, 64)
         if err != nil {
+            klog.V(4).Infof("ignore the obj %v since it has an invalid resourceVersion, %v", obj, err)
             continue
         }
 
-        if currentResourceVersion > lastResourceVersion {
+        // the version of the work is not maintained on the source, or the source's work is newer than
+        // the agent's; send the newer work to the agent
+        if currentResourceVersion == 0 || currentResourceVersion > lastResourceVersion {
             if err := c.Publish(ctx, eventType, obj); err != nil {
                 return err
             }
@@ -274,36 +275,6 @@ func (c *CloudEventSourceClient[T]) respondResyncSpecRequest(
     return nil
 }
 
-func (c *CloudEventSourceClient[T]) statusAction(clusterName string, obj T) (evt types.ResourceAction, err error) {
-    objs, err := c.lister.List(types.ListOptions{ClusterName: clusterName, Source: c.sourceID})
-    if err != nil {
-        return evt, err
-    }
-
-    lastObj, exists := getObj(string(obj.GetUID()), objs)
-    if !exists {
-        return evt, nil
-    }
-
-    lastStatusHash, err := c.statusHashGetter(lastObj)
-    if err != nil {
-        klog.Warningf("failed to hash object %s status, %v", lastObj.GetUID(), err)
-        return evt, err
-    }
-
-    currentStatusHash, err := c.statusHashGetter(obj)
-    if err != nil {
-        klog.Warningf("failed to hash object %s status, %v", obj.GetUID(), err)
-        return evt, nil
-    }
-
-    if lastStatusHash == currentStatusHash {
-        return evt, nil
-    }
-
-    return types.StatusModified, nil
-}
-
 func findResourceVersion(id string, versions []payload.ResourceVersion) int64 {
     for _, version := range versions {
         if id == version.ResourceID {
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifest.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifest.go
index bb0f4f9ad..5ffc5726a 100644
--- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifest.go
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifest.go
@@ -65,8 +65,12 @@ func (c *ManifestCodec) Encode(source string, eventType types.CloudEventsType, w
         return nil, fmt.Errorf("failed to find originalsource from the work %s", work.UID)
     }
 
-    if len(work.Spec.Workload.Manifests) != 1 {
-        return nil, fmt.Errorf("too many manifests in the work %s", work.UID)
+    // for the manifest deletion case: a work with no manifests in its spec will be rebuilt in the cache upon agent restart.
+    // for status update cases other than the manifest deletion case, there should be only one manifest in the work.
+    if !meta.IsStatusConditionTrue(work.Status.Conditions, common.ManifestsDeleted) {
+        if len(work.Spec.Workload.Manifests) != 1 {
+            return nil, fmt.Errorf("too many manifests in the work %s", work.UID)
+        }
     }
 
     evt := types.NewEventBuilder(source, eventType).
@@ -142,6 +146,9 @@ func (c *ManifestCodec) Decode(evt *cloudevents.Event) (*workv1.ManifestWork, er
         return nil, fmt.Errorf("failed to get deletiontimestamp, %v", err)
     }
 
+    // In the case of an agent restart, the manifestwork finalizer is cleared.
+    // Explicitly re-add the finalizer to ensure proper cleanup of the manifestwork.
+    work.Finalizers = []string{workv1.ManifestWorkFinalizer}
     work.DeletionTimestamp = &metav1.Time{Time: deletionTimestamp}
     return work, nil
 }
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifestbundle.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifestbundle.go
index fc17ffaa4..ec2a0cd1f 100644
--- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifestbundle.go
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifestbundle.go
@@ -115,6 +115,9 @@ func (c *ManifestBundleCodec) Decode(evt *cloudevents.Event) (*workv1.ManifestWo
         return nil, fmt.Errorf("failed to get deletiontimestamp, %v", err)
     }
 
+    // In the case of an agent restart, the manifestwork finalizer is cleared.
+    // Explicitly re-add the finalizer to ensure proper cleanup of the manifestwork.
+    work.Finalizers = []string{workv1.ManifestWorkFinalizer}
     work.DeletionTimestamp = &metav1.Time{Time: deletionTimestamp}
     return work, nil
 }
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go
index e0946b158..7497ae5f9 100644
--- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go
@@ -5,34 +5,28 @@ import (
     "fmt"
     "time"
 
-    "k8s.io/client-go/rest"
     "k8s.io/klog/v2"
 
     workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
     workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
     workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
-    workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
     workv1 "open-cluster-management.io/api/work/v1"
+
     "open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
-    "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
     "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
-    agentclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client"
-    agenthandler "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler"
     "open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal"
     sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client"
-    sourcehandler "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/handler"
-    "open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher"
+    "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister"
+    "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
 )
 
 const defaultInformerResyncTime = 10 * time.Minute
 
 // ClientHolder holds a manifestwork client that implements the ManifestWorkInterface based on different configuration
-// and a ManifestWorkInformer that is built with the manifestWork client.
 //
 // ClientHolder also implements the ManifestWorksGetter interface.
 type ClientHolder struct {
-    workClientSet        workclientset.Interface
-    manifestWorkInformer workv1informers.ManifestWorkInformer
+    workClientSet workclientset.Interface
 }
 
 var _ workv1client.ManifestWorksGetter = &ClientHolder{}
@@ -47,26 +41,22 @@ func (h *ClientHolder) ManifestWorks(namespace string) workv1client.ManifestWork
     return h.workClientSet.WorkV1().ManifestWorks(namespace)
 }
 
-// ManifestWorkInformer returns a ManifestWorkInformer
-func (h *ClientHolder) ManifestWorkInformer() workv1informers.ManifestWorkInformer {
-    return h.manifestWorkInformer
-}
-
 // ClientHolderBuilder builds the ClientHolder with different configuration.
 type ClientHolderBuilder struct {
     config             any
+    watcherStore       store.WorkClientWatcherStore
     codecs             []generic.Codec[*workv1.ManifestWork]
     informerOptions    []workinformers.SharedInformerOption
     informerResyncTime time.Duration
    sourceID           string
     clusterName        string
     clientID           string
+    resync             bool
 }
 
 // NewClientHolderBuilder returns a ClientHolderBuilder with a given configuration.
 //
 // Available configurations:
-//   - Kubeconfig (*rest.Config): builds a manifestwork client with kubeconfig
 //   - MQTTOptions (*mqtt.MQTTOptions): builds a manifestwork client based on cloudevents with MQTT
 //   - GRPCOptions (*grpc.GRPCOptions): builds a manifestwork client based on cloudevents with GRPC
 //   - KafkaOptions (*kafka.KafkaOptions): builds a manifestwork client based on cloudevents with Kafka
@@ -76,6 +66,7 @@ func NewClientHolderBuilder(config any) *ClientHolderBuilder {
     return &ClientHolderBuilder{
         config:             config,
         informerResyncTime: defaultInformerResyncTime,
+        resync:             true,
     }
 }
@@ -113,108 +104,44 @@ func (b *ClientHolderBuilder) WithInformerConfig(
     return b
 }
 
-// NewSourceClientHolder returns a ClientHolder for source
-func (b *ClientHolderBuilder) NewSourceClientHolder(ctx context.Context) (*ClientHolder, error) {
-    switch config := b.config.(type) {
-    case *rest.Config:
-        return b.newKubeClients(config)
-    default:
-        options, err := generic.BuildCloudEventsSourceOptions(config, b.clientID, b.sourceID)
-        if err != nil {
-            return nil, err
-        }
-        return b.newSourceClients(ctx, options)
-    }
+// WithWorkClientWatcherStore sets the WorkClientWatcherStore. The client will use this store to cache the works and
+// to watch the work events.
+func (b *ClientHolderBuilder) WithWorkClientWatcherStore(store store.WorkClientWatcherStore) *ClientHolderBuilder {
+    b.watcherStore = store
+    return b
 }
 
-// NewAgentClientHolder returns a ClientHolder for agent
-func (b *ClientHolderBuilder) NewAgentClientHolder(ctx context.Context) (*ClientHolder, error) {
-    switch config := b.config.(type) {
-    case *rest.Config:
-        return b.newKubeClients(config)
-    default:
-        options, err := generic.BuildCloudEventsAgentOptions(config, b.clusterName, b.clientID)
-        if err != nil {
-            return nil, err
-        }
-        return b.newAgentClients(ctx, options)
-    }
+// WithResyncEnabled controls the client resync (default is true). If enabled, a resync happens when
+// 1. the client's store has been initiated, or
+// 2. the client reconnects
+func (b *ClientHolderBuilder) WithResyncEnabled(resync bool) *ClientHolderBuilder {
+    b.resync = resync
+    return b
 }
 
-func (b *ClientHolderBuilder) newAgentClients(ctx context.Context, agentOptions *options.CloudEventsAgentOptions) (*ClientHolder, error) {
+// NewSourceClientHolder returns a ClientHolder for a source
+func (b *ClientHolderBuilder) NewSourceClientHolder(ctx context.Context) (*ClientHolder, error) {
     if len(b.clientID) == 0 {
         return nil, fmt.Errorf("client id is required")
     }
 
-    if len(b.clusterName) == 0 {
-        return nil, fmt.Errorf("cluster name is required")
+    if len(b.sourceID) == 0 {
+        return nil, fmt.Errorf("source id is required")
     }
 
-    workLister := &ManifestWorkLister{}
-    watcher := watcher.NewManifestWorkWatcher()
-    cloudEventsClient, err := generic.NewCloudEventAgentClient[*workv1.ManifestWork](
-        ctx,
-        agentOptions,
-        workLister,
-        ManifestWorkStatusHash,
-        b.codecs...,
-    )
-    if err != nil {
-        return nil, err
+    if b.watcherStore == nil {
+        return nil, fmt.Errorf("a watcher store is required")
     }
 
-    manifestWorkClient := agentclient.NewManifestWorkAgentClient(cloudEventsClient, watcher)
-    workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient}
-    workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient}
-    factory := workinformers.NewSharedInformerFactoryWithOptions(workClientSet, b.informerResyncTime, b.informerOptions...)
-    informers := factory.Work().V1().ManifestWorks()
-    manifestWorkLister := informers.Lister()
-    namespacedLister := manifestWorkLister.ManifestWorks(b.clusterName)
-
-    // Set informer lister back to work lister and client.
-    workLister.Lister = manifestWorkLister
-    // TODO the work client and informer share a same store in the current implementation, ideally, the store should be
-    // only written from the server. we may need to revisit the implementation in the future.
-    manifestWorkClient.SetLister(namespacedLister)
-
-    cloudEventsClient.Subscribe(ctx, agenthandler.NewManifestWorkAgentHandler(namespacedLister, watcher))
-
-    go func() {
-        for {
-            select {
-            case <-ctx.Done():
-                return
-            case <-cloudEventsClient.ReconnectedChan():
-                // when receiving a client reconnected signal, we resync all sources for this agent
-                // TODO after supporting multiple sources, we should only resync agent known sources
-                if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil {
-                    klog.Errorf("failed to send resync request, %v", err)
-                }
-            }
-        }
-    }()
-
-    return &ClientHolder{
-        workClientSet:        workClientSet,
-        manifestWorkInformer: informers,
-    }, nil
-}
-
-func (b *ClientHolderBuilder) newSourceClients(ctx context.Context, sourceOptions *options.CloudEventsSourceOptions) (*ClientHolder, error) {
-    if len(b.clientID) == 0 {
-        return nil, fmt.Errorf("client id is required")
-    }
-
-    if len(b.sourceID) == 0 {
-        return nil, fmt.Errorf("source id is required")
+    options, err := generic.BuildCloudEventsSourceOptions(b.config, b.clientID, b.sourceID)
+    if err != nil {
+        return nil, err
     }
 
-    workLister := &ManifestWorkLister{}
-    watcher := watcher.NewManifestWorkWatcher()
     cloudEventsClient, err := generic.NewCloudEventSourceClient[*workv1.ManifestWork](
         ctx,
-        sourceOptions,
-        workLister,
+        options,
+        lister.NewWatcherStoreLister(b.watcherStore),
         ManifestWorkStatusHash,
         b.codecs...,
     )
@@ -222,20 +149,18 @@ func (b *ClientHolderBuilder) newSourceClients(ctx context.Context, sourceOption
         return nil, err
     }
 
-    manifestWorkClient := sourceclient.NewManifestWorkSourceClient(b.sourceID, cloudEventsClient, watcher)
+    // start to subscribe
+    cloudEventsClient.Subscribe(ctx, b.watcherStore.HandleReceivedWork)
+
+    manifestWorkClient := sourceclient.NewManifestWorkSourceClient(b.sourceID, cloudEventsClient, b.watcherStore)
     workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient}
     workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient}
-    factory := workinformers.NewSharedInformerFactoryWithOptions(workClientSet, b.informerResyncTime, b.informerOptions...)
-    informers := factory.Work().V1().ManifestWorks()
-    manifestWorkLister := informers.Lister()
-    // Set informer lister back to work lister and client.
-    workLister.Lister = manifestWorkLister
-    manifestWorkClient.SetLister(manifestWorkLister)
 
-    sourceHandler := sourcehandler.NewManifestWorkSourceHandler(manifestWorkLister, watcher)
-    cloudEventsClient.Subscribe(ctx, sourceHandler.HandlerFunc())
+    if !b.resync {
+        return &ClientHolder{workClientSet: workClientSet}, nil
+    }
 
-    go sourceHandler.Run(ctx.Done())
+    // start a goroutine to resync the works when this client reconnects
     go func() {
         for {
             select {
@@ -250,21 +175,14 @@ func (b *ClientHolderBuilder) newSourceClients(ctx context.Context, sourceOption
             }
         }
     }()
 
-    return &ClientHolder{
-        workClientSet:        workClientSet,
-        manifestWorkInformer: informers,
-    }, nil
-}
-
-func (b *ClientHolderBuilder) newKubeClients(config *rest.Config) (*ClientHolder, error) {
-    kubeWorkClientSet, err := workclientset.NewForConfig(config)
-    if err != nil {
-        return nil, err
-    }
+    // start a goroutine to resync the works after this client's store is initiated
+    go func() {
+        if store.WaitForStoreInit(ctx, b.watcherStore.HasInitiated) {
+            if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil {
+                klog.Errorf("failed to resync, %v", err)
+            }
+        }
+    }()
 
-    factory := workinformers.NewSharedInformerFactoryWithOptions(kubeWorkClientSet, b.informerResyncTime, b.informerOptions...)
-    return &ClientHolder{
-        workClientSet:        kubeWorkClientSet,
-        manifestWorkInformer: factory.Work().V1().ManifestWorks(),
-    }, nil
+    return &ClientHolder{workClientSet: workClientSet}, nil
 }
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientinformerbuilder.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientinformerbuilder.go
new file mode 100644
index 000000000..c7f443d2d
--- /dev/null
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientinformerbuilder.go
@@ -0,0 +1,107 @@
+package work
+
+import (
+    "context"
+    "fmt"
+
+    "k8s.io/client-go/rest"
+    "k8s.io/klog/v2"
+
+    workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
+    workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
+    workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
+    workv1 "open-cluster-management.io/api/work/v1"
+
+    "open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
+    "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
+    "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
+    agentclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client"
+    agenthandler "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler"
+    "open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal"
+    "open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher"
+)
+
+// NewAgentClientHolderWithInformer returns a ClientHolder with a ManifestWorkInformer for an agent.
+// This function is suitable for scenarios where a ManifestWorkInformer is required.
+//
+// Note: the ClientHolder should be used after the ManifestWorkInformer starts
+//
+// TODO enhance the manifestwork agent client with WorkClientWatcherCache
+func (b *ClientHolderBuilder) NewAgentClientHolderWithInformer(
+    ctx context.Context) (*ClientHolder, workv1informers.ManifestWorkInformer, error) {
+    switch config := b.config.(type) {
+    case *rest.Config:
+        kubeWorkClientSet, err := workclientset.NewForConfig(config)
+        if err != nil {
+            return nil, nil, err
+        }
+
+        factory := workinformers.NewSharedInformerFactoryWithOptions(kubeWorkClientSet, b.informerResyncTime, b.informerOptions...)
+        return &ClientHolder{workClientSet: kubeWorkClientSet}, factory.Work().V1().ManifestWorks(), nil
+    default:
+        options, err := generic.BuildCloudEventsAgentOptions(config, b.clusterName, b.clientID)
+        if err != nil {
+            return nil, nil, err
+        }
+        return b.newAgentClients(ctx, options)
+    }
+}
+
+func (b *ClientHolderBuilder) newAgentClients(
+    ctx context.Context,
+    agentOptions *options.CloudEventsAgentOptions,
+) (*ClientHolder, workv1informers.ManifestWorkInformer, error) {
+    if len(b.clientID) == 0 {
+        return nil, nil, fmt.Errorf("client id is required")
+    }
+
+    if len(b.clusterName) == 0 {
+        return nil, nil, fmt.Errorf("cluster name is required")
+    }
+
+    workLister := &ManifestWorkLister{}
+    watcher := watcher.NewManifestWorkWatcher()
+    cloudEventsClient, err := generic.NewCloudEventAgentClient[*workv1.ManifestWork](
+        ctx,
+        agentOptions,
+        workLister,
+        ManifestWorkStatusHash,
+        b.codecs...,
+    )
+    if err != nil {
+        return nil, nil, err
+    }
+
+    manifestWorkClient := agentclient.NewManifestWorkAgentClient(cloudEventsClient, watcher)
+    workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient}
+    workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient}
+    factory := workinformers.NewSharedInformerFactoryWithOptions(workClientSet, b.informerResyncTime, b.informerOptions...)
+    informers := factory.Work().V1().ManifestWorks()
+    manifestWorkLister := informers.Lister()
+    namespacedLister := manifestWorkLister.ManifestWorks(b.clusterName)
+
+    // Set informer lister back to work lister and client.
+    workLister.Lister = manifestWorkLister
+    // TODO the work client and informer share the same store in the current implementation; ideally, the store should
+    // only be written from the server. We may need to revisit the implementation in the future.
+    manifestWorkClient.SetLister(namespacedLister)
+
+    cloudEventsClient.Subscribe(ctx, agenthandler.NewManifestWorkAgentHandler(namespacedLister, watcher))
+
+    go func() {
+        for {
+            select {
+            case <-ctx.Done():
+                return
+            case <-cloudEventsClient.ReconnectedChan():
+                // when receiving a client reconnected signal, we resync all sources for this agent
+                // TODO after supporting multiple sources, we should only resync agent known sources
+                if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil {
+                    klog.Errorf("failed to send resync request, %v", err)
+                }
+            }
+        }
+    }()
+
+    return &ClientHolder{workClientSet: workClientSet}, informers, nil
+}
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/common/common.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/common/common.go
index 53134eb89..3c06d9b97 100644
--- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/common/common.go
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/common/common.go
@@ -10,8 +10,11 @@ const (
     // CloudEventsDataTypeAnnotationKey is the key of the cloudevents data type annotation.
     CloudEventsDataTypeAnnotationKey = "cloudevents.open-cluster-management.io/datatype"
 
-    // CloudEventsGenerationAnnotationKey is the key of the manifestwork generation annotation.
-    CloudEventsGenerationAnnotationKey = "cloudevents.open-cluster-management.io/generation"
+    // CloudEventsResourceVersionAnnotationKey is the key of the manifestwork resourceversion annotation.
+    //
+    // This annotation is used to trace ManifestWork spec changes; the value of this annotation
+    // should be a sequence number representing the ManifestWork spec generation.
+    CloudEventsResourceVersionAnnotationKey = "cloudevents.open-cluster-management.io/resourceversion"
 )
 
 // CloudEventsOriginalSourceLabelKey is the key of the cloudevents original source label.
@@ -26,4 +29,5 @@ const (
     DeleteRequestAction = "delete_request"
 )
 
+var ManifestWorkGK = schema.GroupKind{Group: workv1.GroupName, Kind: "ManifestWork"}
 var ManifestWorkGR = schema.GroupResource{Group: workv1.GroupName, Resource: "manifestworks"}
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/mainfiest.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/manifest.go
similarity index 100%
rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/mainfiest.go
rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/manifest.go
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go
index ed6eb8a46..26bf55e32 100644
--- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go
@@ -3,56 +3,63 @@ package client
 import (
     "context"
     "fmt"
-    "strconv"
 
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     kubetypes "k8s.io/apimachinery/pkg/types"
+    "k8s.io/apimachinery/pkg/util/validation/field"
     "k8s.io/apimachinery/pkg/watch"
     "k8s.io/klog/v2"
     workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
-    workv1lister "open-cluster-management.io/api/client/work/listers/work/v1"
     workv1 "open-cluster-management.io/api/work/v1"
+
     "open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
     "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
     "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
     "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
+    "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
     "open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils"
-    "open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher"
 )
 
 // ManifestWorkSourceClient implements the ManifestWorkInterface.
 type ManifestWorkSourceClient struct {
     cloudEventsClient *generic.CloudEventSourceClient[*workv1.ManifestWork]
-    watcher           *watcher.ManifestWorkWatcher
-    lister            workv1lister.ManifestWorkLister
+    watcherStore      store.WorkClientWatcherStore
     namespace         string
     sourceID          string
 }
 
 var _ workv1client.ManifestWorkInterface = &ManifestWorkSourceClient{}
 
-func NewManifestWorkSourceClient(sourceID string,
+func NewManifestWorkSourceClient(
+    sourceID string,
     cloudEventsClient *generic.CloudEventSourceClient[*workv1.ManifestWork],
-    watcher *watcher.ManifestWorkWatcher) *ManifestWorkSourceClient {
+    watcherStore store.WorkClientWatcherStore,
+) *ManifestWorkSourceClient {
     return &ManifestWorkSourceClient{
         cloudEventsClient: cloudEventsClient,
-        watcher:           watcher,
+        watcherStore:      watcherStore,
         sourceID:          sourceID,
     }
 }
 
-func (c *ManifestWorkSourceClient) SetLister(lister workv1lister.ManifestWorkLister) {
-    c.lister = lister
-}
-
-func (mw *ManifestWorkSourceClient) SetNamespace(namespace string) {
-    mw.namespace = namespace
+func (c *ManifestWorkSourceClient) SetNamespace(namespace string) {
+    c.namespace = namespace
 }
 
 func (c *ManifestWorkSourceClient) Create(ctx context.Context, manifestWork *workv1.ManifestWork, opts metav1.CreateOptions) (*workv1.ManifestWork, error) {
-    _, err := c.lister.ManifestWorks(c.namespace).Get(manifestWork.Name)
+    if manifestWork.Namespace != "" && manifestWork.Namespace != c.namespace {
+        return nil, errors.NewInvalid(common.ManifestWorkGK, "namespace", field.ErrorList{
+            field.Invalid(
+                field.NewPath("metadata").Child("namespace"),
+                manifestWork.Namespace,
+                fmt.Sprintf("does not match the namespace %s", c.namespace),
+            ),
+        })
+    }
+
+    _, err := c.watcherStore.Get(c.namespace, manifestWork.Name)
     if err == nil {
         return nil, errors.NewAlreadyExists(common.ManifestWorkGR, manifestWork.Name)
     }
@@ -69,21 +76,23 @@ func (c *ManifestWorkSourceClient) Create(ctx context.Context, manifestWork *wor
         Action:              common.CreateRequestAction,
     }
 
-    generation, err := getWorkGeneration(manifestWork)
-    if err != nil {
+    newWork := manifestWork.DeepCopy()
+    newWork.UID = kubetypes.UID(utils.UID(c.sourceID, c.namespace, newWork.Name))
+    newWork.Namespace = c.namespace
+    newWork.ResourceVersion = getWorkResourceVersion(manifestWork)
+
+    if err := utils.Validate(newWork); err != nil {
         return nil, err
     }
 
-    newWork := manifestWork.DeepCopy()
-    newWork.UID = kubetypes.UID(utils.UID(c.sourceID, c.namespace, newWork.Name))
-    newWork.Generation = generation
-    ensureSourceLabel(c.sourceID, newWork)
     if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil {
         return nil, err
     }
 
-    // add the new work to the ManifestWorkInformer local cache.
-    c.watcher.Receive(watch.Event{Type: watch.Added, Object: newWork})
+    // add the new work to the local cache.
+    if err := c.watcherStore.Add(newWork); err != nil {
+        return nil, err
+    }
 
     return newWork.DeepCopy(), nil
 }
@@ -96,7 +105,7 @@ func (c *ManifestWorkSourceClient) UpdateStatus(ctx context.Context, manifestWor
 }
 
 func (c *ManifestWorkSourceClient) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
-    work, err := c.lister.ManifestWorks(c.namespace).Get(name)
+    work, err := c.watcherStore.Get(c.namespace, name)
     if errors.IsNotFound(err) {
         return nil
     }
@@ -120,9 +129,16 @@ func (c *ManifestWorkSourceClient) Delete(ctx context.Context, name string, opts
         return err
     }
 
-    // update the deleting work in the ManifestWorkInformer local cache.
-    c.watcher.Receive(watch.Event{Type: watch.Modified, Object: deletingWork})
-    return nil
+    if len(work.Finalizers) == 0 {
+        // the work has no finalizers; there are two cases in this scenario
+        // 1) the agent has not started yet, so we delete this work from the local cache directly.
+        // 2) the agent is running, but the status response has not been handled by the source yet;
+        //    after the deleted status is back, we need to ignore this work in the ManifestWorkSourceHandler.
+        return c.watcherStore.Delete(deletingWork)
+    }
+
+    // update the work with deletion timestamp in the local cache.
+    return c.watcherStore.Update(deletingWork)
 }
 
 func (c *ManifestWorkSourceClient) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error {
@@ -131,21 +147,26 @@ func (c *ManifestWorkSourceClient) DeleteCollection(ctx context.Context, opts me
 
 func (c *ManifestWorkSourceClient) Get(ctx context.Context, name string, opts metav1.GetOptions) (*workv1.ManifestWork, error) {
     klog.V(4).Infof("getting manifestwork %s", name)
-    return c.lister.ManifestWorks(c.namespace).Get(name)
+    return c.watcherStore.Get(c.namespace, name)
 }
 
 func (c *ManifestWorkSourceClient) List(ctx context.Context, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) {
     klog.V(4).Infof("list manifestworks")
-    // send resync request to fetch manifestwork status from agents when the ManifestWorkInformer starts
-    if err := c.cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil {
+    works, err := c.watcherStore.List(opts)
+    if err != nil {
         return nil, err
     }
 
-    return &workv1.ManifestWorkList{}, nil
+    items := []workv1.ManifestWork{}
+    for _, work := range works {
+        items = append(items, *work)
+    }
+
+    return &workv1.ManifestWorkList{Items: items}, nil
 }
 
 func (c *ManifestWorkSourceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
-    return c.watcher, nil
+    return c.watcherStore, nil
 }
 
 func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt kubetypes.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *workv1.ManifestWork, err error) {
@@ -155,7 +176,7 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku
         return nil, fmt.Errorf("unsupported to update subresources %v", subresources)
     }
 
-    lastWork, err := c.lister.ManifestWorks(c.namespace).Get(name)
+    lastWork, err := c.watcherStore.Get(c.namespace, name)
     if err != nil {
         return nil, err
     }
@@ -165,11 +186,6 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku
         return nil, err
     }
 
-    generation, err := getWorkGeneration(patchedWork)
-    if err != nil {
-        return nil, err
-    }
-
     // TODO if we support multiple data type in future, we may need to get the data type from
     // the cloudevents data type annotation
     eventType := types.CloudEventsType{
@@ -179,38 +195,32 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku
     }
 
     newWork := patchedWork.DeepCopy()
-    newWork.Generation = generation
-    if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil {
+    newWork.ResourceVersion = getWorkResourceVersion(patchedWork)
+
+    if err := utils.Validate(newWork); err != nil {
         return nil, err
     }
 
-    // refresh the work in the ManifestWorkInformer local cache with patched work.
-    c.watcher.Receive(watch.Event{Type: watch.Modified, Object: newWork})
-    return newWork.DeepCopy(), nil
-}
-
-// getWorkGeneration retrieves the work generation from the annotation with the key
-// "cloudevents.open-cluster-management.io/generation".
-// if no generation is set in the annotation, then 0 is returned, which means the message
-// broker guarantees the message order.
-func getWorkGeneration(work *workv1.ManifestWork) (int64, error) {
-    generation, ok := work.Annotations[common.CloudEventsGenerationAnnotationKey]
-    if !ok {
-        return 0, nil
+    if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil {
+        return nil, err
     }
 
-    generationInt, err := strconv.Atoi(generation)
-    if err != nil {
-        return 0, fmt.Errorf("failed to convert generation %s to int: %v", generation, err)
+    // update the patched work in the local cache.
+    if err := c.watcherStore.Update(newWork); err != nil {
+        return nil, err
     }
-
-    return int64(generationInt), nil
+    return newWork.DeepCopy(), nil
 }
 
-func ensureSourceLabel(sourceID string, work *workv1.ManifestWork) {
-    if work.Labels == nil {
-        work.Labels = map[string]string{}
+// getWorkResourceVersion retrieves the work resource version from the annotation with the key
+// "cloudevents.open-cluster-management.io/resourceversion".
+// If no value is set in this annotation, then "0" is returned, which means the version of the work will not be
+// maintained on the source; the message broker guarantees the work update order.
+func getWorkResourceVersion(work *workv1.ManifestWork) string {
+    resourceVersion, ok := work.Annotations[common.CloudEventsResourceVersionAnnotationKey]
+    if !ok {
+        return "0"
     }
 
-    work.Labels[common.CloudEventsOriginalSourceLabelKey] = sourceID
+    return resourceVersion
 }
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec/manifestbundle.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec/manifestbundle.go
index f19e70b11..f1e5a6510 100644
--- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec/manifestbundle.go
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec/manifestbundle.go
@@ -2,6 +2,7 @@ package codec
 
 import (
     "fmt"
+    "strconv"
 
     cloudevents "github.com/cloudevents/sdk-go/v2"
     cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"
@@ -32,10 +33,15 @@ func (c *ManifestBundleCodec) Encode(source string, eventType types.CloudEventsT
         return nil, fmt.Errorf("unsupported cloudevents data type %s", eventType.CloudEventsDataType)
     }
 
+    resourceVersion, err := strconv.Atoi(work.ResourceVersion)
+    if err != nil {
+        return nil, fmt.Errorf("failed to convert resource version %s to int: %v", work.ResourceVersion, err)
+    }
+
     evt := types.NewEventBuilder(source, eventType).
         WithClusterName(work.Namespace).
         WithResourceID(string(work.UID)).
-        WithResourceVersion(work.Generation).
+        WithResourceVersion(int64(resourceVersion)).
NewEvent() if !work.DeletionTimestamp.IsZero() { evt.SetExtension(types.ExtensionDeletionTimestamp, work.DeletionTimestamp.Time) diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/handler/resourcehandler.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/handler/resourcehandler.go deleted file mode 100644 index 0ad31f15d..000000000 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/handler/resourcehandler.go +++ /dev/null @@ -1,167 +0,0 @@ -package handler - -import ( - "fmt" - "strconv" - "time" - - "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/labels" - kubetypes "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/util/workqueue" - "k8s.io/klog/v2" - workv1lister "open-cluster-management.io/api/client/work/listers/work/v1" - - workv1 "open-cluster-management.io/api/work/v1" - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher" -) - -const ManifestWorkFinalizer = "cloudevents.open-cluster-management.io/manifest-work-cleanup" - -type ManifestWorkSourceHandler struct { - works workqueue.RateLimitingInterface - lister workv1lister.ManifestWorkLister - watcher *watcher.ManifestWorkWatcher -} - -// NewManifestWorkSourceHandler returns a ResourceHandler for a ManifestWork source client. It sends the kube events -// with ManifestWorWatcher after CloudEventSourceClient received the ManifestWork status from agent, then the -// ManifestWorkInformer handles the kube events in its local cache. -func NewManifestWorkSourceHandler(lister workv1lister.ManifestWorkLister, watcher *watcher.ManifestWorkWatcher) *ManifestWorkSourceHandler { - return &ManifestWorkSourceHandler{ - works: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "manifestwork-source-handler"), - lister: lister, - watcher: watcher, - } -} - -func (h *ManifestWorkSourceHandler) Run(stopCh <-chan struct{}) { - defer h.works.ShutDown() - - // start a goroutine to handle the works from the queue - // the .Until will re-kick the runWorker one second after the runWorker completes - go wait.Until(h.runWorker, time.Second, stopCh) - - // wait until we're told to stop - <-stopCh -} - -func (h *ManifestWorkSourceHandler) HandlerFunc() generic.ResourceHandler[*workv1.ManifestWork] { - return func(action types.ResourceAction, obj *workv1.ManifestWork) error { - switch action { - case types.StatusModified: - h.works.Add(obj) - default: - return fmt.Errorf("unsupported resource action %s", action) - } - return nil - } -} - -func (h *ManifestWorkSourceHandler) runWorker() { - // hot loop until we're told to stop. processNextEvent will automatically wait until there's work available, so - // we don't worry about secondary waits - for h.processNextWork() { - } -} - -// processNextWork deals with one key off the queue. -func (h *ManifestWorkSourceHandler) processNextWork() bool { - // pull the next event item from queue. 
- // events queue blocks until it can return an item to be processed - key, quit := h.works.Get() - if quit { - // the current queue is shutdown and becomes empty, quit this process - return false - } - defer h.works.Done(key) - - if err := h.handleWork(key.(*workv1.ManifestWork)); err != nil { - // we failed to handle the work, we should requeue the item to work on later - // this method will add a backoff to avoid hotlooping on particular items - h.works.AddRateLimited(key) - return true - } - - // we handle the event successfully, tell the queue to stop tracking history for this event - h.works.Forget(key) - return true -} - -func (h *ManifestWorkSourceHandler) handleWork(work *workv1.ManifestWork) error { - lastWork := h.getWorkByUID(work.UID) - if lastWork == nil { - // the work is not found, this may be the client is restarted and the local cache is not ready, requeue this - // work - return errors.NewNotFound(common.ManifestWorkGR, string(work.UID)) - } - - updatedWork := lastWork.DeepCopy() - if meta.IsStatusConditionTrue(work.Status.Conditions, common.ManifestsDeleted) { - updatedWork.Finalizers = []string{} - h.watcher.Receive(watch.Event{Type: watch.Deleted, Object: updatedWork}) - return nil - } - - resourceVersion, err := strconv.Atoi(work.ResourceVersion) - if err != nil { - klog.Errorf("invalid resource version for work %s/%s, %v", lastWork.Namespace, lastWork.Name, err) - return nil - } - - if int64(resourceVersion) > lastWork.Generation { - klog.Warningf("the work %s/%s resource version %d is great than its generation %d, ignore", - lastWork.Namespace, lastWork.Name, resourceVersion, work.Generation) - return nil - } - - // no status change - if equality.Semantic.DeepEqual(lastWork.Status, work.Status) { - return nil - } - - // the work has been handled by agent, we ensure a finalizer on the work - updatedWork.Finalizers = ensureFinalizers(updatedWork.Finalizers) - updatedWork.Status = work.Status - h.watcher.Receive(watch.Event{Type: watch.Modified, Object: updatedWork}) - return nil -} - -func (h *ManifestWorkSourceHandler) getWorkByUID(uid kubetypes.UID) *workv1.ManifestWork { - works, err := h.lister.List(labels.Everything()) - if err != nil { - klog.Errorf("failed to lists works, %v", err) - return nil - } - - for _, work := range works { - if work.UID == uid { - return work - } - } - - return nil -} - -func ensureFinalizers(workFinalizers []string) []string { - has := false - for _, f := range workFinalizers { - if f == ManifestWorkFinalizer { - has = true - break - } - } - - if !has { - workFinalizers = append(workFinalizers, ManifestWorkFinalizer) - } - - return workFinalizers -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister/lister.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister/lister.go new file mode 100644 index 000000000..f939beac7 --- /dev/null +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister/lister.go @@ -0,0 +1,32 @@ +package lister + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + workv1 "open-cluster-management.io/api/work/v1" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" +) + +// WatcherStoreLister list the ManifestWorks from the WorkClientWatcherStore. 
+type WatcherStoreLister struct {
+	store store.WorkClientWatcherStore
+}
+
+func NewWatcherStoreLister(store store.WorkClientWatcherStore) *WatcherStoreLister {
+	return &WatcherStoreLister{
+		store: store,
+	}
+}
+
+// List returns the ManifestWorks from the WorkClientWatcherStore with the given list options.
+func (l *WatcherStoreLister) List(options types.ListOptions) ([]*workv1.ManifestWork, error) {
+	opts := metav1.ListOptions{}
+	if options.ClusterName != types.ClusterAll {
+		opts.FieldSelector = fmt.Sprintf("metadata.namespace=%s", options.ClusterName)
+	}
+
+	return l.store.List(opts)
+}
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/base.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/base.go
new file mode 100644
index 000000000..5e21e7167
--- /dev/null
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/base.go
@@ -0,0 +1,256 @@
+package store
+
+import (
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	"k8s.io/apimachinery/pkg/api/equality"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	kubetypes "k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+
+	workv1 "open-cluster-management.io/api/work/v1"
+
+	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
+	"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
+	"open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils"
+)
+
+const ManifestWorkFinalizer = "cloudevents.open-cluster-management.io/manifest-work-cleanup"
+
+type baseStore struct {
+	sync.RWMutex
+
+	result chan watch.Event
+	done   chan struct{}
+
+	store cache.Store
+
+	initiated bool
+
+	// a queue to save the received work events
+	receivedWorks workqueue.RateLimitingInterface
+}
+
+// ResultChan implements watch interface.
+func (b *baseStore) ResultChan() <-chan watch.Event {
+	return b.result
+}
+
+// Stop implements watch interface.
+func (b *baseStore) Stop() {
+	// ensure Stop is safe to call more than once by locking and checking the done channel.
+	b.Lock()
+	defer b.Unlock()
+
+	// closing a closed channel always panics, therefore check before closing
+	select {
+	case <-b.done:
+		close(b.result)
+	default:
+		close(b.done)
+	}
+}
+
+// List lists the works from the store with the given list options
+func (b *baseStore) List(opts metav1.ListOptions) ([]*workv1.ManifestWork, error) {
+	b.RLock()
+	defer b.RUnlock()
+
+	return utils.ListWorksWithOptions(b.store, opts)
+}
+
+// Get returns a work from the store by its namespace and name
+func (b *baseStore) Get(namespace, name string) (*workv1.ManifestWork, error) {
+	b.RLock()
+	defer b.RUnlock()
+
+	obj, exists, err := b.store.GetByKey(fmt.Sprintf("%s/%s", namespace, name))
+	if err != nil {
+		return nil, err
+	}
+
+	if !exists {
+		return nil, errors.NewNotFound(common.ManifestWorkGR, name)
+	}
+
+	work, ok := obj.(*workv1.ManifestWork)
+	if !ok {
+		return nil, fmt.Errorf("unknown type %T", obj)
+	}
+
+	return work, nil
+}
+
+// ListAll lists all of the works from the store
+func (b *baseStore) ListAll() ([]*workv1.ManifestWork, error) {
+	b.RLock()
+	defer b.RUnlock()
+
+	works := []*workv1.ManifestWork{}
+	for _, obj := range b.store.List() {
+		if work, ok := obj.(*workv1.ManifestWork); ok {
+			works = append(works, work)
+		}
+	}
+
+	return works, nil
+}
+
+func (b *baseStore) HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error {
+	switch action {
+	case types.StatusModified:
+		b.receivedWorks.Add(work)
+	default:
+		return fmt.Errorf("unsupported resource action %s", action)
+	}
+	return nil
+}
+
+// workProcessor processes the received works from the given work queue with a specific store
+type workProcessor struct {
+	works workqueue.RateLimitingInterface
+	store WorkClientWatcherStore
+}
+
+func newWorkProcessor(works workqueue.RateLimitingInterface, store WorkClientWatcherStore) *workProcessor {
+	return &workProcessor{
+		works: works,
+		store: store,
+	}
+}
+
+func (b *workProcessor) run(stopCh <-chan struct{}) {
+	defer b.works.ShutDown()
+
+	// start a goroutine to handle the works from the queue
+	// the .Until will re-kick the runWorker one second after the runWorker completes
+	go wait.Until(b.runWorker, time.Second, stopCh)
+
+	// wait until we're told to stop
+	<-stopCh
+}
+
+func (b *workProcessor) runWorker() {
+	// hot loop until we're told to stop. processNextWork will automatically wait until there's work
+	// available, so we don't worry about secondary waits
+	for b.processNextWork() {
+	}
+}
+
+// processNextWork deals with one key off the queue.
+func (b *workProcessor) processNextWork() bool {
+	// pull the next event item from queue.
+	// events queue blocks until it can return an item to be processed
+	key, quit := b.works.Get()
+	if quit {
+		// the current queue is shutdown and becomes empty, quit this process
+		return false
+	}
+	defer b.works.Done(key)
+
+	if err := b.handleWork(key.(*workv1.ManifestWork)); err != nil {
+		// we failed to handle the work, we should requeue the item to work on later
+		// this method will add a backoff to avoid hotlooping on particular items
+		b.works.AddRateLimited(key)
+		return true
+	}
+
+	// we handled the event successfully, tell the queue to stop tracking history for this event
+	b.works.Forget(key)
+	return true
+}
+
+func (b *workProcessor) handleWork(work *workv1.ManifestWork) error {
+	lastWork := b.getWork(work.UID)
+	if lastWork == nil {
+		// the work is not found in the local cache and it has been deleted by the agent,
+		// ignore this work.
+		if meta.IsStatusConditionTrue(work.Status.Conditions, common.ManifestsDeleted) {
+			return nil
+		}
+
+		// the work is not found, there are two cases:
+		// 1) the source is restarted and the local cache is not ready, requeue this work.
+		// 2) (TODO) during the source restart, the work is deleted forcibly, we may need an
+		// eviction mechanism for this.
+		return errors.NewNotFound(common.ManifestWorkGR, string(work.UID))
+	}
+
+	updatedWork := lastWork.DeepCopy()
+	if meta.IsStatusConditionTrue(work.Status.Conditions, common.ManifestsDeleted) {
+		updatedWork.Finalizers = []string{}
+		// delete the work from the local cache.
+		return b.store.Delete(updatedWork)
+	}
+
+	lastResourceVersion, err := strconv.Atoi(lastWork.ResourceVersion)
+	if err != nil {
+		klog.Errorf("invalid resource version for work %s/%s, %v", lastWork.Namespace, lastWork.Name, err)
+		return nil
+	}
+
+	resourceVersion, err := strconv.Atoi(work.ResourceVersion)
+	if err != nil {
+		klog.Errorf("invalid resource version for work %s/%s, %v", lastWork.Namespace, lastWork.Name, err)
+		return nil
+	}
+
+	// the work's version is maintained on the source and the agent's work is newer than the source's, ignore
+	if lastResourceVersion != 0 && resourceVersion > lastResourceVersion {
+		klog.Warningf("the work %s/%s agent resource version %d is greater than the source resource version %d, ignore",
+			lastWork.Namespace, lastWork.Name, resourceVersion, lastResourceVersion)
+		return nil
+	}
+
+	// no status change
+	if equality.Semantic.DeepEqual(lastWork.Status, work.Status) {
+		return nil
+	}
+
+	// the work has been handled by the agent, ensure a finalizer on the work
+	updatedWork.Finalizers = ensureFinalizers(updatedWork.Finalizers)
+	updatedWork.Status = work.Status
+	// update the work status in the local cache.
+	return b.store.Update(updatedWork)
+}
+
+func (b *workProcessor) getWork(uid kubetypes.UID) *workv1.ManifestWork {
+	works, err := b.store.ListAll()
+	if err != nil {
+		klog.Errorf("failed to list works, %v", err)
+		return nil
+	}
+
+	for _, work := range works {
+		if work.UID == uid {
+			return work
+		}
+	}
+
+	return nil
+}
+
+func ensureFinalizers(workFinalizers []string) []string {
+	has := false
+	for _, f := range workFinalizers {
+		if f == ManifestWorkFinalizer {
+			has = true
+			break
+		}
+	}
+
+	if !has {
+		workFinalizers = append(workFinalizers, ManifestWorkFinalizer)
+	}
+
+	return workFinalizers
+}
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go
new file mode 100644
index 000000000..d2d6fb3c4
--- /dev/null
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go
@@ -0,0 +1,60 @@
+package store
+
+import (
+	"context"
+
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+
+	workv1 "open-cluster-management.io/api/work/v1"
+)
+
+// InformerWatcherStore extends the baseStore.
+// It gets/lists the works from the given informer store and sends
+// the work add/update/delete events to the watch channel directly.
+type InformerWatcherStore struct {
+	baseStore
+}
+
+var _ watch.Interface = &InformerWatcherStore{}
+var _ WorkClientWatcherStore = &InformerWatcherStore{}
+
+func NewInformerWatcherStore(ctx context.Context) *InformerWatcherStore {
+	s := &InformerWatcherStore{
+		baseStore: baseStore{
+			result:        make(chan watch.Event),
+			done:          make(chan struct{}),
+			receivedWorks: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "informer-watcher-store"),
+		},
+	}
+
+	// start a goroutine to process the received work events from the work queue with the current store.
+	go newWorkProcessor(s.baseStore.receivedWorks, s).run(ctx.Done())
+
+	return s
+}
+
+func (s *InformerWatcherStore) Add(work *workv1.ManifestWork) error {
+	s.result <- watch.Event{Type: watch.Added, Object: work}
+	return nil
+}
+
+func (s *InformerWatcherStore) Update(work *workv1.ManifestWork) error {
+	s.result <- watch.Event{Type: watch.Modified, Object: work}
+	return nil
+}
+
+func (s *InformerWatcherStore) Delete(work *workv1.ManifestWork) error {
+	s.result <- watch.Event{Type: watch.Deleted, Object: work}
+	return nil
+}
+
+func (s *InformerWatcherStore) HasInitiated() bool {
+	return s.initiated
+}
+
+func (s *InformerWatcherStore) SetStore(store cache.Store) {
+	s.store = store
+	s.initiated = true
+}
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/interface.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/interface.go
new file mode 100644
index 000000000..f5f72d1e3
--- /dev/null
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/interface.go
@@ -0,0 +1,75 @@
+package store
+
+import (
+	"context"
+	"time"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/klog/v2"
+
+	workv1 "open-cluster-management.io/api/work/v1"
+	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
+)
+
+const syncedPollPeriod = 100 * time.Millisecond
+
+// StoreInitiated is a function that can be used to determine if a watcher store has been initiated.
+// This is useful for determining if caches have synced.
+type StoreInitiated func() bool
+
+// WorkClientWatcherStore extends the watch interface with a work store.
+type WorkClientWatcherStore interface {
+	watch.Interface
+
+	// HandleReceivedWork handles the work events received by the client.
+	HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error
+
+	// Add will be called by the work client when adding works. The implementation depends on the specific
+	// watcher store; in some cases it does not need to update a store, but just sends a watch event.
+	Add(work *workv1.ManifestWork) error
+
+	// Update will be called by the work client when updating works. The implementation depends on the specific
+	// watcher store; in some cases it does not need to update a store, but just sends a watch event.
+	Update(work *workv1.ManifestWork) error
+
+	// Delete will be called by the work client when deleting works. The implementation depends on the specific
+	// watcher store; in some cases it does not need to update a store, but just sends a watch event.
+	Delete(work *workv1.ManifestWork) error
+
+	// List returns the works from the store with the given list options
+	List(opts metav1.ListOptions) ([]*workv1.ManifestWork, error)
+
+	// ListAll lists all of the works from the store
+	ListAll() ([]*workv1.ManifestWork, error)
+
+	// Get returns a work from the store by its namespace and name
+	Get(namespace, name string) (*workv1.ManifestWork, error)
+
+	// HasInitiated returns true if the store has been initiated. A resync may be required after the store
+	// is initiated when building a work client.
+	HasInitiated() bool
+}
+
+func WaitForStoreInit(ctx context.Context, cacheSyncs ...StoreInitiated) bool {
+	err := wait.PollUntilContextCancel(
+		ctx,
+		syncedPollPeriod,
+		true,
+		func(ctx context.Context) (bool, error) {
+			for _, syncFunc := range cacheSyncs {
+				if !syncFunc() {
+					return false, nil
+				}
+			}
+			return true, nil
+		},
+	)
+	if err != nil {
+		klog.Errorf("stop WaitForStoreInit, %v", err)
+		return false
+	}
+
+	return true
+}
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go
new file mode 100644
index 000000000..5e3542b36
--- /dev/null
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go
@@ -0,0 +1,209 @@
+package store
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+
+	workv1 "open-cluster-management.io/api/work/v1"
+
+	"open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils"
+)
+
+// ListLocalWorksFunc loads the works from the local environment.
+type ListLocalWorksFunc func(ctx context.Context) ([]*workv1.ManifestWork, error)
+
+type watchEvent struct {
+	Key  string
+	Type watch.EventType
+}
+
+var _ watch.Interface = &LocalWatcherStore{}
+var _ WorkClientWatcherStore = &LocalWatcherStore{}
+
+// LocalWatcherStore caches the works in this local store and provides the watch ability via a watch event channel.
+type LocalWatcherStore struct {
+	baseStore
+	eventQueue cache.Queue
+}
+
+// NewLocalWatcherStore returns a LocalWatcherStore with the works that are listed by the given ListLocalWorksFunc
+func NewLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc) (*LocalWatcherStore, error) {
+	works, err := listFunc(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	// A local store to cache the works
+	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
+	for _, work := range works {
+		if err := utils.Validate(work); err != nil {
+			return nil, err
+		}
+
+		if err := store.Add(work.DeepCopy()); err != nil {
+			return nil, err
+		}
+	}
+
+	s := &LocalWatcherStore{
+		baseStore: baseStore{
+			// A channel for the watcher; it's easy for a consumer to add buffering via an extra
+			// goroutine/channel, but impossible for them to remove it, so unbuffered is better.
+			result: make(chan watch.Event),
+			// If the watcher is externally stopped there is no receiver anymore
+			// and the send operations on the result channel, especially the
+			// error reporting, might block forever.
+			// Therefore a dedicated stop channel is used to resolve this blocking.
+			done: make(chan struct{}),
+
+			store:     store,
+			initiated: true,
+
+			// A queue to save the received work events; it helps us retry events
+			// where errors occurred while processing
+			receivedWorks: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "local-watcher-store"),
+		},
+
+		// A queue to save the events sent by the work client; if a client runs without a watcher,
+		// sending events would block the client, and this queue resolves that blocking.
+		// Only the latest event for a work is saved.
+		eventQueue: cache.NewFIFO(func(obj interface{}) (string, error) {
+			evt, ok := obj.(*watchEvent)
+			if !ok {
+				return "", fmt.Errorf("unknown object type %T", obj)
+			}
+
+			return evt.Key, nil
+		}),
+	}
+
+	// start a goroutine to process the received work events from the work queue with the current store.
+	go newWorkProcessor(s.baseStore.receivedWorks, s).run(ctx.Done())
+
+	// start a goroutine to handle the events that are produced by the work client
+	go wait.Until(s.processLoop, time.Second, ctx.Done())
+
+	return s, nil
+}
+
+// Add a work to the cache and send an event to the event queue
+func (s *LocalWatcherStore) Add(work *workv1.ManifestWork) error {
+	s.Lock()
+	defer s.Unlock()
+
+	if err := s.store.Add(work); err != nil {
+		return err
+	}
+
+	return s.eventQueue.Add(&watchEvent{Key: key(work), Type: watch.Added})
+}
+
+// Update a work in the cache and send an event to the event queue
+func (s *LocalWatcherStore) Update(work *workv1.ManifestWork) error {
+	s.Lock()
+	defer s.Unlock()
+
+	if err := s.store.Update(work); err != nil {
+		return err
+	}
+
+	return s.eventQueue.Update(&watchEvent{Key: key(work), Type: watch.Modified})
+}
+
+// Delete a work from the cache and send an event to the event queue
+func (s *LocalWatcherStore) Delete(work *workv1.ManifestWork) error {
+	s.Lock()
+	defer s.Unlock()
+
+	if err := s.store.Delete(work); err != nil {
+		return err
+	}
+
+	return s.eventQueue.Update(&watchEvent{Key: key(work), Type: watch.Deleted})
+}
+
+// processLoop drains the work event queue and sends each event to the watch channel.
+func (s *LocalWatcherStore) processLoop() {
+	for {
+		// this will block until the event queue has events
+		obj, err := s.eventQueue.Pop(func(interface{}, bool) error {
+			// do nothing
+			return nil
+		})
+		if err != nil {
+			if err == cache.ErrFIFOClosed {
+				return
+			}
+
+			klog.Warningf("failed to pop %v, requeue it: %v", obj, err)
+			// this is the safe way to re-enqueue.
+			if err := s.eventQueue.AddIfNotPresent(obj); err != nil {
+				klog.Errorf("failed to requeue the obj %v, %v", obj, err)
+				return
+			}
+		}
+
+		evt, ok := obj.(*watchEvent)
+		if !ok {
+			klog.Errorf("unknown object type %T from the event queue", obj)
+			return
+		}
+
+		obj, exists, err := s.store.GetByKey(evt.Key)
+		if err != nil {
+			klog.Errorf("failed to get the work %s, %v", evt.Key, err)
+			return
+		}
+
+		if !exists {
+			if evt.Type == watch.Deleted {
+				namespace, name, err := cache.SplitMetaNamespaceKey(evt.Key)
+				if err != nil {
+					klog.Errorf("unexpected event key %s, %v", evt.Key, err)
+					return
+				}
+
+				// the work has been deleted, return a work with only its namespace and name
+				// this will block until this event is consumed
+				s.result <- watch.Event{
+					Type: watch.Deleted,
+					Object: &workv1.ManifestWork{
+						ObjectMeta: metav1.ObjectMeta{
+							Name:      name,
+							Namespace: namespace,
+						},
+					},
+				}
+				return
+			}
+
+			klog.Errorf("the work %s does not exist in the cache", evt.Key)
+			return
+		}
+
+		work, ok := obj.(*workv1.ManifestWork)
+		if !ok {
+			klog.Errorf("unknown object type %T from the cache", obj)
+			return
+		}
+
+		// this will block until this event is consumed
+		s.result <- watch.Event{Type: evt.Type, Object: work}
+	}
+}
+
+func (s *LocalWatcherStore) HasInitiated() bool {
+	return s.initiated
+}
+
+func key(work *workv1.ManifestWork) string {
+	return work.Namespace + "/" + work.Name
+}
diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils/utils.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils/utils.go
index 51b0b3354..886cab552 100644
--- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils/utils.go
+++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils/utils.go
@@ -6,8 +6,19 @@ import (
 
 	jsonpatch "github.com/evanphx/json-patch"
 	"github.com/google/uuid"
+
+	"k8s.io/apimachinery/pkg/api/validation"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	metav1validation "k8s.io/apimachinery/pkg/apis/meta/v1/validation"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/validation/field"
+	"k8s.io/client-go/tools/cache"
+
 	workv1 "open-cluster-management.io/api/work/v1"
+
+	"open-cluster-management.io/sdk-go/pkg/apis/work/v1/validator"
 	"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
 )
 
@@ -53,3 +64,92 @@ func UID(sourceID, namespace, name string) string {
 	id := fmt.Sprintf("%s-%s-%s-%s", sourceID, common.ManifestWorkGR.String(), namespace, name)
 	return uuid.NewSHA1(uuid.NameSpaceOID, []byte(id)).String()
 }
+
+// ListWorksWithOptions retrieves the manifestworks from the store that match the options.
+func ListWorksWithOptions(store cache.Store, opts metav1.ListOptions) ([]*workv1.ManifestWork, error) {
+	var err error
+
+	labelSelector := labels.Everything()
+	fieldSelector := fields.Everything()
+
+	if len(opts.LabelSelector) != 0 {
+		labelSelector, err = labels.Parse(opts.LabelSelector)
+		if err != nil {
+			return nil, fmt.Errorf("invalid label selector %q: %v", opts.LabelSelector, err)
+		}
+	}
+
+	if len(opts.FieldSelector) != 0 {
+		fieldSelector, err = fields.ParseSelector(opts.FieldSelector)
+		if err != nil {
+			return nil, fmt.Errorf("invalid field selector %q: %v", opts.FieldSelector, err)
+		}
+	}
+
+	works := []*workv1.ManifestWork{}
+	// list with the label selector, then filter with the field selector
+	if err := cache.ListAll(store, labelSelector, func(obj interface{}) {
+		work, ok := obj.(*workv1.ManifestWork)
+		if !ok {
+			return
+		}
+
+		workFieldSet := fields.Set{
+			"metadata.name":      work.Name,
+			"metadata.namespace": work.Namespace,
+		}
+
+		if !fieldSelector.Matches(workFieldSet) {
+			return
+		}
+
+		works = append(works, work)
+	}); err != nil {
+		return nil, err
+	}
+
+	return works, nil
+}
+
+func Validate(work *workv1.ManifestWork) error {
+	fldPath := field.NewPath("metadata")
+	errs := field.ErrorList{}
+
+	if work.UID == "" {
+		errs = append(errs, field.Required(fldPath.Child("uid"), "field not set"))
+	}
+
+	if work.ResourceVersion == "" {
+		errs = append(errs, field.Required(fldPath.Child("resourceVersion"), "field not set"))
+	}
+
+	if work.Name == "" {
+		errs = append(errs, field.Required(fldPath.Child("name"), "field not set"))
+	}
+
+	for _, msg := range validation.ValidateNamespaceName(work.Name, false) {
+		errs = append(errs, field.Invalid(fldPath.Child("name"), work.Name, msg))
+	}
+
+	if work.Namespace == "" {
+		errs = append(errs, field.Required(fldPath.Child("namespace"), "field not set"))
+	}
+
+	for _, msg := range validation.ValidateNamespaceName(work.Namespace, false) {
+		errs = append(errs, field.Invalid(fldPath.Child("namespace"), work.Namespace, msg))
+	}
+
+	errs = append(errs, validation.ValidateAnnotations(work.Annotations, fldPath.Child("annotations"))...)
+	errs = append(errs, validation.ValidateFinalizers(work.Finalizers, fldPath.Child("finalizers"))...)
+	errs = append(errs, metav1validation.ValidateLabels(work.Labels, fldPath.Child("labels"))...)
+
+	if err := validator.ManifestValidator.ValidateManifests(work.Spec.Workload.Manifests); err != nil {
+		errs = append(errs, field.Invalid(field.NewPath("spec"), "spec", err.Error()))
+	}
+
+	if len(errs) == 0 {
+		return nil
+	}
+
+	return errs.ToAggregate()
+}
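
Note (editorial sketch, not part of the diff): the resource-version flow above can be made concrete with a small example. The helper below mirrors the unexported getWorkResourceVersion added in this change; the example work, its name and namespace, and the package main wrapper are assumptions for illustration only.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	workv1 "open-cluster-management.io/api/work/v1"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
)

// resourceVersionOf mirrors getWorkResourceVersion: it reads the cloudevents
// resource version annotation and falls back to "0", which means the source
// does not maintain the work's version and ordering is left to the broker.
func resourceVersionOf(work *workv1.ManifestWork) string {
	if v, ok := work.Annotations[common.CloudEventsResourceVersionAnnotationKey]; ok {
		return v
	}
	return "0"
}

func main() {
	work := &workv1.ManifestWork{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "example-work", // hypothetical
			Namespace: "cluster1",     // hypothetical
			Annotations: map[string]string{
				// the hub controller sets this from the ManifestWorkReplicaSet generation
				common.CloudEventsResourceVersionAnnotationKey: "2",
			},
		},
	}

	// The work client copies this value into work.ResourceVersion before publishing;
	// ManifestBundleCodec.Encode then parses it with strconv.Atoi into the cloudevent.
	fmt.Println(resourceVersionOf(work)) // prints "2"
}

The "0" fallback lines up with the handleWork check in base.go: when the source-side version is 0, status updates are applied regardless of the agent's reported version.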
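
A second sketch (same caveats: illustrative only, the empty seed loader is a stand-in) shows how a consumer might drive the new LocalWatcherStore: seed it, wait for initiation, and drain the watch channel that Add/Update/Delete feed through the event queue.

package main

import (
	"context"
	"fmt"

	workv1 "open-cluster-management.io/api/work/v1"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
)

func main() {
	ctx := context.Background()

	// A stand-in loader; a real source would return its persisted works here.
	listWorks := func(ctx context.Context) ([]*workv1.ManifestWork, error) {
		return nil, nil
	}

	s, err := store.NewLocalWatcherStore(ctx, listWorks)
	if err != nil {
		panic(err)
	}

	// LocalWatcherStore reports initiated immediately; InformerWatcherStore does so
	// only after SetStore is called, which is what WaitForStoreInit polls for.
	if !store.WaitForStoreInit(ctx, s.HasInitiated) {
		return
	}

	// Events produced by Add/Update/Delete surface here via processLoop.
	for evt := range s.ResultChan() {
		work := evt.Object.(*workv1.ManifestWork)
		fmt.Printf("%s %s/%s\n", evt.Type, work.Namespace, work.Name)
	}
}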
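
Finally, a sketch of the new ListWorksWithOptions helper (the stored works are hypothetical): only metadata.name and metadata.namespace participate in field-selector matching, which is exactly what WatcherStoreLister relies on to scope a list to a single cluster namespace.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
	workv1 "open-cluster-management.io/api/work/v1"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils"
)

func main() {
	// The same store type that baseStore wraps.
	s := cache.NewStore(cache.MetaNamespaceKeyFunc)
	_ = s.Add(&workv1.ManifestWork{ObjectMeta: metav1.ObjectMeta{Name: "w1", Namespace: "cluster1"}})
	_ = s.Add(&workv1.ManifestWork{ObjectMeta: metav1.ObjectMeta{Name: "w2", Namespace: "cluster2"}})

	works, err := utils.ListWorksWithOptions(s, metav1.ListOptions{
		FieldSelector: "metadata.namespace=cluster1",
	})
	if err != nil {
		panic(err)
	}

	for _, w := range works {
		fmt.Println(w.Namespace + "/" + w.Name) // prints cluster1/w1
	}
}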