Skip to content

Commit

Permalink
upgrade sdk-go
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Jun 6, 2024
1 parent 0357cb9 commit 8e3d0cf
Show file tree
Hide file tree
Showing 27 changed files with 1,170 additions and 504 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
k8s.io/utils v0.0.0-20240310230437-4693a0247e57
open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556
open-cluster-management.io/api v0.13.1-0.20240521030453-9d94703b9eba
open-cluster-management.io/sdk-go v0.13.1-0.20240520073308-f18d198a844d
open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523
sigs.k8s.io/controller-runtime v0.17.3
sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,8 @@ open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556
open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556/go.mod h1:HayKCznnlyW+0dUJQGj5sNR6i3tvylSySD3YnvZkBtY=
open-cluster-management.io/api v0.13.1-0.20240521030453-9d94703b9eba h1:UsXnD4/N7pxYupPgoLvTq8wO73V72vD2D2ZkDd4iws0=
open-cluster-management.io/api v0.13.1-0.20240521030453-9d94703b9eba/go.mod h1:yrNuMMpciXjXPnj2yznb6LTyrGliiTrFZAJDp/Ck3c4=
open-cluster-management.io/sdk-go v0.13.1-0.20240520073308-f18d198a844d h1:5lcrL1DsQdNtDQU6U2oXwLAN0EBczcvI421YNgEzL/4=
open-cluster-management.io/sdk-go v0.13.1-0.20240520073308-f18d198a844d/go.mod h1:XBrldz+AqVBy9miOVNIu+6l8JXS18i795XbTqIqURJU=
open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523 h1:XMtsnv0zT8UvXcH4JbqquzhK4BK/XrCg81pCmp50VDs=
open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y=
sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,18 @@ func NewManifestWorkReplicaSetController(
manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
manifestWorkInformer workinformerv1.ManifestWorkInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) factory.Controller {

placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer,
cloudEventsUsed bool,
) factory.Controller {
controller := newController(
workClient, workApplier, manifestWorkReplicaSetInformer, manifestWorkInformer, placementInformer, placeDecisionInformer)
workClient,
workApplier,
manifestWorkReplicaSetInformer,
manifestWorkInformer,
placementInformer,
placeDecisionInformer,
cloudEventsUsed,
)

