Commit

upgrade sdk
Signed-off-by: Wei Liu <[email protected]>
skeeey committed Jun 14, 2024
1 parent 462992f commit 391f32a
Showing 19 changed files with 787 additions and 309 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -36,7 +36,7 @@ require (
k8s.io/utils v0.0.0-20240310230437-4693a0247e57
open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc
open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523
open-cluster-management.io/sdk-go v0.13.1-0.20240614070053-a01091a14da7
sigs.k8s.io/controller-runtime v0.17.3
sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96
)
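
The go.mod change is a single pseudo-version bump of open-cluster-management.io/sdk-go, with the matching go.sum entries below. For reference, a bump like this is normally produced with the Go toolchain rather than by hand-editing go.mod; the following commands are an assumption about how it was generated, not part of the commit:

go get open-cluster-management.io/[email protected]
go mod tidy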
4 changes: 2 additions & 2 deletions go.sum
@@ -469,8 +469,8 @@ open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556
open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556/go.mod h1:HayKCznnlyW+0dUJQGj5sNR6i3tvylSySD3YnvZkBtY=
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc h1:tcfncubZRFphYtDXBE7ApBNlSnj1RNazhW+8F01XYYg=
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc/go.mod h1:ltijKJhDifrPH0csvCUmFt5lzaERv+BBfh6X3l83rT0=
open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523 h1:XMtsnv0zT8UvXcH4JbqquzhK4BK/XrCg81pCmp50VDs=
open-cluster-management.io/sdk-go v0.13.1-0.20240605055850-874c2c7ac523/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
open-cluster-management.io/sdk-go v0.13.1-0.20240614070053-a01091a14da7 h1:/Tit/ldsK/+gwYpljBPzOGpFwdN44+yIOiHO+kja5XU=
open-cluster-management.io/sdk-go v0.13.1-0.20240614070053-a01091a14da7/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y=
sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk=
4 changes: 2 additions & 2 deletions pkg/work/hub/manager.go
@@ -68,7 +68,7 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
)

var workClient workclientset.Interface
var watcherStore *store.InformerWatcherStore
var watcherStore *store.SourceInformerWatcherStore

if c.workOptions.WorkDriver == "kube" {
config := controllerContext.KubeConfig
@@ -88,7 +88,7 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
// ManifestWorkInterface and ManifestWork informer based on different driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.

watcherStore = store.NewInformerWatcherStore(ctx)
watcherStore = store.NewSourceInformerWatcherStore(ctx)

_, config, err := generic.NewConfigLoader(c.workOptions.WorkDriver, c.workOptions.WorkDriverConfig).
LoadConfig()
66 changes: 41 additions & 25 deletions pkg/work/spoke/spokeagent.go
@@ -22,6 +22,7 @@ import (
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"

"open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/features"
@@ -213,45 +214,60 @@ func (o *WorkAgentConfig) newHubWorkClientAndInformer(
ctx context.Context,
restMapper meta.RESTMapper,
) (string, workv1client.ManifestWorkInterface, workv1informers.ManifestWorkInformer, error) {
var workClient workclientset.Interface
var watcherStore *store.AgentInformerWatcherStore
var hubHost string

if o.workOptions.WorkloadSourceDriver == "kube" {
config, err := clientcmd.BuildConfigFromFlags("", o.workOptions.WorkloadSourceConfig)
if err != nil {
return "", nil, nil, err
}

kubeWorkClientSet, err := workclientset.NewForConfig(config)
workClient, err = workclientset.NewForConfig(config)
if err != nil {
return "", nil, nil, err
}

factory := workinformers.NewSharedInformerFactoryWithOptions(
kubeWorkClientSet,
5*time.Minute,
workinformers.WithNamespace(o.agentOptions.SpokeClusterName),
)
informer := factory.Work().V1().ManifestWorks()
hubHost = config.Host
} else {
// For cloudevents drivers, we build ManifestWork client that implements the
// ManifestWorkInterface and ManifestWork informer based on different driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.

return config.Host, kubeWorkClientSet.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName), informer, nil
}
watcherStore = store.NewAgentInformerWatcherStore()

// For cloudevents drivers, we build ManifestWork client that implements the
// ManifestWorkInterface and ManifestWork informer based on different driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
hubHost, config, err := generic.NewConfigLoader(o.workOptions.WorkloadSourceDriver, o.workOptions.WorkloadSourceConfig).
LoadConfig()
if err != nil {
return "", nil, nil, err
serverHost, config, err := generic.NewConfigLoader(o.workOptions.WorkloadSourceDriver, o.workOptions.WorkloadSourceConfig).
LoadConfig()
if err != nil {
return "", nil, nil, err
}

clientHolder, err := cloudeventswork.NewClientHolderBuilder(config).
WithClientID(o.workOptions.CloudEventsClientID).
WithClusterName(o.agentOptions.SpokeClusterName).
WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...).
WithWorkClientWatcherStore(watcherStore).
NewAgentClientHolder(ctx)
if err != nil {
return "", nil, nil, err
}

hubHost = serverHost
workClient = clientHolder.WorkInterface()
}

clientHolder, informer, err := cloudeventswork.NewClientHolderBuilder(config).
WithClientID(o.workOptions.CloudEventsClientID).
WithInformerConfig(5*time.Minute, workinformers.WithNamespace(o.agentOptions.SpokeClusterName)).
WithClusterName(o.agentOptions.SpokeClusterName).
WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...).
NewAgentClientHolderWithInformer(ctx)
if err != nil {
return "", nil, nil, err
factory := workinformers.NewSharedInformerFactoryWithOptions(
workClient,
5*time.Minute,
workinformers.WithNamespace(o.agentOptions.SpokeClusterName),
)
informer := factory.Work().V1().ManifestWorks()

// For cloudevents work client, we use the informer store as the client store
if watcherStore != nil {
watcherStore.SetStore(informer.Informer().GetStore())
}

return hubHost, clientHolder.ManifestWorks(o.agentOptions.SpokeClusterName), informer, err
return hubHost, workClient.WorkV1().ManifestWorks(o.agentOptions.SpokeClusterName), informer, nil
}
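
Because the diff above interleaves the removed and added lines, here is a condensed sketch of what newHubWorkClientAndInformer looks like after this change. It is reconstructed from the hunks above, assumes the same imports and the package-local buildCodecs helper from spokeagent.go, and passes the relevant options as plain parameters instead of reading them from the WorkAgentConfig receiver:

// Sketch only: build a ManifestWork client for either the kube driver or a
// cloudevents driver, create one shared informer from that client, and, for
// cloudevents, back the agent watcher store with the informer's cache.
func newHubWorkClientAndInformer(
	ctx context.Context,
	restMapper meta.RESTMapper,
	driver, driverConfig, clusterName, clientID string,
	codecNames []string,
) (string, workv1client.ManifestWorkInterface, workv1informers.ManifestWorkInformer, error) {
	var workClient workclientset.Interface
	var watcherStore *store.AgentInformerWatcherStore
	var hubHost string

	if driver == "kube" {
		config, err := clientcmd.BuildConfigFromFlags("", driverConfig)
		if err != nil {
			return "", nil, nil, err
		}
		workClient, err = workclientset.NewForConfig(config)
		if err != nil {
			return "", nil, nil, err
		}
		hubHost = config.Host
	} else {
		// Cloudevents drivers: the sdk-go client holder provides the
		// ManifestWork client; the watcher store backs its Watch() calls.
		watcherStore = store.NewAgentInformerWatcherStore()

		serverHost, config, err := generic.NewConfigLoader(driver, driverConfig).LoadConfig()
		if err != nil {
			return "", nil, nil, err
		}

		clientHolder, err := cloudeventswork.NewClientHolderBuilder(config).
			WithClientID(clientID).
			WithClusterName(clusterName).
			WithCodecs(buildCodecs(codecNames, restMapper)...).
			WithWorkClientWatcherStore(watcherStore).
			NewAgentClientHolder(ctx)
		if err != nil {
			return "", nil, nil, err
		}

		hubHost = serverHost
		workClient = clientHolder.WorkInterface()
	}

	// A single shared informer factory now serves both drivers, replacing the
	// informer that the old code built only inside the kube branch.
	factory := workinformers.NewSharedInformerFactoryWithOptions(
		workClient,
		5*time.Minute,
		workinformers.WithNamespace(clusterName),
	)
	informer := factory.Work().V1().ManifestWorks()

	// For the cloudevents client, reuse the informer's cache as the client's
	// watcher store so List/Watch results stay consistent with the informer.
	if watcherStore != nil {
		watcherStore.SetStore(informer.Informer().GetStore())
	}

	return hubHost, workClient.WorkV1().ManifestWorks(clusterName), informer, nil
}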
16 changes: 16 additions & 0 deletions pkg/work/spoke/statusfeedback/rules/rule.go
@@ -29,6 +29,21 @@ var deploymentRule = []workapiv1.JsonPath{
},
}

var daemonsetRule = []workapiv1.JsonPath{
{
Name: "NumberReady",
Path: ".status.numberReady",
},
{
Name: "DesiredNumberScheduled",
Path: ".status.desiredNumberScheduled",
},
{
Name: "NumberAvailable",
Path: ".status.numberAvailable",
},
}

var jobRule = []workapiv1.JsonPath{
{
Name: "JobComplete",
@@ -57,6 +72,7 @@ func DefaultWellKnownStatusRule() WellKnownStatusRuleResolver {
{Group: "apps", Version: "v1", Kind: "Deployment"}: deploymentRule,
{Group: "batch", Version: "v1", Kind: "Job"}: jobRule,
{Group: "", Version: "v1", Kind: "Pod"}: podRule,
{Group: "apps", Version: "v1", Kind: "DaemonSet"}: daemonsetRule,
},
}
}
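
The new daemonsetRule registers NumberReady, DesiredNumberScheduled and NumberAvailable as well-known status fields for apps/v1 DaemonSets. Below is a minimal sketch of the equivalent explicit rule a ManifestWork author previously had to write by hand; the resource name and namespace are illustrative, and it assumes the JSONPaths feedback type from the work v1 API:

manifestConfig := workapiv1.ManifestConfigOption{
	ResourceIdentifier: workapiv1.ResourceIdentifier{
		Group:     "apps",
		Resource:  "daemonsets",
		Namespace: "default",    // illustrative namespace
		Name:      "example-ds", // illustrative name
	},
	FeedbackRules: []workapiv1.FeedbackRule{
		{
			// Explicit JSONPath rules matching the new well-known DaemonSet rule.
			Type: workapiv1.JSONPathsType,
			JsonPaths: []workapiv1.JsonPath{
				{Name: "NumberReady", Path: ".status.numberReady"},
				{Name: "DesiredNumberScheduled", Path: ".status.desiredNumberScheduled"},
				{Name: "NumberAvailable", Path: ".status.numberAvailable"},
			},
		},
	},
}

With this commit, a single FeedbackRule of Type: workapiv1.WellKnownStatusType yields the same three values, as the new integration test below exercises.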
182 changes: 182 additions & 0 deletions test/integration/cloudevents/statusfeedback_test.go
@@ -541,4 +541,186 @@ var _ = ginkgo.Describe("ManifestWork Status Feedback", func() {
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())
})
})

ginkgo.Context("DaemonSet Status feedback", func() {
ginkgo.BeforeEach(func() {
u, _, err := util.NewDaesonSet(commOptions.SpokeClusterName, "ds1")
gomega.Expect(err).ToNot(gomega.HaveOccurred())
manifests = append(manifests, util.ToManifest(u))

var ctx context.Context
ctx, cancel = context.WithCancel(context.Background())
go runWorkAgent(ctx, o, commOptions)
})

ginkgo.AfterEach(func() {
if cancel != nil {
cancel()
}
})

ginkgo.It("should return well known statuses", func() {
work.Spec.ManifestConfigs = []workapiv1.ManifestConfigOption{
{
ResourceIdentifier: workapiv1.ResourceIdentifier{
Group: "apps",
Resource: "daemonsets",
Namespace: commOptions.SpokeClusterName,
Name: "ds1",
},
FeedbackRules: []workapiv1.FeedbackRule{
{
Type: workapiv1.WellKnownStatusType,
},
},
},
}

work, err = workSourceWorkClient.WorkV1().ManifestWorks(commOptions.SpokeClusterName).
Create(context.Background(), work, metav1.CreateOptions{})
gomega.Expect(err).ToNot(gomega.HaveOccurred())

util.AssertWorkCondition(work.Namespace, work.Name, workSourceWorkClient,
workapiv1.WorkApplied, metav1.ConditionTrue, []metav1.ConditionStatus{metav1.ConditionTrue},
eventuallyTimeout, eventuallyInterval)
util.AssertWorkCondition(work.Namespace, work.Name, workSourceWorkClient,
workapiv1.WorkAvailable, metav1.ConditionTrue, []metav1.ConditionStatus{metav1.ConditionTrue},
eventuallyTimeout, eventuallyInterval)

// Update DaemonSet status on spoke
gomega.Eventually(func() error {
ds, err := spokeKubeClient.AppsV1().DaemonSets(commOptions.SpokeClusterName).
Get(context.Background(), "ds1", metav1.GetOptions{})
if err != nil {
return err
}

ds.Status.NumberAvailable = 2
ds.Status.DesiredNumberScheduled = 3
ds.Status.NumberReady = 2

_, err = spokeKubeClient.AppsV1().DaemonSets(commOptions.SpokeClusterName).
UpdateStatus(context.Background(), ds, metav1.UpdateOptions{})
return err
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())

// Check that the DaemonSet status is reflected on the work API
gomega.Eventually(func() error {
work, err = workSourceWorkClient.WorkV1().ManifestWorks(commOptions.SpokeClusterName).
Get(context.Background(), work.Name, metav1.GetOptions{})
if err != nil {
return err
}

if len(work.Status.ResourceStatus.Manifests) != 1 {
return fmt.Errorf("the size of resource status is not correct, expect to be 1 but got %d",
len(work.Status.ResourceStatus.Manifests))
}

values := work.Status.ResourceStatus.Manifests[0].StatusFeedbacks.Values

expectedValues := []workapiv1.FeedbackValue{
{
Name: "NumberReady",
Value: workapiv1.FieldValue{
Type: workapiv1.Integer,
Integer: ptr.To[int64](2),
},
},
{
Name: "DesiredNumberScheduled",
Value: workapiv1.FieldValue{
Type: workapiv1.Integer,
Integer: ptr.To[int64](3),
},
},
{
Name: "NumberAvailable",
Value: workapiv1.FieldValue{
Type: workapiv1.Integer,
Integer: ptr.To[int64](2),
},
},
}
if !apiequality.Semantic.DeepEqual(values, expectedValues) {
return fmt.Errorf("status feedback values are not correct, we got %v", values)
}

if !util.HaveManifestCondition(work.Status.ResourceStatus.Manifests,
"StatusFeedbackSynced", []metav1.ConditionStatus{metav1.ConditionTrue}) {
return fmt.Errorf("status sync condition should be True")
}

return err
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())

// Update the DaemonSet status on the spoke again
gomega.Eventually(func() error {
ds, err := spokeKubeClient.AppsV1().DaemonSets(commOptions.SpokeClusterName).
Get(context.Background(), "ds1", metav1.GetOptions{})
if err != nil {
return err
}

ds.Status.NumberAvailable = 3
ds.Status.DesiredNumberScheduled = 3
ds.Status.NumberReady = 3

_, err = spokeKubeClient.AppsV1().DaemonSets(commOptions.SpokeClusterName).
UpdateStatus(context.Background(), ds, metav1.UpdateOptions{})
return err
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())

// Check that the updated DaemonSet status is synced to the work API
gomega.Eventually(func() error {
work, err = workSourceWorkClient.WorkV1().ManifestWorks(commOptions.SpokeClusterName).
Get(context.Background(), work.Name, metav1.GetOptions{})
if err != nil {
return err
}

if len(work.Status.ResourceStatus.Manifests) != 1 {
return fmt.Errorf("the size of resource status is not correct, expect to be 1 but got %d",
len(work.Status.ResourceStatus.Manifests))
}

values := work.Status.ResourceStatus.Manifests[0].StatusFeedbacks.Values

expectedValues := []workapiv1.FeedbackValue{
{
Name: "NumberReady",
Value: workapiv1.FieldValue{
Type: workapiv1.Integer,
Integer: ptr.To[int64](3),
},
},
{
Name: "DesiredNumberScheduled",
Value: workapiv1.FieldValue{
Type: workapiv1.Integer,
Integer: ptr.To[int64](3),
},
},
{
Name: "NumberAvailable",
Value: workapiv1.FieldValue{
Type: workapiv1.Integer,
Integer: ptr.To[int64](3),
},
},
}
if !apiequality.Semantic.DeepEqual(values, expectedValues) {
return fmt.Errorf("status feedback values are not correct, we got %v", values)
}

if !util.HaveManifestCondition(work.Status.ResourceStatus.Manifests,
"StatusFeedbackSynced", []metav1.ConditionStatus{metav1.ConditionTrue}) {
return fmt.Errorf("status sync condition should be True")
}

return nil
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())
})
})

})