Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey authored and haoqing0110 committed Jun 13, 2024
1 parent e3b484d commit 9edfa8b
Show file tree
Hide file tree
Showing 32 changed files with 1,207 additions and 524 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ require (
k8s.io/kube-aggregator v0.29.3
k8s.io/utils v0.0.0-20240310230437-4693a0247e57
open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556
open-cluster-management.io/api v0.13.1-0.20240506072237-800b00d9f0db
open-cluster-management.io/sdk-go v0.13.1-0.20240520073308-f18d198a844d
open-cluster-management.io/api v0.13.1-0.20240521030453-9d94703b9eba
open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523
sigs.k8s.io/controller-runtime v0.17.3
sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96
)
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -467,10 +467,10 @@ k8s.io/utils v0.0.0-20240310230437-4693a0247e57 h1:gbqbevonBh57eILzModw6mrkbwM0g
k8s.io/utils v0.0.0-20240310230437-4693a0247e57/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556 h1:X3vJEx9agC94l7SitpWZFDshISdL1niqVH0+diyqfJo=
open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556/go.mod h1:HayKCznnlyW+0dUJQGj5sNR6i3tvylSySD3YnvZkBtY=
open-cluster-management.io/api v0.13.1-0.20240506072237-800b00d9f0db h1:puVfabidvMj0phg34e5PqAmC0jzFiVN5LCNlZIEk+CA=
open-cluster-management.io/api v0.13.1-0.20240506072237-800b00d9f0db/go.mod h1:yrNuMMpciXjXPnj2yznb6LTyrGliiTrFZAJDp/Ck3c4=
open-cluster-management.io/sdk-go v0.13.1-0.20240520073308-f18d198a844d h1:5lcrL1DsQdNtDQU6U2oXwLAN0EBczcvI421YNgEzL/4=
open-cluster-management.io/sdk-go v0.13.1-0.20240520073308-f18d198a844d/go.mod h1:XBrldz+AqVBy9miOVNIu+6l8JXS18i795XbTqIqURJU=
open-cluster-management.io/api v0.13.1-0.20240521030453-9d94703b9eba h1:UsXnD4/N7pxYupPgoLvTq8wO73V72vD2D2ZkDd4iws0=
open-cluster-management.io/api v0.13.1-0.20240521030453-9d94703b9eba/go.mod h1:yrNuMMpciXjXPnj2yznb6LTyrGliiTrFZAJDp/Ck3c4=
open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523 h1:XMtsnv0zT8UvXcH4JbqquzhK4BK/XrCg81pCmp50VDs=
open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y=
sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,16 @@ func NewManifestWorkReplicaSetController(
manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
manifestWorkInformer workinformerv1.ManifestWorkInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) factory.Controller {

placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer,
) factory.Controller {
controller := newController(
workClient, workApplier, manifestWorkReplicaSetInformer, manifestWorkInformer, placementInformer, placeDecisionInformer)
workClient,
workApplier,
manifestWorkReplicaSetInformer,
manifestWorkInformer,
placementInformer,
placeDecisionInformer,
)