err := manifestWorkReplicaSetInformer.Informer().AddIndexers(
cache.Indexers{
Expand Down Expand Up @@ -114,12 +122,15 @@ func NewManifestWorkReplicaSetController(
WithSync(controller.sync).ToController("ManifestWorkReplicaSetController", recorder)
}

func newController(workClient workclientset.Interface,
func newController(
workClient workclientset.Interface,
workApplier *workapplier.WorkApplier,
manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
manifestWorkInformer workinformerv1.ManifestWorkInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) *ManifestWorkReplicaSetController {
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer,
cloudEventsUsed bool,
) *ManifestWorkReplicaSetController {
return &ManifestWorkReplicaSetController{
workClient: workClient,
manifestWorkReplicaSetLister: manifestWorkReplicaSetInformer.Lister(),
Expand All @@ -139,6 +150,7 @@ func newController(workClient workclientset.Interface,
manifestWorkLister: manifestWorkInformer.Lister(),
placementLister: placementInformer.Lister(),
placeDecisionLister: placeDecisionInformer.Lister(),
cloudEventsUsed: cloudEventsUsed,
},
&statusReconciler{manifestWorkLister: manifestWorkInformer.Lister()},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func TestManifestWorkReplicaSetControllerPatchStatus(t *testing.T) {
workInformers.Work().V1().ManifestWorks(),
clusterInformers.Cluster().V1beta1().Placements(),
clusterInformers.Cluster().V1beta1().PlacementDecisions(),
false,
)

controllerContext := testingcommon.NewFakeSyncContext(t, c.mwrSet.Namespace+"/"+c.mwrSet.Name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type deployReconciler struct {
manifestWorkLister worklisterv1.ManifestWorkLister
placeDecisionLister clusterlister.PlacementDecisionLister
placementLister clusterlister.PlacementLister
cloudEventsUsed bool
}

func (d *deployReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alpha1.ManifestWorkReplicaSet,
Expand Down Expand Up @@ -115,6 +116,19 @@ func (d *deployReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alpha
continue
}

if d.cloudEventsUsed {
// Add cloudevents annotations to support cloudevents integration.
mw.Annotations = map[string]string{
// CloudEventsDataTypeAnnotationKey tells the cloudevents work client using ManifestBundle
// as its payload format
common.CloudEventsDataTypeAnnotationKey: payload.ManifestBundleEventDataType.String(),
// CloudEventsResourceVersionAnnotationKey is used by cloudevents work client for tracing the
// ManifestWork specific changes, we use the generation of the ManifestWorkReplicaset to trace
// its ManifestWorks specific changes here.
common.CloudEventsResourceVersionAnnotationKey: fmt.Sprintf("%d", mwrSet.Generation),
}
}

_, err = d.workApplier.Apply(ctx, mw)
if err != nil {
fmt.Printf("err is %v\n", err)
Expand Down Expand Up @@ -264,7 +278,11 @@ func getCondition(conditionType string, reason string, message string, status me
}
}

func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterNS string, placementRefName string) (*workv1.ManifestWork, error) {
func CreateManifestWork(
mwrSet *workapiv1alpha1.ManifestWorkReplicaSet,
clusterNS string,
placementRefName string,
) (*workv1.ManifestWork, error) {
if clusterNS == "" {
return nil, fmt.Errorf("invalid cluster namespace")
}
Expand All @@ -277,12 +295,9 @@ func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterN
ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
ManifestWorkReplicaSetPlacementNameLabelKey: placementRefName,
},
Annotations: map[string]string{
common.CloudEventsDataTypeAnnotationKey: payload.ManifestBundleEventDataType.String(),
common.CloudEventsGenerationAnnotationKey: fmt.Sprintf("%d", mwrSet.Generation),
},
},
Spec: mwrSet.Spec.ManifestWorkTemplate}, nil
Spec: mwrSet.Spec.ManifestWorkTemplate,
}, nil
}

func getAvailableDecisionGroupProgressMessage(groupNum int, existingClsCount int, totalCls int32) string {
Expand Down
97 changes: 71 additions & 26 deletions pkg/work/hub/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ import (

"github.com/openshift/library-go/pkg/controller/controllercmd"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"

clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workv1informer "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"

"open-cluster-management.io/ocm/pkg/work/hub/controllers/manifestworkreplicasetcontroller"
)
Expand Down Expand Up @@ -48,17 +51,6 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
return err
}

// To support sending ManifestWorks to different drivers (like the Kubernetes apiserver or MQTT broker), we build
// ManifestWork client that implements the ManifestWorkInterface and ManifestWork informer based on different
// driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
_, config, err := generic.NewConfigLoader(c.workOptions.WorkDriver, c.workOptions.WorkDriverConfig).
WithKubeConfig(controllerContext.KubeConfig).
LoadConfig()
if err != nil {
return err
}

// we need a separated filtered manifestwork informers so we only watch the manifestworks that manifestworkreplicaset cares.
// This could reduce a lot of memory consumptions
workInformOption := workinformers.WithTweakListOptions(
Expand All @@ -75,44 +67,97 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
},
)

clientHolder, err := cloudeventswork.NewClientHolderBuilder(config).
WithClientID(c.workOptions.CloudEventsClientID).
WithSourceID(sourceID).
WithInformerConfig(30*time.Minute, workInformOption).
WithCodecs(codec.NewManifestBundleCodec()).
NewSourceClientHolder(ctx)
if err != nil {
return err
cloudEventsUsed := false

var workClient workclientset.Interface
var watcherStore *store.InformerWatcherStore

if c.workOptions.WorkDriver == "kube" {
config := controllerContext.KubeConfig
if c.workOptions.WorkDriverConfig != "" {
config, err = clientcmd.BuildConfigFromFlags("", c.workOptions.WorkDriverConfig)
if err != nil {
return err
}
}

workClient, err = workclientset.NewForConfig(config)
if err != nil {
return err
}
} else {
// For cloudevents drivers, we build ManifestWork client that implements the
// ManifestWorkInterface and ManifestWork informer based on different driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.

watcherStore = store.NewInformerWatcherStore(ctx)

_, config, err := generic.NewConfigLoader(c.workOptions.WorkDriver, c.workOptions.WorkDriverConfig).
LoadConfig()
if err != nil {
return err
}

clientHolder, err := work.NewClientHolderBuilder(config).
WithClientID(c.workOptions.CloudEventsClientID).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(watcherStore).
NewSourceClientHolder(ctx)
if err != nil {
return err
}

workClient = clientHolder.WorkInterface()
cloudEventsUsed = true
}

factory := workinformers.NewSharedInformerFactoryWithOptions(workClient, 30*time.Minute, workInformOption)
informer := factory.Work().V1().ManifestWorks()

// For cloudevents work client, we use the informer store as the client store
if cloudEventsUsed {
watcherStore.SetStore(informer.Informer().GetStore())
}

return RunControllerManagerWithInformers(ctx, controllerContext, replicaSetsClient, clientHolder, clusterInformerFactory)
return RunControllerManagerWithInformers(
ctx,
controllerContext,
replicaSetsClient,
workClient,
informer,
clusterInformerFactory,
cloudEventsUsed,
)
}

func RunControllerManagerWithInformers(
ctx context.Context,
controllerContext *controllercmd.ControllerContext,
replicaSetClient workclientset.Interface,
hubWorkClientHolder *cloudeventswork.ClientHolder,
workClient workclientset.Interface,
workInformer workv1informer.ManifestWorkInformer,
clusterInformers clusterinformers.SharedInformerFactory,
cloudEventsUsed bool,
) error {
replicaSetInformerFactory := workinformers.NewSharedInformerFactory(replicaSetClient, 30*time.Minute)
hubWorkInformer := hubWorkClientHolder.ManifestWorkInformer()

manifestWorkReplicaSetController := manifestworkreplicasetcontroller.NewManifestWorkReplicaSetController(
controllerContext.EventRecorder,
replicaSetClient,
workapplier.NewWorkApplierWithTypedClient(hubWorkClientHolder.WorkInterface(), hubWorkInformer.Lister()),
workapplier.NewWorkApplierWithTypedClient(workClient, workInformer.Lister()),
replicaSetInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets(),
hubWorkInformer,
workInformer,
clusterInformers.Cluster().V1beta1().Placements(),
clusterInformers.Cluster().V1beta1().PlacementDecisions(),
cloudEventsUsed,
)

go clusterInformers.Start(ctx.Done())
go replicaSetInformerFactory.Start(ctx.Done())
go manifestWorkReplicaSetController.Run(ctx, 5)

go hubWorkInformer.Informer().Run(ctx.Done())
go workInformer.Informer().Run(ctx.Done())

<-ctx.Done()
return nil
Expand Down
4 changes: 1 addition & 3 deletions pkg/work/spoke/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"time"

"github.com/spf13/pflag"

cloudeventsconstants "open-cluster-management.io/sdk-go/pkg/cloudevents/constants"
)

const (
Expand All @@ -30,7 +28,7 @@ func NewWorkloadAgentOptions() *WorkloadAgentOptions {
MaxJSONRawLength: 1024,
StatusSyncInterval: 10 * time.Second,
AppliedManifestWorkEvictionGracePeriod: 60 * time.Minute,
WorkloadSourceDriver: cloudeventsconstants.ConfigTypeKube,
WorkloadSourceDriver: "kube",
WorkloadSourceConfig: "/spoke/hub-kubeconfig/kubeconfig",
}
}
Expand Down
70 changes: 51 additions & 19 deletions pkg/work/spoke/spokeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
ocmfeature "open-cluster-management.io/api/feature"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
Expand Down Expand Up @@ -91,22 +94,7 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
return err
}

// To support consuming ManifestWorks from different drivers (like the Kubernetes apiserver or MQTT broker), we build
// ManifestWork client that implements the ManifestWorkInterface and ManifestWork informer based on different
// driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
hubHost, config, err := generic.NewConfigLoader(o.workOptions.WorkloadSourceDriver, o.workOptions.WorkloadSourceConfig).
LoadConfig()
if err != nil {
return err
}

clientHolder, err := cloudeventswork.NewClientHolderBuilder(config).
WithClientID(o.workOptions.CloudEventsClientID).
WithInformerConfig(5*time.Minute, workinformers.WithNamespace(o.agentOptions.SpokeClusterName)).
WithClusterName(o.agentOptions.SpokeClusterName).
WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...).
NewAgentClientHolder(ctx)
hubHost, hubWorkClient, hubWorkInformer, err := o.newHubWorkClientAndInformer(ctx, restMapper)
if err != nil {
return err
}
Expand All @@ -117,9 +105,6 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
agentID = hubHash
}

