From 769e929203fa80d86e28fd4a936fa090f40a0a2f Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Thu, 13 Jun 2024 17:47:46 +0800 Subject: [PATCH] upgrade sdk Signed-off-by: Wei Liu --- go.mod | 2 +- go.sum | 4 +- pkg/work/hub/manager.go | 4 +- pkg/work/spoke/spokeagent.go | 66 +++++++---- vendor/modules.txt | 4 +- .../work/agent/client/manifestwork.go | 58 +++++---- .../work/agent/handler/resourcehandler.go | 59 ---------- .../cloudevents/work/agent/lister/lister.go | 35 ++++++ .../pkg/cloudevents/work/clientbuilder.go | 111 +++++++++++++----- .../cloudevents/work/clientinformerbuilder.go | 107 ----------------- .../sdk-go/pkg/cloudevents/work/lister.go | 31 ----- .../work/payload/manifestbundle.go | 5 + .../work/source/codec/manifestbundle.go | 45 ++++++- .../pkg/cloudevents/work/store/informer.go | 110 +++++++++++++++-- .../pkg/cloudevents/work/store/local.go | 26 ++-- 15 files changed, 358 insertions(+), 309 deletions(-) delete mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler/resourcehandler.go create mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister/lister.go delete mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientinformerbuilder.go delete mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/lister.go diff --git a/go.mod b/go.mod index e2c8087d8..f62283ad3 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( k8s.io/utils v0.0.0-20240310230437-4693a0247e57 open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556 open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc - open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523 + open-cluster-management.io/sdk-go v0.13.1-0.20240614070053-a01091a14da7 sigs.k8s.io/controller-runtime v0.17.3 sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96 ) diff --git a/go.sum b/go.sum index ad2a3a5c4..a4a85b683 100644 --- a/go.sum +++ b/go.sum @@ -469,8 +469,8 @@ open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556 open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556/go.mod h1:HayKCznnlyW+0dUJQGj5sNR6i3tvylSySD3YnvZkBtY= open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc h1:tcfncubZRFphYtDXBE7ApBNlSnj1RNazhW+8F01XYYg= open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc/go.mod h1:ltijKJhDifrPH0csvCUmFt5lzaERv+BBfh6X3l83rT0= -open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523 h1:XMtsnv0zT8UvXcH4JbqquzhK4BK/XrCg81pCmp50VDs= -open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw= +open-cluster-management.io/sdk-go v0.13.1-0.20240614070053-a01091a14da7 h1:/Tit/ldsK/+gwYpljBPzOGpFwdN44+yIOiHO+kja5XU= +open-cluster-management.io/sdk-go v0.13.1-0.20240614070053-a01091a14da7/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y= sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk= diff --git a/pkg/work/hub/manager.go b/pkg/work/hub/manager.go index c8b5347f3..0d057cf1f 100644 --- a/pkg/work/hub/manager.go +++ b/pkg/work/hub/manager.go @@ -68,7 +68,7 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller ) var workClient workclientset.Interface - var watcherStore *store.InformerWatcherStore + var watcherStore *store.SourceInformerWatcherStore if c.workOptions.WorkDriver == "kube" { config := controllerContext.KubeConfig @@ -88,7 +88,7 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller // ManifestWorkInterface and ManifestWork informer based on different driver configuration. // Refer to Event Based Manifestwork proposal in enhancements repo to get more details. - watcherStore = store.NewInformerWatcherStore(ctx) + watcherStore = store.NewSourceInformerWatcherStore(ctx) _, config, err := generic.NewConfigLoader(c.workOptions.WorkDriver, c.workOptions.WorkDriverConfig). LoadConfig() diff --git a/pkg/work/spoke/spokeagent.go b/pkg/work/spoke/spokeagent.go index 32d522d42..b85489d86 100644 --- a/pkg/work/spoke/spokeagent.go +++ b/pkg/work/spoke/spokeagent.go @@ -22,6 +22,7 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" "open-cluster-management.io/ocm/pkg/common/options" "open-cluster-management.io/ocm/pkg/features" @@ -213,45 +214,60 @@ func (o *WorkAgentConfig) newHubWorkClientAndInformer( ctx context.Context, restMapper meta.RESTMapper, ) (string, workv1client.ManifestWorkInterface, workv1informers.ManifestWorkInformer, error) { + var workClient workclientset.Interface + var watcherStore *store.AgentInformerWatcherStore + var hubHost string + if o.workOptions.WorkloadSourceDriver == "kube" { config, err := clientcmd.BuildConfigFromFlags("", o.workOptions.WorkloadSourceConfig) if err != nil { return "", nil, nil, err } - kubeWorkClientSet, err := workclientset.NewForConfig(config) + workClient, err = workclientset.NewForConfig(config) if err != nil { return "", nil, nil, err } - factory := workinformers.NewSharedInformerFactoryWithOptions( - kubeWorkClientSet, - 5*time.Minute, - workinformers.WithNamespace(o.agentOptions.SpokeClusterName), - ) - informer := factory.Work().V1().ManifestWorks() + hubHost = config.Host + } else { + // For cloudevents drivers, we build ManifestWork client that implements the + // ManifestWorkInterface and ManifestWork informer based on different driver configuration. + // Refer to Event Based Manifestwork proposal in enhancements repo to get more details. - return config.Host, kubeWorkClientSet.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName), informer, nil - } + watcherStore = store.NewAgentInformerWatcherStore() - // For cloudevents drivers, we build ManifestWork client that implements the - // ManifestWorkInterface and ManifestWork informer based on different driver configuration. - // Refer to Event Based Manifestwork proposal in enhancements repo to get more details. - hubHost, config, err := generic.NewConfigLoader(o.workOptions.WorkloadSourceDriver, o.workOptions.WorkloadSourceConfig). - LoadConfig() - if err != nil { - return "", nil, nil, err + serverHost, config, err := generic.NewConfigLoader(o.workOptions.WorkloadSourceDriver, o.workOptions.WorkloadSourceConfig). + LoadConfig() + if err != nil { + return "", nil, nil, err + } + + clientHolder, err := cloudeventswork.NewClientHolderBuilder(config). + WithClientID(o.workOptions.CloudEventsClientID). + WithClusterName(o.agentOptions.SpokeClusterName). + WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...). + WithWorkClientWatcherStore(watcherStore). + NewAgentClientHolder(ctx) + if err != nil { + return "", nil, nil, err + } + + hubHost = serverHost + workClient = clientHolder.WorkInterface() } - clientHolder, informer, err := cloudeventswork.NewClientHolderBuilder(config). - WithClientID(o.workOptions.CloudEventsClientID). - WithInformerConfig(5*time.Minute, workinformers.WithNamespace(o.agentOptions.SpokeClusterName)). - WithClusterName(o.agentOptions.SpokeClusterName). - WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...). - NewAgentClientHolderWithInformer(ctx) - if err != nil { - return "", nil, nil, err + factory := workinformers.NewSharedInformerFactoryWithOptions( + workClient, + 5*time.Minute, + workinformers.WithNamespace(o.agentOptions.SpokeClusterName), + ) + informer := factory.Work().V1().ManifestWorks() + + // For cloudevents work client, we use the informer store as the client store + if watcherStore != nil { + watcherStore.SetStore(informer.Informer().GetStore()) } - return hubHost, clientHolder.ManifestWorks(o.agentOptions.SpokeClusterName), informer, err + return hubHost, workClient.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName), informer, nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index 883cda8b9..d13b7ab51 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1589,7 +1589,7 @@ open-cluster-management.io/api/utils/work/v1/workapplier open-cluster-management.io/api/utils/work/v1/workvalidator open-cluster-management.io/api/work/v1 open-cluster-management.io/api/work/v1alpha1 -# open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523 +# open-cluster-management.io/sdk-go v0.13.1-0.20240614070053-a01091a14da7 ## explicit; go 1.21 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1 @@ -1611,7 +1611,7 @@ open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types open-cluster-management.io/sdk-go/pkg/cloudevents/work open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec -open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler +open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister open-cluster-management.io/sdk-go/pkg/cloudevents/work/common open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go index 2b0624241..47754f67f 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go @@ -12,36 +12,37 @@ import ( "k8s.io/klog/v2" workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" - workv1lister "open-cluster-management.io/api/client/work/listers/work/v1" workv1 "open-cluster-management.io/api/work/v1" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher" ) // ManifestWorkAgentClient implements the ManifestWorkInterface. It sends the manifestworks status back to source by // CloudEventAgentClient. type ManifestWorkAgentClient struct { cloudEventsClient *generic.CloudEventAgentClient[*workv1.ManifestWork] - watcher *watcher.ManifestWorkWatcher - lister workv1lister.ManifestWorkNamespaceLister + watcherStore store.WorkClientWatcherStore + clusterName string } var _ workv1client.ManifestWorkInterface = &ManifestWorkAgentClient{} -func NewManifestWorkAgentClient(cloudEventsClient *generic.CloudEventAgentClient[*workv1.ManifestWork], watcher *watcher.ManifestWorkWatcher) *ManifestWorkAgentClient { +func NewManifestWorkAgentClient( + cloudEventsClient *generic.CloudEventAgentClient[*workv1.ManifestWork], + watcherStore store.WorkClientWatcherStore, + clusterName string, +) *ManifestWorkAgentClient { return &ManifestWorkAgentClient{ cloudEventsClient: cloudEventsClient, - watcher: watcher, + watcherStore: watcherStore, + clusterName: clusterName, } } -func (c *ManifestWorkAgentClient) SetLister(lister workv1lister.ManifestWorkNamespaceLister) { - c.lister = lister -} - func (c *ManifestWorkAgentClient) Create(ctx context.Context, manifestWork *workv1.ManifestWork, opts metav1.CreateOptions) (*workv1.ManifestWork, error) { return nil, errors.NewMethodNotSupported(common.ManifestWorkGR, "create") } @@ -64,27 +65,32 @@ func (c *ManifestWorkAgentClient) DeleteCollection(ctx context.Context, opts met func (c *ManifestWorkAgentClient) Get(ctx context.Context, name string, opts metav1.GetOptions) (*workv1.ManifestWork, error) { klog.V(4).Infof("getting manifestwork %s", name) - return c.lister.Get(name) + return c.watcherStore.Get(c.clusterName, name) } func (c *ManifestWorkAgentClient) List(ctx context.Context, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) { - klog.V(4).Infof("sync manifestworks") - // send resync request to fetch manifestworks from source when the ManifestWorkInformer starts - if err := c.cloudEventsClient.Resync(ctx, types.SourceAll); err != nil { + klog.V(4).Infof("list manifestworks") + works, err := c.watcherStore.List(opts) + if err != nil { return nil, err } - return &workv1.ManifestWorkList{}, nil + items := []workv1.ManifestWork{} + for _, work := range works { + items = append(items, *work) + } + + return &workv1.ManifestWorkList{Items: items}, nil } func (c *ManifestWorkAgentClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return c.watcher, nil + return c.watcherStore, nil } func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kubetypes.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *workv1.ManifestWork, err error) { klog.V(4).Infof("patching manifestwork %s", name) - lastWork, err := c.lister.Get(name) + lastWork, err := c.watcherStore.Get(c.clusterName, name) if err != nil { return nil, err } @@ -117,8 +123,10 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub return nil, err } - // refresh the work status in the ManifestWorkInformer local cache with patched work. - c.watcher.Receive(watch.Event{Type: watch.Modified, Object: newWork}) + if err := c.watcherStore.Update(newWork); err != nil { + return nil, err + + } return newWork, nil } @@ -137,13 +145,17 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub return nil, err } - // delete the manifestwork from the ManifestWorkInformer local cache. - c.watcher.Receive(watch.Event{Type: watch.Deleted, Object: newWork}) + if err := c.watcherStore.Delete(newWork); err != nil { + return nil, err + } + return newWork, nil } - // refresh the work in the ManifestWorkInformer local cache with patched work. - c.watcher.Receive(watch.Event{Type: watch.Modified, Object: newWork}) + if err := c.watcherStore.Update(newWork); err != nil { + return nil, err + } + return newWork, nil } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler/resourcehandler.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler/resourcehandler.go deleted file mode 100644 index 983c7634c..000000000 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler/resourcehandler.go +++ /dev/null @@ -1,59 +0,0 @@ -package handler - -import ( - "fmt" - - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/watch" - workv1lister "open-cluster-management.io/api/client/work/listers/work/v1" - - workv1 "open-cluster-management.io/api/work/v1" - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher" -) - -// NewManifestWorkAgentHandler returns a ResourceHandler for a ManifestWork on managed cluster. It sends the kube events -// with ManifestWorWatcher after CloudEventAgentClient received the ManifestWork specs from source, then the -// ManifestWorkInformer handles the kube events in its local cache. -func NewManifestWorkAgentHandler(lister workv1lister.ManifestWorkNamespaceLister, watcher *watcher.ManifestWorkWatcher) generic.ResourceHandler[*workv1.ManifestWork] { - return func(action types.ResourceAction, work *workv1.ManifestWork) error { - switch action { - case types.Added: - watcher.Receive(watch.Event{Type: watch.Added, Object: work}) - case types.Modified: - lastWork, err := lister.Get(work.Name) - if err != nil { - return err - } - - updatedWork := work.DeepCopy() - - // restore the fields that are maintained by local agent - updatedWork.Labels = lastWork.Labels - updatedWork.Annotations = lastWork.Annotations - updatedWork.Finalizers = lastWork.Finalizers - updatedWork.Status = lastWork.Status - - watcher.Receive(watch.Event{Type: watch.Modified, Object: updatedWork}) - case types.Deleted: - // the manifestwork is deleting on the source, we just update its deletion timestamp. - lastWork, err := lister.Get(work.Name) - if errors.IsNotFound(err) { - return nil - } - - if err != nil { - return err - } - - updatedWork := lastWork.DeepCopy() - updatedWork.DeletionTimestamp = work.DeletionTimestamp - watcher.Receive(watch.Event{Type: watch.Modified, Object: updatedWork}) - default: - return fmt.Errorf("unsupported resource action %s", action) - } - - return nil - } -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister/lister.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister/lister.go new file mode 100644 index 000000000..e67bd9c1b --- /dev/null +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister/lister.go @@ -0,0 +1,35 @@ +package lister + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + workv1 "open-cluster-management.io/api/work/v1" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" +) + +// WatcherStoreLister list the ManifestWorks from WorkClientWatcherStore +type WatcherStoreLister struct { + store store.WorkClientWatcherStore +} + +func NewWatcherStoreLister(store store.WorkClientWatcherStore) *WatcherStoreLister { + return &WatcherStoreLister{ + store: store, + } +} + +// List returns the ManifestWorks from a WorkClientWatcherStore with list options +func (l *WatcherStoreLister) List(options types.ListOptions) ([]*workv1.ManifestWork, error) { + opts := metav1.ListOptions{} + + if options.Source != types.SourceAll { + opts.LabelSelector = fmt.Sprintf("%s=%s", common.CloudEventsOriginalSourceLabelKey, options.Source) + } + + return l.store.List(opts) +} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go index 7497ae5f9..4d2a96f26 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go @@ -3,25 +3,23 @@ package work import ( "context" "fmt" - "time" "k8s.io/klog/v2" workclientset "open-cluster-management.io/api/client/work/clientset/versioned" workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" - workinformers "open-cluster-management.io/api/client/work/informers/externalversions" workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" + agentclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client" + agentlister "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal" sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister" + sourcelister "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" ) -const defaultInformerResyncTime = 10 * time.Minute - // ClientHolder holds a manifestwork client that implements the ManifestWorkInterface based on different configuration // // ClientHolder also implements the ManifestWorksGetter interface. @@ -43,15 +41,13 @@ func (h *ClientHolder) ManifestWorks(namespace string) workv1client.ManifestWork // ClientHolderBuilder builds the ClientHolder with different configuration. type ClientHolderBuilder struct { - config any - watcherStore store.WorkClientWatcherStore - codecs []generic.Codec[*workv1.ManifestWork] - informerOptions []workinformers.SharedInformerOption - informerResyncTime time.Duration - sourceID string - clusterName string - clientID string - resync bool + config any + watcherStore store.WorkClientWatcherStore + codecs []generic.Codec[*workv1.ManifestWork] + sourceID string + clusterName string + clientID string + resync bool } // NewClientHolderBuilder returns a ClientHolderBuilder with a given configuration. @@ -64,9 +60,8 @@ type ClientHolderBuilder struct { // TODO using a specified config instead of any func NewClientHolderBuilder(config any) *ClientHolderBuilder { return &ClientHolderBuilder{ - config: config, - informerResyncTime: defaultInformerResyncTime, - resync: true, + config: config, + resync: true, } } @@ -94,16 +89,6 @@ func (b *ClientHolderBuilder) WithCodecs(codecs ...generic.Codec[*workv1.Manifes return b } -// WithInformerConfig set the ManifestWorkInformer configs. If the resync time is not set, the default time (10 minutes) -// will be used when building the ManifestWorkInformer. -func (b *ClientHolderBuilder) WithInformerConfig( - resyncTime time.Duration, options ...workinformers.SharedInformerOption, -) *ClientHolderBuilder { - b.informerResyncTime = resyncTime - b.informerOptions = options - return b -} - // WithWorkClientWatcherStore set the WorkClientWatcherStore. The client will use this store to caches the works and // watch the work events. func (b *ClientHolderBuilder) WithWorkClientWatcherStore(store store.WorkClientWatcherStore) *ClientHolderBuilder { @@ -141,7 +126,7 @@ func (b *ClientHolderBuilder) NewSourceClientHolder(ctx context.Context) (*Clien cloudEventsClient, err := generic.NewCloudEventSourceClient[*workv1.ManifestWork]( ctx, options, - lister.NewWatcherStoreLister(b.watcherStore), + sourcelister.NewWatcherStoreLister(b.watcherStore), ManifestWorkStatusHash, b.codecs..., ) @@ -179,7 +164,75 @@ func (b *ClientHolderBuilder) NewSourceClientHolder(ctx context.Context) (*Clien go func() { if store.WaitForStoreInit(ctx, b.watcherStore.HasInitiated) { if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil { - klog.Errorf("failed to resync") + klog.Errorf("failed to send resync request, %v", err) + } + } + }() + + return &ClientHolder{workClientSet: workClientSet}, nil +} + +// NewAgentClientHolder returns a ClientHolder for an agent +func (b *ClientHolderBuilder) NewAgentClientHolder(ctx context.Context) (*ClientHolder, error) { + if len(b.clientID) == 0 { + return nil, fmt.Errorf("client id is required") + } + + if len(b.clusterName) == 0 { + return nil, fmt.Errorf("cluster name is required") + } + + if b.watcherStore == nil { + return nil, fmt.Errorf("watcher store is required") + } + + options, err := generic.BuildCloudEventsAgentOptions(b.config, b.clusterName, b.clientID) + if err != nil { + return nil, err + } + + cloudEventsClient, err := generic.NewCloudEventAgentClient[*workv1.ManifestWork]( + ctx, + options, + agentlister.NewWatcherStoreLister(b.watcherStore), + ManifestWorkStatusHash, + b.codecs..., + ) + if err != nil { + return nil, err + } + + // start to subscribe + cloudEventsClient.Subscribe(ctx, b.watcherStore.HandleReceivedWork) + + manifestWorkClient := agentclient.NewManifestWorkAgentClient(cloudEventsClient, b.watcherStore, b.clusterName) + workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient} + workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient} + + if !b.resync { + return &ClientHolder{workClientSet: workClientSet}, nil + } + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-cloudEventsClient.ReconnectedChan(): + // when receiving a client reconnected signal, we resync all sources for this agent + // TODO after supporting multiple sources, we should only resync agent known sources + if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil { + klog.Errorf("failed to send resync request, %v", err) + } + } + } + }() + + // start a go routine to resync the works after this client's store is initiated + go func() { + if store.WaitForStoreInit(ctx, b.watcherStore.HasInitiated) { + if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil { + klog.Errorf("failed to send resync request, %v", err) } } }() diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientinformerbuilder.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientinformerbuilder.go deleted file mode 100644 index c7f443d2d..000000000 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientinformerbuilder.go +++ /dev/null @@ -1,107 +0,0 @@ -package work - -import ( - "context" - "fmt" - - "k8s.io/client-go/rest" - "k8s.io/klog/v2" - - workclientset "open-cluster-management.io/api/client/work/clientset/versioned" - workinformers "open-cluster-management.io/api/client/work/informers/externalversions" - workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1" - workv1 "open-cluster-management.io/api/work/v1" - - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - agentclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client" - agenthandler "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher" -) - -// NewAgentClientHolderWithInformer returns a ClientHolder with a ManifestWorkInformer for an agent. -// This function is suitable for the scenarios where a ManifestWorkInformer is required -// -// Note: the ClientHolder should be used after the ManifestWorkInformer starts -// -// TODO enhance the manifestwork agent client with WorkClientWatcherCache -func (b *ClientHolderBuilder) NewAgentClientHolderWithInformer( - ctx context.Context) (*ClientHolder, workv1informers.ManifestWorkInformer, error) { - switch config := b.config.(type) { - case *rest.Config: - kubeWorkClientSet, err := workclientset.NewForConfig(config) - if err != nil { - return nil, nil, err - } - - factory := workinformers.NewSharedInformerFactoryWithOptions(kubeWorkClientSet, b.informerResyncTime, b.informerOptions...) - return &ClientHolder{workClientSet: kubeWorkClientSet}, factory.Work().V1().ManifestWorks(), nil - default: - options, err := generic.BuildCloudEventsAgentOptions(config, b.clusterName, b.clientID) - if err != nil { - return nil, nil, err - } - return b.newAgentClients(ctx, options) - } -} - -func (b *ClientHolderBuilder) newAgentClients( - ctx context.Context, - agentOptions *options.CloudEventsAgentOptions, -) (*ClientHolder, workv1informers.ManifestWorkInformer, error) { - if len(b.clientID) == 0 { - return nil, nil, fmt.Errorf("client id is required") - } - - if len(b.clusterName) == 0 { - return nil, nil, fmt.Errorf("cluster name is required") - } - - workLister := &ManifestWorkLister{} - watcher := watcher.NewManifestWorkWatcher() - cloudEventsClient, err := generic.NewCloudEventAgentClient[*workv1.ManifestWork]( - ctx, - agentOptions, - workLister, - ManifestWorkStatusHash, - b.codecs..., - ) - if err != nil { - return nil, nil, err - } - - manifestWorkClient := agentclient.NewManifestWorkAgentClient(cloudEventsClient, watcher) - workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient} - workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient} - factory := workinformers.NewSharedInformerFactoryWithOptions(workClientSet, b.informerResyncTime, b.informerOptions...) - informers := factory.Work().V1().ManifestWorks() - manifestWorkLister := informers.Lister() - namespacedLister := manifestWorkLister.ManifestWorks(b.clusterName) - - // Set informer lister back to work lister and client. - workLister.Lister = manifestWorkLister - // TODO the work client and informer share a same store in the current implementation, ideally, the store should be - // only written from the server. we may need to revisit the implementation in the future. - manifestWorkClient.SetLister(namespacedLister) - - cloudEventsClient.Subscribe(ctx, agenthandler.NewManifestWorkAgentHandler(namespacedLister, watcher)) - - go func() { - for { - select { - case <-ctx.Done(): - return - case <-cloudEventsClient.ReconnectedChan(): - // when receiving a client reconnected signal, we resync all sources for this agent - // TODO after supporting multiple sources, we should only resync agent known sources - if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil { - klog.Errorf("failed to send resync request, %v", err) - } - } - } - }() - - return &ClientHolder{workClientSet: workClientSet}, informers, nil -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/lister.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/lister.go deleted file mode 100644 index 0911ee381..000000000 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/lister.go +++ /dev/null @@ -1,31 +0,0 @@ -package work - -import ( - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" - - workv1lister "open-cluster-management.io/api/client/work/listers/work/v1" - workv1 "open-cluster-management.io/api/work/v1" - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" -) - -// ManifestWorkLister list the ManifestWorks from a ManifestWorkInformer's local cache. -type ManifestWorkLister struct { - Lister workv1lister.ManifestWorkLister -} - -// List returns the ManifestWorks from a ManifestWorkInformer's local cache. -func (l *ManifestWorkLister) List(options types.ListOptions) ([]*workv1.ManifestWork, error) { - selector := labels.Everything() - if options.Source != types.SourceAll { - req, err := labels.NewRequirement(common.CloudEventsOriginalSourceLabelKey, selection.Equals, []string{options.Source}) - if err != nil { - return nil, err - } - - selector = labels.NewSelector().Add(*req) - } - - return l.Lister.ManifestWorks(options.ClusterName).List(selector) -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/manifestbundle.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/manifestbundle.go index f7ca9fa98..2b744b425 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/manifestbundle.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/manifestbundle.go @@ -28,6 +28,11 @@ type ManifestBundle struct { // ManifestBundleStatus represents the data in a cloudevent, it contains the status of a ManifestBundle on a managed // cluster. type ManifestBundleStatus struct { + // ManifestBundle represents the specific of this status. + // This is an optional field, it can be used by a source work client without local cache to + // rebuild a whole work when received the work's status update. + ManifestBundle *ManifestBundle `json:"manifestBundle,omitempty"` + // Conditions contains the different condition statuses for a ManifestBundle on managed cluster. // Valid condition types are: // 1. Applied represents the manifests in a ManifestBundle are applied successfully on a managed cluster. diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec/manifestbundle.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec/manifestbundle.go index f1e5a6510..8ff7dc7de 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec/manifestbundle.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec/manifestbundle.go @@ -1,6 +1,7 @@ package codec import ( + "encoding/json" "fmt" "strconv" @@ -15,6 +16,9 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" ) +// ExtensionWorkMeta is an extension attribute for work meta data. +const ExtensionWorkMeta = "metadata" + // ManifestBundleCodec is a codec to encode/decode a ManifestWork/cloudevent with ManifestBundle for a source. type ManifestBundleCodec struct{} @@ -43,6 +47,14 @@ func (c *ManifestBundleCodec) Encode(source string, eventType types.CloudEventsT WithResourceID(string(work.UID)). WithResourceVersion(int64(resourceVersion)). NewEvent() + + // set the work's meta data to its cloud event + metaJson, err := json.Marshal(work.ObjectMeta) + if err != nil { + return nil, err + } + evt.SetExtension(ExtensionWorkMeta, string(metaJson)) + if !work.DeletionTimestamp.IsZero() { evt.SetExtension(types.ExtensionDeletionTimestamp, work.DeletionTimestamp.Time) return &evt, nil @@ -83,12 +95,28 @@ func (c *ManifestBundleCodec) Decode(evt *cloudevents.Event) (*workv1.ManifestWo return nil, fmt.Errorf("failed to get resourceversion extension: %v", err) } + metaObj := metav1.ObjectMeta{} + + // the agent sends the work meta data back, restore the meta to the received work, otherwise only set the + // UID and ResourceVersion to the received work, for the work's other meta data will be got from the work + // client local cache. + if workMetaExtension, ok := evtExtensions[ExtensionWorkMeta]; ok { + metaJson, err := cloudeventstypes.ToString(workMetaExtension) + if err != nil { + return nil, err + } + + if err := json.Unmarshal([]byte(metaJson), &metaObj); err != nil { + return nil, err + } + } + + metaObj.UID = kubetypes.UID(resourceID) + metaObj.ResourceVersion = fmt.Sprintf("%d", resourceVersion) + work := &workv1.ManifestWork{ - TypeMeta: metav1.TypeMeta{}, - ObjectMeta: metav1.ObjectMeta{ - UID: kubetypes.UID(resourceID), - ResourceVersion: fmt.Sprintf("%d", resourceVersion), - }, + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metaObj, } manifestStatus := &payload.ManifestBundleStatus{} @@ -96,6 +124,13 @@ func (c *ManifestBundleCodec) Decode(evt *cloudevents.Event) (*workv1.ManifestWo return nil, fmt.Errorf("failed to unmarshal event data %s, %v", string(evt.Data()), err) } + // the agent sends the work spec back, restore it + if manifestStatus.ManifestBundle != nil { + work.Spec.Workload.Manifests = manifestStatus.ManifestBundle.Manifests + work.Spec.DeleteOption = manifestStatus.ManifestBundle.DeleteOption + work.Spec.ManifestConfigs = manifestStatus.ManifestBundle.ManifestConfigs + } + work.Status = workv1.ManifestWorkStatus{ Conditions: manifestStatus.Conditions, ResourceStatus: workv1.ManifestResourceStatus{ diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go index d2d6fb3c4..f00f2f617 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go @@ -2,26 +2,31 @@ package store import ( "context" + "fmt" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" workv1 "open-cluster-management.io/api/work/v1" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) -// InformerWatcherStore extends the baseStore. +// SourceInformerWatcherStore extends the baseStore. // It gets/lists the works from the given informer store and send // the work add/update/delete event to the watch channel directly. -type InformerWatcherStore struct { +// +// It is used for building ManifestWork source client. +type SourceInformerWatcherStore struct { baseStore } -var _ watch.Interface = &LocalWatcherStore{} -var _ WorkClientWatcherStore = &InformerWatcherStore{} +var _ watch.Interface = &SourceInformerWatcherStore{} +var _ WorkClientWatcherStore = &SourceInformerWatcherStore{} -func NewInformerWatcherStore(ctx context.Context) *InformerWatcherStore { - s := &InformerWatcherStore{ +func NewSourceInformerWatcherStore(ctx context.Context) *SourceInformerWatcherStore { + s := &SourceInformerWatcherStore{ baseStore: baseStore{ result: make(chan watch.Event), done: make(chan struct{}), @@ -35,26 +40,109 @@ func NewInformerWatcherStore(ctx context.Context) *InformerWatcherStore { return s } -func (s *InformerWatcherStore) Add(work *workv1.ManifestWork) error { +func (s *SourceInformerWatcherStore) Add(work *workv1.ManifestWork) error { s.result <- watch.Event{Type: watch.Added, Object: work} return nil } -func (s *InformerWatcherStore) Update(work *workv1.ManifestWork) error { +func (s *SourceInformerWatcherStore) Update(work *workv1.ManifestWork) error { s.result <- watch.Event{Type: watch.Modified, Object: work} return nil } -func (s *InformerWatcherStore) Delete(work *workv1.ManifestWork) error { +func (s *SourceInformerWatcherStore) Delete(work *workv1.ManifestWork) error { s.result <- watch.Event{Type: watch.Deleted, Object: work} return nil } -func (s *InformerWatcherStore) HasInitiated() bool { +func (s *SourceInformerWatcherStore) HasInitiated() bool { return s.initiated } -func (s *InformerWatcherStore) SetStore(store cache.Store) { +func (s *SourceInformerWatcherStore) SetStore(store cache.Store) { + s.store = store + s.initiated = true +} + +// AgentInformerWatcherStore extends the baseStore. +// It gets/lists the works from the given informer store and send +// the work add/update/delete event to the watch channel directly. +// +// It is used for building ManifestWork agent client. +type AgentInformerWatcherStore struct { + baseStore +} + +var _ watch.Interface = &AgentInformerWatcherStore{} +var _ WorkClientWatcherStore = &AgentInformerWatcherStore{} + +func NewAgentInformerWatcherStore() *AgentInformerWatcherStore { + return &AgentInformerWatcherStore{ + baseStore: baseStore{ + result: make(chan watch.Event), + done: make(chan struct{}), + }, + } +} + +func (s *AgentInformerWatcherStore) Add(work *workv1.ManifestWork) error { + s.result <- watch.Event{Type: watch.Added, Object: work} + return nil +} + +func (s *AgentInformerWatcherStore) Update(work *workv1.ManifestWork) error { + s.result <- watch.Event{Type: watch.Modified, Object: work} + return nil +} + +func (s *AgentInformerWatcherStore) Delete(work *workv1.ManifestWork) error { + s.result <- watch.Event{Type: watch.Deleted, Object: work} + return nil +} + +func (s *AgentInformerWatcherStore) HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error { + switch action { + case types.Added: + return s.Add(work.DeepCopy()) + case types.Modified: + lastWork, err := s.Get(work.Namespace, work.Name) + if err != nil { + return err + } + + updatedWork := work.DeepCopy() + + // restore the fields that are maintained by local agent + updatedWork.Labels = lastWork.Labels + updatedWork.Annotations = lastWork.Annotations + updatedWork.Finalizers = lastWork.Finalizers + updatedWork.Status = lastWork.Status + + return s.Update(updatedWork) + case types.Deleted: + // the manifestwork is deleting on the source, we just update its deletion timestamp. + lastWork, err := s.Get(work.Namespace, work.Name) + if errors.IsNotFound(err) { + return nil + } + + if err != nil { + return err + } + + updatedWork := lastWork.DeepCopy() + updatedWork.DeletionTimestamp = work.DeletionTimestamp + return s.Update(updatedWork) + default: + return fmt.Errorf("unsupported resource action %s", action) + } +} + +func (s *AgentInformerWatcherStore) HasInitiated() bool { + return s.initiated +} + +func (s *AgentInformerWatcherStore) SetStore(store cache.Store) { s.store = store s.initiated = true } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go index 5e3542b36..17e6579a9 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go @@ -25,17 +25,19 @@ type watchEvent struct { Type watch.EventType } -var _ watch.Interface = &LocalWatcherStore{} -var _ WorkClientWatcherStore = &LocalWatcherStore{} +var _ watch.Interface = &SourceLocalWatcherStore{} +var _ WorkClientWatcherStore = &SourceLocalWatcherStore{} -// LocalWatcherStore caches the works in this local store and provide the watch ability by watch event channel. -type LocalWatcherStore struct { +// SourceLocalWatcherStore caches the works in this local store and provide the watch ability by watch event channel. +// +// It is used for building ManifestWork source client. +type SourceLocalWatcherStore struct { baseStore eventQueue cache.Queue } -// NewLocalWatcherStore returns a LocalWatcherStore with works that list by ListLocalWorksFunc -func NewLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc) (*LocalWatcherStore, error) { +// NewSourceLocalWatcherStore returns a LocalWatcherStore with works that list by ListLocalWorksFunc +func NewSourceLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc) (*SourceLocalWatcherStore, error) { works, err := listFunc(ctx) if err != nil { return nil, err @@ -53,7 +55,7 @@ func NewLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc) (*Lo } } - s := &LocalWatcherStore{ + s := &SourceLocalWatcherStore{ baseStore: baseStore{ // A channel for watcher, it's easy for a consumer to add buffering via an extra // goroutine/channel, but impossible for them to remove it, so nonbuffered is better. @@ -95,7 +97,7 @@ func NewLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc) (*Lo } // Add a work to the cache and send an event to the event queue -func (s *LocalWatcherStore) Add(work *workv1.ManifestWork) error { +func (s *SourceLocalWatcherStore) Add(work *workv1.ManifestWork) error { s.Lock() defer s.Unlock() @@ -107,7 +109,7 @@ func (s *LocalWatcherStore) Add(work *workv1.ManifestWork) error { } // Update a work in the cache and send an event to the event queue -func (s *LocalWatcherStore) Update(work *workv1.ManifestWork) error { +func (s *SourceLocalWatcherStore) Update(work *workv1.ManifestWork) error { s.Lock() defer s.Unlock() @@ -119,7 +121,7 @@ func (s *LocalWatcherStore) Update(work *workv1.ManifestWork) error { } // Delete a work from the cache and send an event to the event queue -func (s *LocalWatcherStore) Delete(work *workv1.ManifestWork) error { +func (s *SourceLocalWatcherStore) Delete(work *workv1.ManifestWork) error { s.Lock() defer s.Unlock() @@ -131,7 +133,7 @@ func (s *LocalWatcherStore) Delete(work *workv1.ManifestWork) error { } // processLoop drains the work event queue and send the event to the watch channel. -func (s *LocalWatcherStore) processLoop() { +func (s *SourceLocalWatcherStore) processLoop() { for { // this will be blocked until the event queue has events obj, err := s.eventQueue.Pop(func(interface{}, bool) error { @@ -200,7 +202,7 @@ func (s *LocalWatcherStore) processLoop() { } } -func (c *LocalWatcherStore) HasInitiated() bool { +func (c *SourceLocalWatcherStore) HasInitiated() bool { return c.initiated }