-
Notifications
You must be signed in to change notification settings - Fork 65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix EndpointSlice reconciliation #518
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
ghcr.io/aenix-io/cozystack/kubevirt-cloud-provider:0.14.0@sha256:55b78220b60773eefb7b7d3451d7ab9fe89fb6b989e8fe2ae214aab164f00293 | ||
ghcr.io/aenix-io/cozystack/kubevirt-cloud-provider:latest@sha256:8fc186c44669c15d001d84035caae2fe4676dc8eb0bce75496cff500d36e7570 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,13 +3,14 @@ FROM --platform=linux/amd64 golang:1.20.6 AS builder | |
|
||
RUN git clone https://github.com/kubevirt/cloud-provider-kubevirt /go/src/kubevirt.io/cloud-provider-kubevirt \ | ||
&& cd /go/src/kubevirt.io/cloud-provider-kubevirt \ | ||
&& git checkout adbd6c27468b86b020cf38490e84f124ef24ab62 | ||
&& git checkout da9e0cf | ||
|
||
WORKDIR /go/src/kubevirt.io/cloud-provider-kubevirt | ||
|
||
# see: https://github.com/kubevirt/cloud-provider-kubevirt/pull/291 | ||
# see: https://github.com/kubevirt/cloud-provider-kubevirt/pull/335 | ||
# see: https://github.com/kubevirt/cloud-provider-kubevirt/pull/336 | ||
ADD patches /patches | ||
RUN git apply /patches/external-traffic-policy-local.diff | ||
RUN git apply /patches/*.diff | ||
RUN go get 'k8s.io/endpointslice/[email protected]' 'k8s.io/[email protected]' | ||
RUN go mod tidy | ||
Comment on lines
14
to
15
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification Version mismatch detected between new dependencies and existing k8s.io packages The go.mod shows that the existing k8s.io dependencies are using version v0.31.2, but the new dependencies in the Dockerfile are being added with version v0.28. This version mismatch could lead to compatibility issues.
🔗 Analysis chainVerify compatibility of new dependencies. The addition of EndpointSlice-related dependencies ( 🏁 Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Check for potential version conflicts in go.mod
rg -A 5 "k8s.io/(endpointslice|apiserver)" | grep "v0.28"
Length of output: 194 Script: #!/bin/bash
# Check go.mod for k8s.io dependencies and their versions
fd go.mod --exec cat {} | grep -A 10 "require ("
# Check for imports of endpointslice and apiserver packages
rg "import.*k8s.io/(endpointslice|apiserver)" -A 5
Length of output: 1008 |
||
RUN go mod vendor | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go | ||
index a3c1aa33..95c31438 100644 | ||
--- a/pkg/controller/kubevirteps/kubevirteps_controller.go | ||
+++ b/pkg/controller/kubevirteps/kubevirteps_controller.go | ||
@@ -412,11 +412,11 @@ func (c *Controller) reconcileByAddressType(service *v1.Service, tenantSlices [] | ||
// Create the desired port configuration | ||
var desiredPorts []discovery.EndpointPort | ||
|
||
- for _, port := range service.Spec.Ports { | ||
+ for i := range service.Spec.Ports { | ||
desiredPorts = append(desiredPorts, discovery.EndpointPort{ | ||
- Port: &port.TargetPort.IntVal, | ||
- Protocol: &port.Protocol, | ||
- Name: &port.Name, | ||
+ Port: &service.Spec.Ports[i].TargetPort.IntVal, | ||
+ Protocol: &service.Spec.Ports[i].Protocol, | ||
+ Name: &service.Spec.Ports[i].Name, | ||
}) | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,129 @@ | ||||||||||||||||||||||||||
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go | ||||||||||||||||||||||||||
index a3c1aa33..6f6e3d32 100644 | ||||||||||||||||||||||||||
--- a/pkg/controller/kubevirteps/kubevirteps_controller.go | ||||||||||||||||||||||||||
+++ b/pkg/controller/kubevirteps/kubevirteps_controller.go | ||||||||||||||||||||||||||
@@ -108,32 +108,24 @@ func newRequest(reqType ReqType, obj interface{}, oldObj interface{}) *Request { | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
func (c *Controller) Init() error { | ||||||||||||||||||||||||||
- | ||||||||||||||||||||||||||
- // Act on events from Services on the infra cluster. These are created by the EnsureLoadBalancer function. | ||||||||||||||||||||||||||
- // We need to watch for these events so that we can update the EndpointSlices in the infra cluster accordingly. | ||||||||||||||||||||||||||
+ // Existing Service event handlers... | ||||||||||||||||||||||||||
_, err := c.infraFactory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||||||||||||||||||||||||||
AddFunc: func(obj interface{}) { | ||||||||||||||||||||||||||
- // cast obj to Service | ||||||||||||||||||||||||||
svc := obj.(*v1.Service) | ||||||||||||||||||||||||||
- // Only act on Services of type LoadBalancer | ||||||||||||||||||||||||||
if svc.Spec.Type == v1.ServiceTypeLoadBalancer { | ||||||||||||||||||||||||||
klog.Infof("Service added: %v/%v", svc.Namespace, svc.Name) | ||||||||||||||||||||||||||
c.queue.Add(newRequest(AddReq, obj, nil)) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||
UpdateFunc: func(oldObj, newObj interface{}) { | ||||||||||||||||||||||||||
- // cast obj to Service | ||||||||||||||||||||||||||
newSvc := newObj.(*v1.Service) | ||||||||||||||||||||||||||
- // Only act on Services of type LoadBalancer | ||||||||||||||||||||||||||
if newSvc.Spec.Type == v1.ServiceTypeLoadBalancer { | ||||||||||||||||||||||||||
klog.Infof("Service updated: %v/%v", newSvc.Namespace, newSvc.Name) | ||||||||||||||||||||||||||
c.queue.Add(newRequest(UpdateReq, newObj, oldObj)) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||
DeleteFunc: func(obj interface{}) { | ||||||||||||||||||||||||||
- // cast obj to Service | ||||||||||||||||||||||||||
svc := obj.(*v1.Service) | ||||||||||||||||||||||||||
- // Only act on Services of type LoadBalancer | ||||||||||||||||||||||||||
if svc.Spec.Type == v1.ServiceTypeLoadBalancer { | ||||||||||||||||||||||||||
klog.Infof("Service deleted: %v/%v", svc.Namespace, svc.Name) | ||||||||||||||||||||||||||
c.queue.Add(newRequest(DeleteReq, obj, nil)) | ||||||||||||||||||||||||||
@@ -144,7 +136,7 @@ func (c *Controller) Init() error { | ||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
- // Monitor endpoint slices that we are interested in based on known services in the infra cluster | ||||||||||||||||||||||||||
+ // Existing EndpointSlice event handlers in tenant cluster... | ||||||||||||||||||||||||||
_, err = c.tenantFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||||||||||||||||||||||||||
AddFunc: func(obj interface{}) { | ||||||||||||||||||||||||||
eps := obj.(*discovery.EndpointSlice) | ||||||||||||||||||||||||||
@@ -194,10 +186,80 @@ func (c *Controller) Init() error { | ||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
- //TODO: Add informer for EndpointSlices in the infra cluster to watch for (unwanted) changes | ||||||||||||||||||||||||||
+ // Add an informer for EndpointSlices in the infra cluster | ||||||||||||||||||||||||||
+ _, err = c.infraFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||||||||||||||||||||||||||
+ AddFunc: func(obj interface{}) { | ||||||||||||||||||||||||||
+ eps := obj.(*discovery.EndpointSlice) | ||||||||||||||||||||||||||
+ if c.managedByController(eps) { | ||||||||||||||||||||||||||
+ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps) | ||||||||||||||||||||||||||
+ if svcErr != nil { | ||||||||||||||||||||||||||
+ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr) | ||||||||||||||||||||||||||
+ return | ||||||||||||||||||||||||||
+ } | ||||||||||||||||||||||||||
+ if svc != nil { | ||||||||||||||||||||||||||
+ klog.Infof("Infra EndpointSlice added: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name) | ||||||||||||||||||||||||||
+ c.queue.Add(newRequest(AddReq, svc, nil)) | ||||||||||||||||||||||||||
+ } | ||||||||||||||||||||||||||
+ } | ||||||||||||||||||||||||||
+ }, | ||||||||||||||||||||||||||
+ UpdateFunc: func(oldObj, newObj interface{}) { | ||||||||||||||||||||||||||
+ eps := newObj.(*discovery.EndpointSlice) | ||||||||||||||||||||||||||
+ if c.managedByController(eps) { | ||||||||||||||||||||||||||
+ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps) | ||||||||||||||||||||||||||
+ if svcErr != nil { | ||||||||||||||||||||||||||
+ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr) | ||||||||||||||||||||||||||
+ return | ||||||||||||||||||||||||||
+ } | ||||||||||||||||||||||||||
+ if svc != nil { | ||||||||||||||||||||||||||
+ klog.Infof("Infra EndpointSlice updated: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name) | ||||||||||||||||||||||||||
+ c.queue.Add(newRequest(UpdateReq, svc, nil)) | ||||||||||||||||||||||||||
+ } | ||||||||||||||||||||||||||
+ } | ||||||||||||||||||||||||||
+ }, | ||||||||||||||||||||||||||
+ DeleteFunc: func(obj interface{}) { | ||||||||||||||||||||||||||
+ eps := obj.(*discovery.EndpointSlice) | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure safe type assertion in In the Consider modifying the DeleteFunc: func(obj interface{}) {
- eps := obj.(*discovery.EndpointSlice)
+ var eps *discovery.EndpointSlice
+ if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+ var ok bool
+ eps, ok = tombstone.Obj.(*discovery.EndpointSlice)
+ if !ok {
+ klog.Errorf("Failed to convert tombstone object to EndpointSlice: %v", tombstone.Obj)
+ return
+ }
+ } else {
+ eps = obj.(*discovery.EndpointSlice)
+ } 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||
+ if c.managedByController(eps) { | ||||||||||||||||||||||||||
+ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps) | ||||||||||||||||||||||||||
+ if svcErr != nil { | ||||||||||||||||||||||||||
+ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s on delete: %v", eps.Namespace, eps.Name, svcErr) | ||||||||||||||||||||||||||
+ return | ||||||||||||||||||||||||||
+ } | ||||||||||||||||||||||||||
+ if svc != nil { | ||||||||||||||||||||||||||
+ klog.Infof("Infra EndpointSlice deleted: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name) | ||||||||||||||||||||||||||
+ c.queue.Add(newRequest(DeleteReq, svc, nil)) | ||||||||||||||||||||||||||
+ } | ||||||||||||||||||||||||||
+ } | ||||||||||||||||||||||||||
+ }, | ||||||||||||||||||||||||||
+ }) | ||||||||||||||||||||||||||
+ if err != nil { | ||||||||||||||||||||||||||
+ return err | ||||||||||||||||||||||||||
+ } | ||||||||||||||||||||||||||
+ | ||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
+// getInfraServiceForEPS returns the Service in the infra cluster associated with the given EndpointSlice. | ||||||||||||||||||||||||||
+// It does this by reading the "kubernetes.io/service-name" label from the EndpointSlice, which should correspond | ||||||||||||||||||||||||||
+// to the Service name. If not found or if the Service doesn't exist, it returns nil. | ||||||||||||||||||||||||||
+func (c *Controller) getInfraServiceForEPS(ctx context.Context, eps *discovery.EndpointSlice) (*v1.Service, error) { | ||||||||||||||||||||||||||
+ svcName := eps.Labels[discovery.LabelServiceName] | ||||||||||||||||||||||||||
+ if svcName == "" { | ||||||||||||||||||||||||||
+ // No service name label found, can't determine infra service. | ||||||||||||||||||||||||||
+ return nil, nil | ||||||||||||||||||||||||||
Comment on lines
+109
to
+112
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check for nil labels in In Add a nil check for if eps.Labels == nil {
// No labels present; cannot determine infra service.
return nil, nil
}
svcName := eps.Labels[discovery.LabelServiceName] |
||||||||||||||||||||||||||
+ } | ||||||||||||||||||||||||||
+ | ||||||||||||||||||||||||||
+ svc, err := c.infraClient.CoreV1().Services(c.infraNamespace).Get(ctx, svcName, metav1.GetOptions{}) | ||||||||||||||||||||||||||
+ if err != nil { | ||||||||||||||||||||||||||
+ if k8serrors.IsNotFound(err) { | ||||||||||||||||||||||||||
+ // Service doesn't exist | ||||||||||||||||||||||||||
+ return nil, nil | ||||||||||||||||||||||||||
+ } | ||||||||||||||||||||||||||
+ return nil, err | ||||||||||||||||||||||||||
+ } | ||||||||||||||||||||||||||
+ | ||||||||||||||||||||||||||
+ return svc, nil | ||||||||||||||||||||||||||
+} | ||||||||||||||||||||||||||
+ | ||||||||||||||||||||||||||
// Run starts an asynchronous loop that monitors and updates GKENetworkParamSet in the cluster. | ||||||||||||||||||||||||||
func (c *Controller) Run(numWorkers int, stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) { | ||||||||||||||||||||||||||
defer utilruntime.HandleCrash() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Use full commit hash for better security and traceability.
Using a shortened commit hash (
da9e0cf
) reduces traceability and could potentially lead to ambiguity if hash collisions occur in the future.