hubWorkClient := clientHolder.ManifestWorks(o.agentOptions.SpokeClusterName)
hubWorkInformer := clientHolder.ManifestWorkInformer()

// create controllers
validator := auth.NewFactory(
spokeRestConfig,
Expand Down Expand Up @@ -223,3 +208,50 @@ func buildCodecs(codecNames []string, restMapper meta.RESTMapper) []generic.Code
}
return codecs
}

func (o *WorkAgentConfig) newHubWorkClientAndInformer(
ctx context.Context,
restMapper meta.RESTMapper,
) (string, workv1client.ManifestWorkInterface, workv1informers.ManifestWorkInformer, error) {
if o.workOptions.WorkloadSourceDriver == "kube" {
config, err := clientcmd.BuildConfigFromFlags("", o.workOptions.WorkloadSourceConfig)
if err != nil {
return "", nil, nil, err
}

kubeWorkClientSet, err := workclientset.NewForConfig(config)
if err != nil {
return "", nil, nil, err
}

factory := workinformers.NewSharedInformerFactoryWithOptions(
kubeWorkClientSet,
5*time.Minute,
workinformers.WithNamespace(o.agentOptions.SpokeClusterName),
)
informer := factory.Work().V1().ManifestWorks()

return config.Host, kubeWorkClientSet.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName), informer, nil
}

// For cloudevents drivers, we build ManifestWork client that implements the
// ManifestWorkInterface and ManifestWork informer based on different driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
hubHost, config, err := generic.NewConfigLoader(o.workOptions.WorkloadSourceDriver, o.workOptions.WorkloadSourceConfig).
LoadConfig()
if err != nil {
return "", nil, nil, err
}

clientHolder, informer, err := cloudeventswork.NewClientHolderBuilder(config).
WithClientID(o.workOptions.CloudEventsClientID).
WithInformerConfig(5*time.Minute, workinformers.WithNamespace(o.agentOptions.SpokeClusterName)).
WithClusterName(o.agentOptions.SpokeClusterName).
WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...).
NewAgentClientHolderWithInformer(ctx)
if err != nil {
return "", nil, nil, err
}

return hubHost, clientHolder.ManifestWorks(o.agentOptions.SpokeClusterName), informer, err
}
Loading

0 comments on commit 8e3d0cf

Please sign in to comment.