Update minor typos and add dummy endpoint when necessary
Signed-off-by: Jian Wang <[email protected]>
w13915984028 authored and starbops committed Aug 29, 2024
1 parent 428a595 commit df4c506
Showing 5 changed files with 453 additions and 17 deletions.
99 changes: 85 additions & 14 deletions pkg/lb/servicelb/manager.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"slices"
"strconv"
"strings"
"time"
@@ -132,7 +133,7 @@ func (m *Manager) updateAllConditions(lb *lbv1.LoadBalancer, eps *discoveryv1.En
epsCopy := eps.DeepCopy()
updated := false
for i := range epsCopy.Endpoints {
if needUpdateEndpointConditions(&epsCopy.Endpoints[i].Conditions, isHealthy) {
if !isDummyEndpoint(&epsCopy.Endpoints[i]) && needUpdateEndpointConditions(&epsCopy.Endpoints[i].Conditions, isHealthy) {
updateEndpointConditions(&epsCopy.Endpoints[i].Conditions, isHealthy)
updated = true
}
@@ -161,7 +162,7 @@ func (m *Manager) GetProbeReadyBackendServerCount(lb *lbv1.LoadBalancer) (int, e
// if use `for _, ep := range eps.Endpoints`
// get: G601: Implicit memory aliasing in for loop. (gosec)
for i := range eps.Endpoints {
if isEndpointConditionsReady(&eps.Endpoints[i].Conditions) {
if !isDummyEndpoint(&eps.Endpoints[i]) && isEndpointConditionsReady(&eps.Endpoints[i].Conditions) {
count++
}
}
@@ -209,7 +210,7 @@ func (m *Manager) getServiceBackendServers(lb *lbv1.LoadBalancer) ([]pkglb.Backe

servers := make([]pkglb.BackendServer, 0, len(vmis))

// skip being-deleting vmi, no-address vmi
// skip being-deleted vmi, no-address vmi
for _, vmi := range vmis {
if vmi.DeletionTimestamp != nil {
continue
@@ -244,22 +245,22 @@ func (m *Manager) EnsureBackendServers(lb *lbv1.LoadBalancer) ([]pkglb.BackendSe
return nil, err
}

epsCopy, err := m.constructEndpointSliceFromBackendServers(eps, lb, servers)
epsNew, err := m.constructEndpointSliceFromBackendServers(eps, lb, servers)
if err != nil {
return nil, err
}

// reate a new one
// create a new one
if eps == nil {
// do not check IsAlreadyExists
eps, err = m.endpointSliceClient.Create(epsCopy)
// it is ok not to check IsAlreadyExists; the reconciler will handle it
eps, err = m.endpointSliceClient.Create(epsNew)
if err != nil {
return nil, fmt.Errorf("fail to create endpointslice, error: %w", err)
}
} else {
if !reflect.DeepEqual(eps, epsCopy) {
if !reflect.DeepEqual(eps, epsNew) {
logrus.Debugf("update endpointslice %s/%s", lb.Namespace, lb.Name)
eps, err = m.endpointSliceClient.Update(epsCopy)
eps, err = m.endpointSliceClient.Update(epsNew)
if err != nil {
return nil, fmt.Errorf("fail to update endpointslice, error: %w", err)
}
@@ -268,7 +269,12 @@ func (m *Manager) EnsureBackendServers(lb *lbv1.LoadBalancer) ([]pkglb.BackendSe

// always ensure probes
if err := m.ensureProbes(lb, eps); err != nil {
return nil, fmt.Errorf("fail to enuse probs, error: %w", err)
return nil, fmt.Errorf("fail to ensure probs, error: %w", err)
}

// always ensure dummy endpoint
if err := m.ensureDummyEndpoint(lb, eps); err != nil {
return nil, fmt.Errorf("fail to ensure dummy endpointslice, error: %w", err)
}

return servers, nil
@@ -313,7 +319,7 @@ func (m *Manager) ensureProbes(lb *lbv1.LoadBalancer, eps *discoveryv1.EndpointS
targetProbers := make(map[string]prober.HealthOption)
// indexing to skip G601 in Go 1.21
for i := range eps.Endpoints {
if len(eps.Endpoints[i].Addresses) == 0 {
if len(eps.Endpoints[i].Addresses) == 0 || isDummyEndpoint(&eps.Endpoints[i]) {
continue
}
targetProbers[marshalPorberAddress(lb, &eps.Endpoints[i])] = m.generateOneProber(lb, &eps.Endpoints[i])
@@ -345,11 +351,11 @@ func (m *Manager) updateAllProbers(uid string, activeProbers, targetProbers map[
return err
}
}
// proessed above or equal; then delete it from both maps
// replaced or equal; then delete it from both maps
delete(activeProbers, ap.Address)
delete(targetProbers, tp.Address)
}
// not found in targetProbers, processed in next lines
// entries not found in targetProbers are processed below
}

// remove all remaining activeProbers
@@ -362,7 +368,6 @@ func (m *Manager) updateAllProbers(uid string, activeProbers, targetProbers map[

// add all remaining targetProbers
for _, tp := range targetProbers {
// already checked, skip error
logrus.Debugf("+probe %s %s", uid, tp.Address)
if err := m.AddWorker(uid, tp.Address, tp); err != nil {
return err
@@ -372,6 +377,45 @@ func (m *Manager) updateAllProbers(uid string, activeProbers, targetProbers map[
return nil
}
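
Aside: updateAllProbers is a plain two-map reconcile — entries present in both maps are replaced when they differ, leftovers in activeProbers are removed, and leftovers in targetProbers are added. Below is a minimal, self-contained sketch of the same pattern, with plain strings standing in for prober.HealthOption (an illustrative simplification, not the real type):

package main

import "fmt"

// reconcile mirrors the two-map pattern of updateAllProbers:
// compare, replace on change, then drain the leftovers of both maps.
func reconcile(active, target map[string]string) (removed, added []string) {
	for addr, ap := range active {
		tp, ok := target[addr]
		if !ok {
			continue // stale entry; drained after this loop
		}
		if ap != tp {
			removed = append(removed, addr) // changed: replace
			added = append(added, addr)
		}
		// replaced or equal: drop from both maps
		delete(active, addr)
		delete(target, addr)
	}
	for addr := range active {
		removed = append(removed, addr) // no longer desired
	}
	for addr := range target {
		added = append(added, addr) // newly desired
	}
	return removed, added
}

func main() {
	active := map[string]string{"10.0.0.1:80": "tcp", "10.0.0.2:80": "tcp"}
	target := map[string]string{"10.0.0.1:80": "udp", "10.0.0.3:80": "tcp"}
	removed, added := reconcile(active, target)
	fmt.Println(removed, added) // e.g. [10.0.0.1:80 10.0.0.2:80] [10.0.0.1:80 10.0.0.3:80] (map order varies)
}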

// without at least one Ready (dummy) endpoint, the service may route traffic to the local host
func (m *Manager) ensureDummyEndpoint(lb *lbv1.LoadBalancer, eps *discoveryv1.EndpointSlice) error {
dummyCount := 0
activeCount := 0
// if use `for _, ep := range eps.Endpoints`
// get: G601: Implicit memory aliasing in for loop. (gosec)
for i := range eps.Endpoints {
if isDummyEndpoint(&eps.Endpoints[i]) {
dummyCount++
} else if isEndpointConditionsReady(&eps.Endpoints[i].Conditions) {
activeCount++
}
}

// add the dummy endpoint
if activeCount == 0 && dummyCount == 0 {
epsCopy := eps.DeepCopy()
epsCopy.Endpoints = appendDummyEndpoint(epsCopy.Endpoints, lb)
if _, err := m.endpointSliceClient.Update(epsCopy); err != nil {
return fmt.Errorf("fail to append dummy endpoint to lb %v endpoint, error: %w", lb.Name, err)
}
return nil
}

// remove the dummy endpoint
if activeCount > 0 && dummyCount > 0 {
epsCopy := eps.DeepCopy()
epsCopy.Endpoints = slices.DeleteFunc(epsCopy.Endpoints, func(ep discoveryv1.Endpoint) bool {
return ep.TargetRef.UID == dummyEndpointID
})
if _, err := m.endpointSliceClient.Update(epsCopy); err != nil {
return fmt.Errorf("fail to remove dummy endpoint from lb %v endpoint, error: %w", lb.Name, err)
}
return nil
}

return nil
}
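
The add/remove branching above boils down to a small rule on the two counts; a standalone sketch (the action names are illustrative, not part of the package):

// dummyAction mirrors ensureDummyEndpoint's decision: add a dummy
// endpoint when nothing is Ready, remove it once a real endpoint is.
func dummyAction(activeCount, dummyCount int) string {
	switch {
	case activeCount == 0 && dummyCount == 0:
		return "add-dummy"
	case activeCount > 0 && dummyCount > 0:
		return "remove-dummy"
	default:
		return "no-op"
	}
}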

func (m *Manager) removeLBProbers(lb *lbv1.LoadBalancer) (int, error) {
return m.RemoveWorkersByUid(marshalUID(lb.Namespace, lb.Name))
}
@@ -515,6 +559,29 @@ func constructService(cur *corev1.Service, lb *lbv1.LoadBalancer) *corev1.Servic
return svc
}

const dummyEndpointIPv4Address = "10.52.0.255"
const dummyEndpointID = "dummy347-546a-4642-9da6-5608endpoint"

func appendDummyEndpoint(eps []discoveryv1.Endpoint, lb *lbv1.LoadBalancer) []discoveryv1.Endpoint {
endpoint := discoveryv1.Endpoint{
Addresses: []string{dummyEndpointIPv4Address},
TargetRef: &corev1.ObjectReference{
Namespace: lb.Namespace,
Name: lb.Name,
UID: dummyEndpointID,
},
Conditions: discoveryv1.EndpointConditions{
Ready: pointer.Bool(true),
},
}
eps = append(eps, endpoint)
return eps
}

func isDummyEndpoint(ep *discoveryv1.Endpoint) bool {
return ep.TargetRef.UID == dummyEndpointID
}
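
A quick round-trip of the two helpers above — assuming the package's existing lbv1, metav1, and fmt imports — shows that the dummy endpoint is recognized purely by its fixed TargetRef.UID:

lb := &lbv1.LoadBalancer{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "test"}}
eps := appendDummyEndpoint(nil, lb)
fmt.Println(isDummyEndpoint(&eps[0])) // true: TargetRef.UID == dummyEndpointID
fmt.Println(eps[0].Addresses[0])      // 10.52.0.255
fmt.Println(*eps[0].Conditions.Ready) // true, so the slice always carries one Ready endpoint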

func (m *Manager) constructEndpointSliceFromBackendServers(cur *discoveryv1.EndpointSlice, lb *lbv1.LoadBalancer, servers []pkglb.BackendServer) (*discoveryv1.EndpointSlice, error) {
eps := &discoveryv1.EndpointSlice{}
if cur != nil {
Expand Down Expand Up @@ -586,6 +653,10 @@ func (m *Manager) constructEndpointSliceFromBackendServers(cur *discoveryv1.Endp
endpoints = append(endpoints, endpoint)
}
}
// a dummy endpoint prevents the LB traffic from being accidentally routed to other services or the local host
if len(endpoints) == 0 {
endpoints = appendDummyEndpoint(endpoints, lb)
}
eps.Endpoints = endpoints

logrus.Debugln("constructEndpointSliceFromBackendServers: ", eps)
72 changes: 70 additions & 2 deletions pkg/webhook/loadbalancer/mutator.go
@@ -41,7 +41,20 @@ func NewMutator(namespaceCache ctlcorev1.NamespaceCache,
func (m *mutator) Create(_ *admission.Request, newObj runtime.Object) (admission.Patch, error) {
lb := newObj.(*lbv1.LoadBalancer)

return m.getAnnotationsPatch(lb)
ap, err := m.getAnnotationsPatch(lb)
if err != nil {
return nil, err
}

hcp, err := m.getHealthyCheckPatch(lb)
if err != nil {
return nil, err
}

if len(ap) == 0 {
return hcp, nil
}
return append(ap, hcp...), nil
}

func (m *mutator) Update(_ *admission.Request, _, newObj runtime.Object) (admission.Patch, error) {
@@ -51,7 +64,62 @@ func (m *mutator) Update(_ *admission.Request, _, newObj runtime.Object) (admiss
return nil, nil
}

return m.getAnnotationsPatch(lb)
ap, err := m.getAnnotationsPatch(lb)
if err != nil {
return nil, err
}

hcp, err := m.getHealthyCheckPatch(lb)
if err != nil {
return nil, err
}

if len(ap) == 0 {
return hcp, nil
}
return append(ap, hcp...), nil
}

// these fields were not checked in the past; overwrite them to at least 1 when unset
func (m *mutator) getHealthyCheckPatch(lb *lbv1.LoadBalancer) (admission.Patch, error) {
if lb.Spec.HealthCheck == nil || lb.Spec.HealthCheck.Port == 0 {
return nil, nil
}

hc := *lb.Spec.HealthCheck
patched := false

if hc.SuccessThreshold == 0 {
hc.SuccessThreshold = 2
patched = true
}

if hc.FailureThreshold == 0 {
hc.FailureThreshold = 2
patched = true
}

if hc.PeriodSeconds == 0 {
hc.PeriodSeconds = 1
patched = true
}

if hc.TimeoutSeconds == 0 {
hc.TimeoutSeconds = 1
patched = true
}

if patched {
return []admission.PatchOp{
{
Op: admission.PatchOpReplace,
Path: "/spec/healthCheck",
Value: hc,
},
}, nil
}

return nil, nil
}
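
For illustration, a health check that only sets the port leaves this mutator with every zero-valued field raised to its floor — a minimal sketch of the input/output, with values taken from the defaults above:

// user-supplied spec: only the port is set
hc := &lbv1.HealthCheck{Port: 80}
// getHealthyCheckPatch then emits a single replace op at /spec/healthCheck:
// {Port:80 SuccessThreshold:2 FailureThreshold:2 PeriodSeconds:1 TimeoutSeconds:1}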

func (m *mutator) getAnnotationsPatch(lb *lbv1.LoadBalancer) (admission.Patch, error) {
82 changes: 82 additions & 0 deletions pkg/webhook/loadbalancer/mutator_test.go
@@ -8,6 +8,10 @@ import (
harvesterfakeclients "github.com/harvester/harvester/pkg/util/fakeclients"
corefake "k8s.io/client-go/kubernetes/fake"

lbv1 "github.com/harvester/harvester-load-balancer/pkg/apis/loadbalancer.harvesterhci.io/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/harvester/harvester-load-balancer/pkg/utils"
)

@@ -39,13 +43,91 @@ func TestFindProject(t *testing.T) {
},
}

testsHealthCheckMutatored := []struct {
name string
lb *lbv1.LoadBalancer
wantErr bool
opsLen int
}{
{
name: "health check mutatored case",
lb: &lbv1.LoadBalancer{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test",
},
Spec: lbv1.LoadBalancerSpec{
Listeners: []lbv1.Listener{
{Name: "a", BackendPort: 80, Protocol: corev1.ProtocolTCP},
{Name: "b", BackendPort: 32, Protocol: corev1.ProtocolUDP},
},
HealthCheck: &lbv1.HealthCheck{Port: 80, SuccessThreshold: 0, FailureThreshold: 1, PeriodSeconds: 1, TimeoutSeconds: 1},
},
},
wantErr: false,
opsLen: 2,
},
{
name: "health check right case: valid parameters",
lb: &lbv1.LoadBalancer{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test",
},
Spec: lbv1.LoadBalancerSpec{
Listeners: []lbv1.Listener{
{Name: "a", BackendPort: 80, Protocol: corev1.ProtocolTCP},
{Name: "b", BackendPort: 32, Protocol: corev1.ProtocolUDP},
},
HealthCheck: &lbv1.HealthCheck{Port: 80, SuccessThreshold: 1, FailureThreshold: 1, PeriodSeconds: 1, TimeoutSeconds: 1},
},
},
wantErr: false,
opsLen: 1,
},
{
name: "health check right case: no health check",
lb: &lbv1.LoadBalancer{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test",
},
Spec: lbv1.LoadBalancerSpec{
Listeners: []lbv1.Listener{
{Name: "a", BackendPort: 80, Protocol: corev1.ProtocolTCP},
{Name: "b", BackendPort: 32, Protocol: corev1.ProtocolUDP},
},
},
},
wantErr: false,
opsLen: 1,
},
}

for _, test := range tests {
if project, err := m.findProject(test.namespace); err != nil {
t.Error(err)
} else if project != test.wantProject {
t.Errorf("want project %s through namespace %s, got %s", test.wantProject, test.namespace, project)
}
}

for _, test := range testsHealthCheckMutatored {
if pt, err := m.Create(nil, test.lb); (err != nil) != test.wantErr {
t.Error(err)
} else if len(pt) != test.opsLen {
// return 2 ops
// [{Op:replace Path:/metadata/annotations Value:map[loadbalancer.harvesterhci.io/namespace:default loadbalancer.harvesterhci.io/network: loadbalancer.harvesterhci.io/project:local/p-abcde]}
// {Op:replace Path:/spec/healthCheck Value:{Port:80 SuccessThreshold:2 FailureThreshold:1 PeriodSeconds:1 TimeoutSeconds:1}}]
t.Errorf("create test %v return patchOps len %v != %v, %+v", test.name, len(pt), test.opsLen, pt)
}

if pt, err := m.Update(nil, nil, test.lb); (err != nil) != test.wantErr {
t.Error(err)
} else if len(pt) != test.opsLen {
t.Errorf("update test %v return patchOps len %v != %v, %+v", test.name, len(pt), test.opsLen, pt)
}
}
}

// TestFindNetwork tests the function findNetwork