err := manifestWorkReplicaSetInformer.Informer().AddIndexers(
cache.Indexers{
Expand Down Expand Up @@ -114,12 +120,14 @@ func NewManifestWorkReplicaSetController(
WithSync(controller.sync).ToController("ManifestWorkReplicaSetController", recorder)
}

func newController(workClient workclientset.Interface,
func newController(
workClient workclientset.Interface,
workApplier *workapplier.WorkApplier,
manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
manifestWorkInformer workinformerv1.ManifestWorkInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) *ManifestWorkReplicaSetController {
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer,
) *ManifestWorkReplicaSetController {
return &ManifestWorkReplicaSetController{
workClient: workClient,
manifestWorkReplicaSetLister: manifestWorkReplicaSetInformer.Lister(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
clustersdkv1alpha1 "open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"

"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/work/helper"
Expand Down Expand Up @@ -264,11 +262,17 @@ func getCondition(conditionType string, reason string, message string, status me
}
}

func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterNS string, placementRefName string) (*workv1.ManifestWork, error) {
func CreateManifestWork(
mwrSet *workapiv1alpha1.ManifestWorkReplicaSet,
clusterNS string,
placementRefName string,
) (*workv1.ManifestWork, error) {
if clusterNS == "" {
return nil, fmt.Errorf("invalid cluster namespace")
}

// TODO consider how to trace the manifestworks spec changes for cloudevents work client

return &workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Name: mwrSet.Name,
Expand All @@ -277,12 +281,9 @@ func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterN
ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
ManifestWorkReplicaSetPlacementNameLabelKey: placementRefName,
},
Annotations: map[string]string{
common.CloudEventsDataTypeAnnotationKey: payload.ManifestBundleEventDataType.String(),
common.CloudEventsGenerationAnnotationKey: fmt.Sprintf("%d", mwrSet.Generation),
},
},
Spec: mwrSet.Spec.ManifestWorkTemplate}, nil
Spec: mwrSet.Spec.ManifestWorkTemplate,
}, nil
}

func getAvailableDecisionGroupProgressMessage(groupNum int, existingClsCount int, totalCls int32) string {
Expand Down
91 changes: 65 additions & 26 deletions pkg/work/hub/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ import (

"github.com/openshift/library-go/pkg/controller/controllercmd"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"

clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workv1informer "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"

"open-cluster-management.io/ocm/pkg/work/hub/controllers/manifestworkreplicasetcontroller"
)
Expand Down Expand Up @@ -48,17 +51,6 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
return err
}

// To support sending ManifestWorks to different drivers (like the Kubernetes apiserver or MQTT broker), we build
// ManifestWork client that implements the ManifestWorkInterface and ManifestWork informer based on different
// driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
_, config, err := generic.NewConfigLoader(c.workOptions.WorkDriver, c.workOptions.WorkDriverConfig).
WithKubeConfig(controllerContext.KubeConfig).
LoadConfig()
if err != nil {
return err
}

// we need a separated filtered manifestwork informers so we only watch the manifestworks that manifestworkreplicaset cares.
// This could reduce a lot of memory consumptions
workInformOption := workinformers.WithTweakListOptions(
Expand All @@ -75,35 +67,82 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
},
)

clientHolder, err := cloudeventswork.NewClientHolderBuilder(config).
WithClientID(c.workOptions.CloudEventsClientID).
WithSourceID(sourceID).
WithInformerConfig(30*time.Minute, workInformOption).
WithCodecs(codec.NewManifestBundleCodec()).
NewSourceClientHolder(ctx)
if err != nil {
return err
var workClient workclientset.Interface
var watcherStore *store.InformerWatcherStore

if c.workOptions.WorkDriver == "kube" {
config := controllerContext.KubeConfig
if c.workOptions.WorkDriverConfig != "" {
config, err = clientcmd.BuildConfigFromFlags("", c.workOptions.WorkDriverConfig)
if err != nil {
return err
}
}

workClient, err = workclientset.NewForConfig(config)
if err != nil {
return err
}
} else {
// For cloudevents drivers, we build ManifestWork client that implements the
// ManifestWorkInterface and ManifestWork informer based on different driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.

watcherStore = store.NewInformerWatcherStore(ctx)

_, config, err := generic.NewConfigLoader(c.workOptions.WorkDriver, c.workOptions.WorkDriverConfig).
LoadConfig()
if err != nil {
return err
}

clientHolder, err := work.NewClientHolderBuilder(config).
WithClientID(c.workOptions.CloudEventsClientID).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(watcherStore).
NewSourceClientHolder(ctx)
if err != nil {
return err
}

workClient = clientHolder.WorkInterface()
}

factory := workinformers.NewSharedInformerFactoryWithOptions(workClient, 30*time.Minute, workInformOption)
informer := factory.Work().V1().ManifestWorks()

// For cloudevents work client, we use the informer store as the client store
if watcherStore != nil {
watcherStore.SetStore(informer.Informer().GetStore())
}

return RunControllerManagerWithInformers(ctx, controllerContext, replicaSetsClient, clientHolder, clusterInformerFactory)
return RunControllerManagerWithInformers(
ctx,
controllerContext,
replicaSetsClient,
workClient,
informer,
clusterInformerFactory,
)
}

