[release-0.9] Use Pod IP for peer communication #234

Merged
71 changes: 36 additions & 35 deletions pkg/apicheck/check.go
@@ -10,7 +10,7 @@ import (
"github.com/go-logr/logr"
"google.golang.org/grpc/credentials"

v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
@@ -128,21 +128,21 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response {
}

c.config.Log.Info("Error count exceeds threshold, trying to ask other nodes if I'm healthy")
nodesToAsk := c.config.Peers.GetPeersAddresses(peers.Worker)
if nodesToAsk == nil || len(nodesToAsk) == 0 {
peersToAsk := c.config.Peers.GetPeersAddresses(peers.Worker)
if peersToAsk == nil || len(peersToAsk) == 0 {
c.config.Log.Info("Peers list is empty and / or couldn't be retrieved from server, nothing we can do, so consider the node being healthy")
//todo maybe we need to check if this happens too much and reboot
// TODO: maybe we need to check if this happens too much and reboot
return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseNoPeersWereFound}
}

apiErrorsResponsesSum := 0
nrAllNodes := len(nodesToAsk)
// nodesToAsk is being reduced in every iteration, iterate until no nodes left to ask
for i := 0; len(nodesToAsk) > 0; i++ {
nrAllPeers := len(peersToAsk)
// peersToAsk is being reduced at every iteration, iterate until no peers left to ask
for i := 0; len(peersToAsk) > 0; i++ {

batchSize := utils.GetNextBatchSize(nrAllNodes, len(nodesToAsk))
chosenNodesAddresses := c.popNodes(&nodesToAsk, batchSize)
healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenNodesAddresses)
batchSize := utils.GetNextBatchSize(nrAllPeers, len(peersToAsk))
chosenPeersIPs := c.popPeerIPs(&peersToAsk, batchSize)
healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenPeersIPs)
if healthyResponses+unhealthyResponses+apiErrorsResponses > 0 {
c.timeOfLastPeerResponse = time.Now()
}
@@ -161,9 +161,9 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response {
if apiErrorsResponses > 0 {
c.config.Log.Info("Peer can't access the api-server")
apiErrorsResponsesSum += apiErrorsResponses
//todo consider using [m|n]hc.spec.maxUnhealthy instead of 50%
if apiErrorsResponsesSum > nrAllNodes/2 { //already reached more than 50% of the nodes and all of them returned api error
//assuming this is a control plane failure as others can't access api-server as well
// TODO: consider using [m|n]hc.spec.maxUnhealthy instead of 50%
if apiErrorsResponsesSum > nrAllPeers/2 { // already reached more than 50% of the peers and all of them returned api error
// assuming this is a control plane failure as others can't access api-server as well
c.config.Log.Info("More than 50% of the nodes couldn't access the api-server, assuming this is a control plane failure")
return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseMostPeersCantAccessAPIServer}
}
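A quick worked example of the 50% rule above, with assumed numbers: because of integer division, the check requires strictly more than half of all peers to have reported api-server errors.

```go
package main

import "fmt"

func main() {
	nrAllPeers := 6            // total number of worker peers (assumed)
	apiErrorsResponsesSum := 4 // peers that could not reach the api-server
	// 6/2 == 3 with integer division, so at least 4 error responses are
	// needed before assuming a control plane failure.
	fmt.Println(apiErrorsResponsesSum > nrAllPeers/2) // prints: true
}
```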
@@ -185,47 +185,48 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response {
}

func (c *ApiConnectivityCheck) canOtherControlPlanesBeReached() bool {
nodesToAsk := c.config.Peers.GetPeersAddresses(peers.ControlPlane)
numOfControlPlanePeers := len(nodesToAsk)
peersToAsk := c.config.Peers.GetPeersAddresses(peers.ControlPlane)
numOfControlPlanePeers := len(peersToAsk)
if numOfControlPlanePeers == 0 {
c.config.Log.Info("Peers list is empty and / or couldn't be retrieved from server, other control planes can't be reached")
return false
}

chosenNodesAddresses := c.popNodes(&nodesToAsk, numOfControlPlanePeers)
healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenNodesAddresses)
chosenPeersIPs := c.popPeerIPs(&peersToAsk, numOfControlPlanePeers)
healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenPeersIPs)

// Any response is an indication of communication with a peer
return (healthyResponses + unhealthyResponses + apiErrorsResponses) > 0
}

func (c *ApiConnectivityCheck) popNodes(nodes *[][]v1.NodeAddress, count int) []string {
nrOfNodes := len(*nodes)
if nrOfNodes == 0 {
return []string{}
func (c *ApiConnectivityCheck) popPeerIPs(peersIPs *[]corev1.PodIP, count int) []corev1.PodIP {
nrOfPeers := len(*peersIPs)
if nrOfPeers == 0 {
return []corev1.PodIP{}
}

if count > nrOfNodes {
count = nrOfNodes
if count > nrOfPeers {
count = nrOfPeers
}

//todo maybe we should pick nodes randomly rather than relying on the order returned from api-server
addresses := make([]string, count)
// TODO: maybe we should pick nodes randomly rather than relying on the order returned from api-server
selectedIPs := make([]corev1.PodIP, count)
for i := 0; i < count; i++ {
nodeAddresses := (*nodes)[i]
if len(nodeAddresses) == 0 || nodeAddresses[0].Address == "" {
c.config.Log.Info("ignoring node without IP address")
ip := (*peersIPs)[i]
if ip.IP == "" {
// This should not happen, but keeping it for good measure.
c.config.Log.Info("ignoring peers without IP address")
continue
}
addresses[i] = nodeAddresses[0].Address //todo node might have multiple addresses or none
selectedIPs[i] = ip
}

*nodes = (*nodes)[count:] //remove popped nodes from the list
*peersIPs = (*peersIPs)[count:] // remove popped peers from the list

return addresses
return selectedIPs
}

func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []string) (int, int, int, int) {
func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []corev1.PodIP) (int, int, int, int) {
nrAddresses := len(addresses)
responsesChan := make(chan selfNodeRemediation.HealthCheckResponseCode, nrAddresses)

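The TODO in popPeerIPs about random selection could be addressed by shuffling the peer list once before the batching loop starts. A minimal sketch, assuming seeding is handled elsewhere; the helper name is hypothetical:

```go
package apicheck

import (
	"math/rand"

	corev1 "k8s.io/api/core/v1"
)

// shufflePeerIPs randomizes the peer order in place, so that batch
// selection no longer depends on the ordering returned by the api-server.
func shufflePeerIPs(peersIPs []corev1.PodIP) {
	rand.Shuffle(len(peersIPs), func(i, j int) {
		peersIPs[i], peersIPs[j] = peersIPs[j], peersIPs[i]
	})
}
```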
@@ -237,9 +238,9 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []string) (int
}

// getHealthStatusFromPeer issues a GET request to the specified IP and returns the result from the peer into the given channel
func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp string, results chan<- selfNodeRemediation.HealthCheckResponseCode) {
func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) {

logger := c.config.Log.WithValues("IP", endpointIp)
logger := c.config.Log.WithValues("IP", endpointIp.IP)
logger.Info("getting health status from peer")

if err := c.initClientCreds(); err != nil {
@@ -249,7 +250,7 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp string, result
}

// TODO does this work with IPv6?
phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds)
phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp.IP, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds)
if err != nil {
logger.Error(err, "failed to init grpc client")
results <- selfNodeRemediation.RequestFailed
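On the IPv6 TODO above: fmt.Sprintf("%v:%v", ...) yields an invalid dial target for IPv6 literals, which must be bracketed. A minimal address-family-safe sketch using only the standard library; the helper name is hypothetical:

```go
package apicheck

import (
	"net"
	"strconv"
)

// dialTarget builds a host:port string that is valid for both IPv4 and
// IPv6 literals; net.JoinHostPort adds the brackets IPv6 requires.
func dialTarget(ip string, port int) string {
	return net.JoinHostPort(ip, strconv.Itoa(port))
}
```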
81 changes: 53 additions & 28 deletions pkg/peers/peers.go
@@ -8,6 +8,7 @@ import (

"github.com/go-logr/logr"
commonlabels "github.com/medik8s/common/pkg/labels"
pkgerrors "github.com/pkg/errors"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -36,7 +37,7 @@ type Peers struct {
myNodeName string
mutex sync.Mutex
apiServerTimeout time.Duration
workerPeersAddresses, controlPlanePeersAddresses [][]v1.NodeAddress
workerPeersAddresses, controlPlanePeersAddresses []v1.PodIP
}

func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Reader, log logr.Logger, apiServerTimeout time.Duration) *Peers {
@@ -47,8 +48,8 @@ func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Read
myNodeName: myNodeName,
mutex: sync.Mutex{},
apiServerTimeout: apiServerTimeout,
workerPeersAddresses: [][]v1.NodeAddress{},
controlPlanePeersAddresses: [][]v1.NodeAddress{},
workerPeersAddresses: []v1.PodIP{},
controlPlanePeersAddresses: []v1.PodIP{},
}
}

@@ -76,30 +77,37 @@ func (p *Peers) Start(ctx context.Context) error {
p.controlPlanePeerSelector = createSelector(hostname, getControlPlaneLabel(myNode))
}

go wait.UntilWithContext(ctx, func(ctx context.Context) {
p.updateWorkerPeers(ctx)
p.updateControlPlanePeers(ctx)
}, p.peerUpdateInterval)
var updatePeersError error
cancellableCtx, cancel := context.WithCancel(ctx)

p.log.Info("peers started")
p.log.Info("peer starting", "name", p.myNodeName)
wait.UntilWithContext(cancellableCtx, func(ctx context.Context) {
updatePeersError = p.updateWorkerPeers(ctx)
if updatePeersError != nil {
cancel()
}
updatePeersError = p.updateControlPlanePeers(ctx)
if updatePeersError != nil {
cancel()
}
}, p.peerUpdateInterval)

<-ctx.Done()
return nil
return updatePeersError
}

func (p *Peers) updateWorkerPeers(ctx context.Context) {
setterFunc := func(addresses [][]v1.NodeAddress) { p.workerPeersAddresses = addresses }
func (p *Peers) updateWorkerPeers(ctx context.Context) error {
setterFunc := func(addresses []v1.PodIP) { p.workerPeersAddresses = addresses }
selectorGetter := func() labels.Selector { return p.workerPeerSelector }
p.updatePeers(ctx, selectorGetter, setterFunc)
return p.updatePeers(ctx, selectorGetter, setterFunc)
}

func (p *Peers) updateControlPlanePeers(ctx context.Context) {
setterFunc := func(addresses [][]v1.NodeAddress) { p.controlPlanePeersAddresses = addresses }
func (p *Peers) updateControlPlanePeers(ctx context.Context) error {
setterFunc := func(addresses []v1.PodIP) { p.controlPlanePeersAddresses = addresses }
selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector }
p.updatePeers(ctx, selectorGetter, setterFunc)
return p.updatePeers(ctx, selectorGetter, setterFunc)
}

func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses [][]v1.NodeAddress)) {
func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) error {
p.mutex.Lock()
defer p.mutex.Unlock()

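With this change Start no longer blocks unconditionally until shutdown: a failed peer update cancels the internal loop and the error is returned to the caller. A hedged wiring sketch, assuming the operator registers Peers as a controller-runtime Runnable (import path, interval, and timeout values are assumed):

```go
package main

import (
	"time"

	"github.com/go-logr/logr"
	"github.com/medik8s/self-node-remediation/pkg/peers" // path assumed
	ctrl "sigs.k8s.io/controller-runtime"
)

func setupPeers(mgr ctrl.Manager, myNodeName string, log logr.Logger) error {
	p := peers.New(myNodeName, 30*time.Second, mgr.GetAPIReader(), log, 5*time.Second)
	// The manager starts p with its own context; an error returned from
	// Start now propagates to the manager instead of only being logged.
	return mgr.Add(p)
}
```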
@@ -111,37 +119,54 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec
if err := p.List(readerCtx, &nodes, client.MatchingLabelsSelector{Selector: getSelector()}); err != nil {
if errors.IsNotFound(err) {
// we are the only node at the moment... reset peerList
p.workerPeersAddresses = [][]v1.NodeAddress{}
p.workerPeersAddresses = []v1.PodIP{}
}
p.log.Error(err, "failed to update peer list")
return
return pkgerrors.Wrap(err, "failed to update peer list")
}

pods := v1.PodList{}
listOptions := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
"app.kubernetes.io/name": "self-node-remediation",
"app.kubernetes.io/component": "agent",
}),
}
if err := p.List(readerCtx, &pods, listOptions); err != nil {
p.log.Error(err, "could not get pods")
return pkgerrors.Wrap(err, "could not get pods")
}

nodesCount := len(nodes.Items)
addresses := make([][]v1.NodeAddress, nodesCount)
addresses := make([]v1.PodIP, nodesCount)
for i, node := range nodes.Items {
addresses[i] = node.Status.Addresses
for _, pod := range pods.Items {
if pod.Spec.NodeName == node.Name {
if pod.Status.PodIPs == nil || len(pod.Status.PodIPs) == 0 {
return pkgerrors.New(fmt.Sprintf("empty Pod IP for Pod %s on Node %s", pod.Name, node.Name))
}
addresses[i] = pod.Status.PodIPs[0]
}
}
}
setAddresses(addresses)
return nil
}

func (p *Peers) GetPeersAddresses(role Role) [][]v1.NodeAddress {
func (p *Peers) GetPeersAddresses(role Role) []v1.PodIP {
p.mutex.Lock()
defer p.mutex.Unlock()

var addresses [][]v1.NodeAddress
var addresses []v1.PodIP
if role == Worker {
addresses = p.workerPeersAddresses
} else {
addresses = p.controlPlanePeersAddresses
}
//we don't want the caller to be able to change the addresses
//so we create a deep copy and return it
addressesCopy := make([][]v1.NodeAddress, len(addresses))
for i := range addressesCopy {
addressesCopy[i] = make([]v1.NodeAddress, len(addresses[i]))
copy(addressesCopy, addresses)
}
addressesCopy := make([]v1.PodIP, len(addresses))
copy(addressesCopy, addresses)

return addressesCopy
}
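As a side note on the node-to-pod matching in updatePeers: the nested loop could in principle be replaced by one server-side field-selector query per node. A hedged sketch; it assumes the List goes to the api-server directly, since a cache-backed client would first need a "spec.nodeName" field index:

```go
package peers

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// listAgentPodsOnNode is a hypothetical helper that fetches the agent
// pod(s) scheduled on the given node via a field selector, instead of
// filtering all agent pods client-side.
func listAgentPodsOnNode(ctx context.Context, r client.Reader, nodeName string) (*v1.PodList, error) {
	pods := &v1.PodList{}
	err := r.List(ctx, pods,
		client.MatchingLabels{
			"app.kubernetes.io/name":      "self-node-remediation",
			"app.kubernetes.io/component": "agent",
		},
		client.MatchingFields{"spec.nodeName": nodeName},
	)
	return pods, err
}
```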