diff --git a/pkg/apicheck/check.go b/pkg/apicheck/check.go
index 0a5d87c3..fda7e901 100644
--- a/pkg/apicheck/check.go
+++ b/pkg/apicheck/check.go
@@ -10,7 +10,7 @@ import (
 
 	"github.com/go-logr/logr"
 	"google.golang.org/grpc/credentials"
-	v1 "k8s.io/api/core/v1"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
@@ -128,21 +128,21 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response {
 	}
 
 	c.config.Log.Info("Error count exceeds threshold, trying to ask other nodes if I'm healthy")
-	nodesToAsk := c.config.Peers.GetPeersAddresses(peers.Worker)
-	if nodesToAsk == nil || len(nodesToAsk) == 0 {
+	peersToAsk := c.config.Peers.GetPeersAddresses(peers.Worker)
+	if peersToAsk == nil || len(peersToAsk) == 0 {
 		c.config.Log.Info("Peers list is empty and / or couldn't be retrieved from server, nothing we can do, so consider the node being healthy")
-		//todo maybe we need to check if this happens too much and reboot
+		// TODO: maybe we need to check if this happens too much and reboot
 		return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseNoPeersWereFound}
 	}
 
 	apiErrorsResponsesSum := 0
-	nrAllNodes := len(nodesToAsk)
-	// nodesToAsk is being reduced in every iteration, iterate until no nodes left to ask
-	for i := 0; len(nodesToAsk) > 0; i++ {
+	nrAllPeers := len(peersToAsk)
+	// peersToAsk is being reduced in every iteration, iterate until no peers left to ask
+	for i := 0; len(peersToAsk) > 0; i++ {
-		batchSize := utils.GetNextBatchSize(nrAllNodes, len(nodesToAsk))
-		chosenNodesAddresses := c.popNodes(&nodesToAsk, batchSize)
-		healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenNodesAddresses)
+		batchSize := utils.GetNextBatchSize(nrAllPeers, len(peersToAsk))
+		chosenPeersIPs := c.popPeerIPs(&peersToAsk, batchSize)
+		healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenPeersIPs)
 		if healthyResponses+unhealthyResponses+apiErrorsResponses > 0 {
 			c.timeOfLastPeerResponse = time.Now()
 		}
@@ -161,9 +161,9 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response {
 		if apiErrorsResponses > 0 {
 			c.config.Log.Info("Peer can't access the api-server")
 			apiErrorsResponsesSum += apiErrorsResponses
-			//todo consider using [m|n]hc.spec.maxUnhealthy instead of 50%
-			if apiErrorsResponsesSum > nrAllNodes/2 { //already reached more than 50% of the nodes and all of them returned api error
-				//assuming this is a control plane failure as others can't access api-server as well
+			// TODO: consider using [m|n]hc.spec.maxUnhealthy instead of 50%
+			if apiErrorsResponsesSum > nrAllPeers/2 { // already reached more than 50% of the peers and all of them returned api error
+				// assuming this is a control plane failure as others can't access api-server as well
 				c.config.Log.Info("More than 50% of the nodes couldn't access the api-server, assuming this is a control plane failure")
 				return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseMostPeersCantAccessAPIServer}
 			}
@@ -185,47 +185,48 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response {
 }
 
 func (c *ApiConnectivityCheck) canOtherControlPlanesBeReached() bool {
-	nodesToAsk := c.config.Peers.GetPeersAddresses(peers.ControlPlane)
-	numOfControlPlanePeers := len(nodesToAsk)
+	peersToAsk := c.config.Peers.GetPeersAddresses(peers.ControlPlane)
+	numOfControlPlanePeers := len(peersToAsk)
 	if numOfControlPlanePeers == 0 {
c.config.Log.Info("Peers list is empty and / or couldn't be retrieved from server, other control planes can't be reached") return false } - chosenNodesAddresses := c.popNodes(&nodesToAsk, numOfControlPlanePeers) - healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenNodesAddresses) + chosenPeersIPs := c.popPeerIPs(&peersToAsk, numOfControlPlanePeers) + healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenPeersIPs) // Any response is an indication of communication with a peer return (healthyResponses + unhealthyResponses + apiErrorsResponses) > 0 } -func (c *ApiConnectivityCheck) popNodes(nodes *[][]v1.NodeAddress, count int) []string { - nrOfNodes := len(*nodes) - if nrOfNodes == 0 { - return []string{} +func (c *ApiConnectivityCheck) popPeerIPs(peersIPs *[]corev1.PodIP, count int) []corev1.PodIP { + nrOfPeers := len(*peersIPs) + if nrOfPeers == 0 { + return []corev1.PodIP{} } - if count > nrOfNodes { - count = nrOfNodes + if count > nrOfPeers { + count = nrOfPeers } - //todo maybe we should pick nodes randomly rather than relying on the order returned from api-server - addresses := make([]string, count) + // TODO: maybe we should pick nodes randomly rather than relying on the order returned from api-server + selectedIPs := make([]corev1.PodIP, count) for i := 0; i < count; i++ { - nodeAddresses := (*nodes)[i] - if len(nodeAddresses) == 0 || nodeAddresses[0].Address == "" { - c.config.Log.Info("ignoring node without IP address") + ip := (*peersIPs)[i] + if ip.IP == "" { + // This should not happen, but keeping it for good measure. + c.config.Log.Info("ignoring peers without IP address") continue } - addresses[i] = nodeAddresses[0].Address //todo node might have multiple addresses or none + selectedIPs[i] = ip } - *nodes = (*nodes)[count:] //remove popped nodes from the list + *peersIPs = (*peersIPs)[count:] //remove popped nodes from the list - return addresses + return selectedIPs } -func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []string) (int, int, int, int) { +func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []corev1.PodIP) (int, int, int, int) { nrAddresses := len(addresses) responsesChan := make(chan selfNodeRemediation.HealthCheckResponseCode, nrAddresses) @@ -237,9 +238,9 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []string) (int } // getHealthStatusFromPeer issues a GET request to the specified IP and returns the result from the peer into the given channel -func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp string, results chan<- selfNodeRemediation.HealthCheckResponseCode) { +func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) { - logger := c.config.Log.WithValues("IP", endpointIp) + logger := c.config.Log.WithValues("IP", endpointIp.IP) logger.Info("getting health status from peer") if err := c.initClientCreds(); err != nil { @@ -249,7 +250,7 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp string, result } // TODO does this work with IPv6? 
-	phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds)
+	phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp.IP, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds)
 	if err != nil {
 		logger.Error(err, "failed to init grpc client")
 		results <- selfNodeRemediation.RequestFailed
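
A note on the "does this work with IPv6?" TODO kept above: fmt.Sprintf("%v:%v", endpointIp.IP, c.config.PeerHealthPort) yields "::1:5030" for an IPv6 peer IP, which is not a valid gRPC dial target. A minimal sketch of an IPv6-safe variant, assuming the port is an integer as in this config; peerDialTarget is a hypothetical helper, not part of this patch:

	package apicheck

	import (
		"net"
		"strconv"

		corev1 "k8s.io/api/core/v1"
	)

	// peerDialTarget builds a host:port dial target. net.JoinHostPort
	// brackets IPv6 literals ("[::1]:5030") and leaves IPv4 addresses
	// untouched ("10.0.0.1:5030"), so the result is safe to pass to
	// peerhealth.NewClient.
	func peerDialTarget(ip corev1.PodIP, port int) string {
		return net.JoinHostPort(ip.IP, strconv.Itoa(port))
	}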
diff --git a/pkg/peers/peers.go b/pkg/peers/peers.go
index ce28dc02..3978af5d 100644
--- a/pkg/peers/peers.go
+++ b/pkg/peers/peers.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/go-logr/logr"
 	commonlabels "github.com/medik8s/common/pkg/labels"
+	pkgerrors "github.com/pkg/errors"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -36,7 +37,7 @@ type Peers struct {
 	myNodeName                                       string
 	mutex                                            sync.Mutex
 	apiServerTimeout                                 time.Duration
-	workerPeersAddresses, controlPlanePeersAddresses [][]v1.NodeAddress
+	workerPeersAddresses, controlPlanePeersAddresses []v1.PodIP
 }
 
 func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Reader, log logr.Logger, apiServerTimeout time.Duration) *Peers {
@@ -47,8 +48,8 @@ func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Read
 		myNodeName:                 myNodeName,
 		mutex:                      sync.Mutex{},
 		apiServerTimeout:           apiServerTimeout,
-		workerPeersAddresses:       [][]v1.NodeAddress{},
-		controlPlanePeersAddresses: [][]v1.NodeAddress{},
+		workerPeersAddresses:       []v1.PodIP{},
+		controlPlanePeersAddresses: []v1.PodIP{},
 	}
 }
 
@@ -76,30 +77,37 @@ func (p *Peers) Start(ctx context.Context) error {
 		p.controlPlanePeerSelector = createSelector(hostname, getControlPlaneLabel(myNode))
 	}
 
-	go wait.UntilWithContext(ctx, func(ctx context.Context) {
-		p.updateWorkerPeers(ctx)
-		p.updateControlPlanePeers(ctx)
-	}, p.peerUpdateInterval)
+	var updatePeersError error
+	cancellableCtx, cancel := context.WithCancel(ctx)
 
-	p.log.Info("peers started")
+	p.log.Info("peer starting", "name", p.myNodeName)
+	wait.UntilWithContext(cancellableCtx, func(ctx context.Context) {
+		updatePeersError = p.updateWorkerPeers(ctx)
+		if updatePeersError != nil {
+			cancel()
+		}
+		updatePeersError = p.updateControlPlanePeers(ctx)
+		if updatePeersError != nil {
+			cancel()
+		}
+	}, p.peerUpdateInterval)
 
-	<-ctx.Done()
-	return nil
+	return updatePeersError
 }
 
-func (p *Peers) updateWorkerPeers(ctx context.Context) {
-	setterFunc := func(addresses [][]v1.NodeAddress) { p.workerPeersAddresses = addresses }
+func (p *Peers) updateWorkerPeers(ctx context.Context) error {
+	setterFunc := func(addresses []v1.PodIP) { p.workerPeersAddresses = addresses }
 	selectorGetter := func() labels.Selector { return p.workerPeerSelector }
-	p.updatePeers(ctx, selectorGetter, setterFunc)
+	return p.updatePeers(ctx, selectorGetter, setterFunc)
 }
 
-func (p *Peers) updateControlPlanePeers(ctx context.Context) {
-	setterFunc := func(addresses [][]v1.NodeAddress) { p.controlPlanePeersAddresses = addresses }
+func (p *Peers) updateControlPlanePeers(ctx context.Context) error {
+	setterFunc := func(addresses []v1.PodIP) { p.controlPlanePeersAddresses = addresses }
 	selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector }
-	p.updatePeers(ctx, selectorGetter, setterFunc)
+	return p.updatePeers(ctx, selectorGetter, setterFunc)
 }
 
-func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses [][]v1.NodeAddress)) {
+func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) error {
 	p.mutex.Lock()
 	defer p.mutex.Unlock()
 
@@ -111,25 +119,45 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec
 	if err := p.List(readerCtx, &nodes, client.MatchingLabelsSelector{Selector: getSelector()}); err != nil {
 		if errors.IsNotFound(err) {
 			// we are the only node at the moment... reset peerList
-			p.workerPeersAddresses = [][]v1.NodeAddress{}
+			p.workerPeersAddresses = []v1.PodIP{}
 		}
 		p.log.Error(err, "failed to update peer list")
-		return
+		return pkgerrors.Wrap(err, "failed to update peer list")
+	}
+
+	pods := v1.PodList{}
+	listOptions := &client.ListOptions{
+		LabelSelector: labels.SelectorFromSet(labels.Set{
+			"app.kubernetes.io/name":      "self-node-remediation",
+			"app.kubernetes.io/component": "agent",
+		}),
+	}
+	if err := p.List(readerCtx, &pods, listOptions); err != nil {
+		p.log.Error(err, "could not get pods")
+		return pkgerrors.Wrap(err, "could not get pods")
 	}
 
 	nodesCount := len(nodes.Items)
-	addresses := make([][]v1.NodeAddress, nodesCount)
+	addresses := make([]v1.PodIP, nodesCount)
 	for i, node := range nodes.Items {
-		addresses[i] = node.Status.Addresses
+		for _, pod := range pods.Items {
+			if pod.Spec.NodeName == node.Name {
+				if pod.Status.PodIPs == nil || len(pod.Status.PodIPs) == 0 {
+					return pkgerrors.New(fmt.Sprintf("empty Pod IP for Pod %s on Node %s", pod.Name, node.Name))
+				}
+				addresses[i] = pod.Status.PodIPs[0]
+			}
+		}
 	}
 	setAddresses(addresses)
+	return nil
 }
 
-func (p *Peers) GetPeersAddresses(role Role) [][]v1.NodeAddress {
+func (p *Peers) GetPeersAddresses(role Role) []v1.PodIP {
 	p.mutex.Lock()
 	defer p.mutex.Unlock()
 
-	var addresses [][]v1.NodeAddress
+	var addresses []v1.PodIP
 	if role == Worker {
 		addresses = p.workerPeersAddresses
 	} else {
@@ -137,11 +165,8 @@ func (p *Peers) GetPeersAddresses(role Role) [][]v1.NodeAddress {
 	}
 	//we don't want the caller to be able to change the addresses
 	//so we create a deep copy and return it
-	addressesCopy := make([][]v1.NodeAddress, len(addresses))
-	for i := range addressesCopy {
-		addressesCopy[i] = make([]v1.NodeAddress, len(addresses[i]))
-		copy(addressesCopy, addresses)
-	}
+	addressesCopy := make([]v1.PodIP, len(addresses))
+	copy(addressesCopy, addresses)
 	return addressesCopy
 }
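
Two observations on the updatePeers change above, with a hedged sketch: the nested loop rescans the full pod list for every node, and a node whose agent pod is missing leaves a zero-value PodIP in the slice, which popPeerIPs later only logs and skips. Below is a variant that indexes pods by node name and filters such nodes up front, assuming at most one agent pod per node (the DaemonSet case); podIPsByNode is a hypothetical helper, not part of this patch:

	package peers

	import (
		pkgerrors "github.com/pkg/errors"
		v1 "k8s.io/api/core/v1"
	)

	// podIPsByNode mirrors the pairing loop in updatePeers: pods are indexed
	// by spec.nodeName so each node resolves in O(1), and nodes without a
	// running agent pod are skipped instead of leaving empty entries behind.
	func podIPsByNode(nodes []v1.Node, pods []v1.Pod) ([]v1.PodIP, error) {
		podByNode := make(map[string]v1.Pod, len(pods))
		for _, pod := range pods {
			podByNode[pod.Spec.NodeName] = pod
		}

		addresses := make([]v1.PodIP, 0, len(nodes))
		for _, node := range nodes {
			pod, ok := podByNode[node.Name]
			if !ok {
				// no agent pod on this node, so it cannot serve as a peer
				continue
			}
			if len(pod.Status.PodIPs) == 0 {
				return nil, pkgerrors.Errorf("empty Pod IP for Pod %s on Node %s", pod.Name, node.Name)
			}
			addresses = append(addresses, pod.Status.PodIPs[0])
		}
		return addresses, nil
	}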