func RunControllerManagerWithInformers(
ctx context.Context,
controllerContext *controllercmd.ControllerContext,
replicaSetClient workclientset.Interface,
hubWorkClientHolder *cloudeventswork.ClientHolder,
workClient workclientset.Interface,
workInformer workv1informer.ManifestWorkInformer,
clusterInformers clusterinformers.SharedInformerFactory,
) error {
replicaSetInformerFactory := workinformers.NewSharedInformerFactory(replicaSetClient, 30*time.Minute)
hubWorkInformer := hubWorkClientHolder.ManifestWorkInformer()

manifestWorkReplicaSetController := manifestworkreplicasetcontroller.NewManifestWorkReplicaSetController(
controllerContext.EventRecorder,
replicaSetClient,
workapplier.NewWorkApplierWithTypedClient(hubWorkClientHolder.WorkInterface(), hubWorkInformer.Lister()),
workapplier.NewWorkApplierWithTypedClient(workClient, workInformer.Lister()),
replicaSetInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets(),
hubWorkInformer,
workInformer,
clusterInformers.Cluster().V1beta1().Placements(),
clusterInformers.Cluster().V1beta1().PlacementDecisions(),
)
Expand All @@ -112,7 +151,7 @@ func RunControllerManagerWithInformers(
go replicaSetInformerFactory.Start(ctx.Done())
go manifestWorkReplicaSetController.Run(ctx, 5)

go hubWorkInformer.Informer().Run(ctx.Done())
go workInformer.Informer().Run(ctx.Done())

<-ctx.Done()
return nil
Expand Down
4 changes: 1 addition & 3 deletions pkg/work/spoke/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"time"

"github.com/spf13/pflag"

cloudeventsconstants "open-cluster-management.io/sdk-go/pkg/cloudevents/constants"
)

const (
Expand All @@ -30,7 +28,7 @@ func NewWorkloadAgentOptions() *WorkloadAgentOptions {
MaxJSONRawLength: 1024,
StatusSyncInterval: 10 * time.Second,
AppliedManifestWorkEvictionGracePeriod: 60 * time.Minute,
WorkloadSourceDriver: cloudeventsconstants.ConfigTypeKube,
WorkloadSourceDriver: "kube",
WorkloadSourceConfig: "/spoke/hub-kubeconfig/kubeconfig",
}
}
Expand Down
70 changes: 51 additions & 19 deletions pkg/work/spoke/spokeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
ocmfeature "open-cluster-management.io/api/feature"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
Expand Down Expand Up @@ -91,22 +94,7 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
return err
}

// To support consuming ManifestWorks from different drivers (like the Kubernetes apiserver or MQTT broker), we build
// ManifestWork client that implements the ManifestWorkInterface and ManifestWork informer based on different
// driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
hubHost, config, err := generic.NewConfigLoader(o.workOptions.WorkloadSourceDriver, o.workOptions.WorkloadSourceConfig).
LoadConfig()
if err != nil {
return err
}

clientHolder, err := cloudeventswork.NewClientHolderBuilder(config).
WithClientID(o.workOptions.CloudEventsClientID).
WithInformerConfig(5*time.Minute, workinformers.WithNamespace(o.agentOptions.SpokeClusterName)).
WithClusterName(o.agentOptions.SpokeClusterName).
WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...).
NewAgentClientHolder(ctx)
hubHost, hubWorkClient, hubWorkInformer, err := o.newHubWorkClientAndInformer(ctx, restMapper)
if err != nil {
return err
}
Expand All @@ -117,9 +105,6 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
agentID = hubHash
}

