Skip to content

Commit

Permalink
supporting cloudevents for work agent
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Nov 28, 2023
1 parent 8a4c834 commit e6815ab
Show file tree
Hide file tree
Showing 162 changed files with 17,935 additions and 42 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ require (
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995 // indirect
github.com/cloudevents/sdk-go/v2 v2.14.0 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/eclipse/paho.golang v0.11.0 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fatih/structs v1.1.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995 h1:pXyRKZ0T5WoB6X9QnHS5cEyW0Got39bNQIECxGUKVO4=
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995/go.mod h1:mz9oS2Yhh/S7cvrrsgGMMR+6Shy0ZyL2lDN1sHQO1wE=
github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s=
github.com/cloudevents/sdk-go/v2 v2.14.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
Expand All @@ -87,6 +91,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/eclipse/paho.golang v0.11.0 h1:6Avu5dkkCfcB61/y1vx+XrPQ0oAl4TPYtY0uw3HbQdM=
github.com/eclipse/paho.golang v0.11.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs=
github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE=
github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -205,6 +211,7 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
Expand Down Expand Up @@ -481,6 +488,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
Expand Down
5 changes: 5 additions & 0 deletions pkg/work/spoke/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@ import (
"time"

"github.com/spf13/pflag"
"open-cluster-management.io/api/cloudevents/generic/options/mqtt"
)

// WorkloadAgentOptions defines the flags for workload agent
type WorkloadAgentOptions struct {
StatusSyncInterval time.Duration
AppliedManifestWorkEvictionGracePeriod time.Duration
MQTTOptions *mqtt.MQTTOptions
}

// NewWorkloadAgentOptions returns the flags with default value set
func NewWorkloadAgentOptions() *WorkloadAgentOptions {
return &WorkloadAgentOptions{
StatusSyncInterval: 10 * time.Second,
AppliedManifestWorkEvictionGracePeriod: 60 * time.Minute,
MQTTOptions: mqtt.NewMQTTOptions(),
}
}

Expand All @@ -25,4 +28,6 @@ func (o *WorkloadAgentOptions) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.StatusSyncInterval, "status-sync-interval", o.StatusSyncInterval, "Interval to sync resource status to hub.")
fs.DurationVar(&o.AppliedManifestWorkEvictionGracePeriod, "appliedmanifestwork-eviction-grace-period",
o.AppliedManifestWorkEvictionGracePeriod, "Grace period for appliedmanifestwork eviction")

o.MQTTOptions.AddFlags(fs)
}
130 changes: 88 additions & 42 deletions pkg/work/spoke/spokeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package spoke

import (
"context"
"fmt"
"time"

"github.com/openshift/library-go/pkg/controller/controllercmd"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -14,6 +16,8 @@ import (

workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
cloudeventswork "open-cluster-management.io/api/cloudevents/work"
"open-cluster-management.io/api/cloudevents/work/agent/codec"
ocmfeature "open-cluster-management.io/api/feature"

commonoptions "open-cluster-management.io/ocm/pkg/common/options"
Expand Down Expand Up @@ -53,26 +57,6 @@ func NewWorkAgentConfig(commonOpts *commonoptions.AgentOptions, opts *WorkloadAg

// RunWorkloadAgent starts the controllers on agent to process work from hub.
func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
// build hub client and informer
hubRestConfig, err := clientcmd.BuildConfigFromFlags("" /* leave masterurl as empty */, o.agentOptions.HubKubeconfigFile)
if err != nil {
return err
}
hubhash := helper.HubHash(hubRestConfig.Host)

agentID := o.agentOptions.AgentID
if len(agentID) == 0 {
agentID = hubhash
}

hubWorkClient, err := workclientset.NewForConfig(hubRestConfig)
if err != nil {
return err
}
// Only watch the cluster namespace on hub
workInformerFactory := workinformers.NewSharedInformerFactoryWithOptions(hubWorkClient, 5*time.Minute,
workinformers.WithNamespace(o.agentOptions.SpokeClusterName))

// load spoke client config and create spoke clients,
// the work agent may not running in the spoke/managed cluster.
spokeRestConfig, err := o.agentOptions.SpokeKubeConfig(controllerContext.KubeConfig)
Expand Down Expand Up @@ -107,10 +91,20 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
return err
}

// build hub client and informer
clientHolder, hubHash, agentID, err := o.buildHubClientHolder(ctx, o.agentOptions.SpokeClusterName, restMapper)
if err != nil {
return err
}

hubWorkClient := clientHolder.ManifestWorks(o.agentOptions.SpokeClusterName)
hubWorkInformer := clientHolder.ManifestWorkInformer()

// create controllers
validator := auth.NewFactory(
spokeRestConfig,
spokeKubeClient,
workInformerFactory.Work().V1().ManifestWorks(),
hubWorkInformer,
o.agentOptions.SpokeClusterName,
controllerContext.EventRecorder,
restMapper,
Expand All @@ -121,20 +115,20 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
spokeDynamicClient,
spokeKubeClient,
spokeAPIExtensionClient,
hubWorkClient.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName),
workInformerFactory.Work().V1().ManifestWorks(),
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
hubWorkClient,
hubWorkInformer,
hubWorkInformer.Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
spokeWorkClient.WorkV1().AppliedManifestWorks(),
spokeWorkInformerFactory.Work().V1().AppliedManifestWorks(),
hubhash, agentID,
hubHash, agentID,
restMapper,
validator,
)
addFinalizerController := finalizercontroller.NewAddFinalizerController(
controllerContext.EventRecorder,
hubWorkClient.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName),
workInformerFactory.Work().V1().ManifestWorks(),
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
hubWorkClient,
hubWorkInformer,
hubWorkInformer.Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
)
appliedManifestWorkFinalizeController := finalizercontroller.NewAppliedManifestWorkFinalizeController(
controllerContext.EventRecorder,
Expand All @@ -145,49 +139,101 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
)
manifestWorkFinalizeController := finalizercontroller.NewManifestWorkFinalizeController(
controllerContext.EventRecorder,
hubWorkClient.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName),
workInformerFactory.Work().V1().ManifestWorks(),
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
hubWorkClient,
hubWorkInformer,
hubWorkInformer.Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
spokeWorkClient.WorkV1().AppliedManifestWorks(),
spokeWorkInformerFactory.Work().V1().AppliedManifestWorks(),
hubhash,
hubHash,
)
unmanagedAppliedManifestWorkController := finalizercontroller.NewUnManagedAppliedWorkController(
controllerContext.EventRecorder,
workInformerFactory.Work().V1().ManifestWorks(),
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
hubWorkInformer,
hubWorkInformer.Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
spokeWorkClient.WorkV1().AppliedManifestWorks(),
spokeWorkInformerFactory.Work().V1().AppliedManifestWorks(),
o.workOptions.AppliedManifestWorkEvictionGracePeriod,
hubhash, agentID,
hubHash, agentID,
)
appliedManifestWorkController := appliedmanifestcontroller.NewAppliedManifestWorkController(
controllerContext.EventRecorder,
spokeDynamicClient,
workInformerFactory.Work().V1().ManifestWorks(),
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
hubWorkInformer,
hubWorkInformer.Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
spokeWorkClient.WorkV1().AppliedManifestWorks(),
spokeWorkInformerFactory.Work().V1().AppliedManifestWorks(),
hubhash,
hubHash,
)
availableStatusController := statuscontroller.NewAvailableStatusController(
controllerContext.EventRecorder,
spokeDynamicClient,
hubWorkClient.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName),
workInformerFactory.Work().V1().ManifestWorks(),
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
hubWorkClient,
hubWorkInformer,
hubWorkInformer.Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
o.workOptions.StatusSyncInterval,
)