hubWorkClient := clientHolder.ManifestWorks(o.agentOptions.SpokeClusterName)
hubWorkInformer := clientHolder.ManifestWorkInformer()

// create controllers
validator := auth.NewFactory(
spokeRestConfig,
Expand Down Expand Up @@ -223,3 +208,50 @@ func buildCodecs(codecNames []string, restMapper meta.RESTMapper) []generic.Code
}
return codecs
}

func (o *WorkAgentConfig) newHubWorkClientAndInformer(
ctx context.Context,
restMapper meta.RESTMapper,
) (string, workv1client.ManifestWorkInterface, workv1informers.ManifestWorkInformer, error) {
if o.workOptions.WorkloadSourceDriver == "kube" {
config, err := clientcmd.BuildConfigFromFlags("", o.workOptions.WorkloadSourceConfig)
if err != nil {
return "", nil, nil, err
}

kubeWorkClientSet, err := workclientset.NewForConfig(config)
if err != nil {
return "", nil, nil, err
}

factory := workinformers.NewSharedInformerFactoryWithOptions(
kubeWorkClientSet,
5*time.Minute,
workinformers.WithNamespace(o.agentOptions.SpokeClusterName),
)
informer := factory.Work().V1().ManifestWorks()

return config.Host, kubeWorkClientSet.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName), informer, nil
}

// For cloudevents drivers, we build ManifestWork client that implements the
// ManifestWorkInterface and ManifestWork informer based on different driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
hubHost, config, err := generic.NewConfigLoader(o.workOptions.WorkloadSourceDriver, o.workOptions.WorkloadSourceConfig).
LoadConfig()
if err != nil {
return "", nil, nil, err
}

clientHolder, informer, err := cloudeventswork.NewClientHolderBuilder(config).
WithClientID(o.workOptions.CloudEventsClientID).
WithInformerConfig(5*time.Minute, workinformers.WithNamespace(o.agentOptions.SpokeClusterName)).
WithClusterName(o.agentOptions.SpokeClusterName).
WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...).
NewAgentClientHolderWithInformer(ctx)
if err != nil {
return "", nil, nil, err
}

return hubHost, clientHolder.ManifestWorks(o.agentOptions.SpokeClusterName), informer, err
}
7 changes: 4 additions & 3 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1518,7 +1518,7 @@ open-cluster-management.io/addon-framework/pkg/basecontroller/events
open-cluster-management.io/addon-framework/pkg/basecontroller/factory
open-cluster-management.io/addon-framework/pkg/index
open-cluster-management.io/addon-framework/pkg/utils
# open-cluster-management.io/api v0.13.1-0.20240506072237-800b00d9f0db
# open-cluster-management.io/api v0.13.1-0.20240521030453-9d94703b9eba
## explicit; go 1.21
open-cluster-management.io/api/addon/v1alpha1
open-cluster-management.io/api/client/addon/clientset/versioned
Expand Down Expand Up @@ -1589,7 +1589,7 @@ open-cluster-management.io/api/utils/work/v1/workapplier
open-cluster-management.io/api/utils/work/v1/workvalidator
open-cluster-management.io/api/work/v1
open-cluster-management.io/api/work/v1alpha1
# open-cluster-management.io/sdk-go v0.13.1-0.20240520073308-f18d198a844d
# open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523
## explicit; go 1.21
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1
Expand Down Expand Up @@ -1617,7 +1617,8 @@ open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal
open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload
open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client
open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec
open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/handler
open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister
open-cluster-management.io/sdk-go/pkg/cloudevents/work/store
open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils
open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher
open-cluster-management.io/sdk-go/pkg/patcher
Expand Down
Loading

0 comments on commit 9edfa8b

Please sign in to comment.