go workInformerFactory.Start(ctx.Done())
go spokeWorkInformerFactory.Start(ctx.Done())
go hubWorkInformer.Informer().Run(ctx.Done())

go addFinalizerController.Run(ctx, 1)
go appliedManifestWorkFinalizeController.Run(ctx, appliedManifestWorkFinalizeControllerWorkers)
go unmanagedAppliedManifestWorkController.Run(ctx, 1)
go appliedManifestWorkController.Run(ctx, 1)
go manifestWorkController.Run(ctx, 1)
go manifestWorkFinalizeController.Run(ctx, manifestWorkFinalizeControllerWorkers)
go availableStatusController.Run(ctx, availableStatusControllerWorkers)

<-ctx.Done()

return nil
}

func (o *WorkAgentConfig) buildHubClientHolder(ctx context.Context,
clusterName string, restMapper meta.RESTMapper) (*cloudeventswork.ClientHolder, string, string, error) {
if o.agentOptions.HubKubeconfigFile != "" && o.workOptions.MQTTOptions.BrokerHost != "" {
return nil, "", "", fmt.Errorf("the hub kubeconfig and MQTT broker cannot be specified at the same time")
}

agentID := o.agentOptions.AgentID
if o.agentOptions.HubKubeconfigFile != "" {
hubRestConfig, err := clientcmd.BuildConfigFromFlags("", o.agentOptions.HubKubeconfigFile)
if err != nil {
return nil, "", "", err
}

hubHash := helper.HubHash(hubRestConfig.Host)
if len(agentID) == 0 {
agentID = hubHash
}

// Only watch the cluster namespace on hub
clientHolder, err := cloudeventswork.NewClientHolderBuilder(agentID, hubRestConfig).
WithInformerConfig(5*time.Minute, workinformers.WithNamespace(o.agentOptions.SpokeClusterName)).
NewClientHolder(ctx)
if err != nil {
return nil, "", "", err
}

return clientHolder, hubHash, agentID, nil
}

if o.workOptions.MQTTOptions.BrokerHost != "" {
hubHash := helper.HubHash(o.workOptions.MQTTOptions.BrokerHost)
if len(agentID) == 0 {
agentID = fmt.Sprintf("%s-work-agent", clusterName)
}

clientHolder, err := cloudeventswork.NewClientHolderBuilder(agentID, o.workOptions.MQTTOptions).
WithClusterName(clusterName).
WithCodecs(codec.NewManifestCodec(restMapper)).
NewClientHolder(ctx)
if err != nil {
return nil, "", "", err
}

return clientHolder, hubHash, agentID, nil
}

return nil, "", "", fmt.Errorf("the hub kubeconfig or MQTT broker is not specified")
}
Loading

0 comments on commit e6815ab

Please sign in to comment.