Skip to content

Commit

Permalink
added handling for new 'kube-vip.io/loadbalancerIPs' annotation - see…
Browse files Browse the repository at this point in the history
… notes in 'README.md'!
  • Loading branch information
BBQigniter committed May 16, 2023
1 parent df93de9 commit bdb0d83
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 45 deletions.
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ for better loadbalancing to pods (exposed with multiple VIPs - ergo RR-DNS) runn

> `kube-vip` itself must be running with the flag `svc_election` set to `true`
Tested with kube-vip version up to v0.5.11 - so far I've seen no real problems.
Successfully tested with `kube-vip` version up to `v0.5.12` and `kube-vip-cloud-provider` up to `v0.0.5` - so far I've seen no real problems.

> Currently the annotation `kube-vip.io/loadbalancerIPs` and `spec.loadBalancerIP` must both be configured with the same VIP to make it work correctly!
With kube-vip v0.6.0 there is currently an issue resulting in unexpected behaviour - see https://github.com/kube-vip/kube-vip/issues/563!

## Workload examples

Expand Down Expand Up @@ -41,7 +45,8 @@ A simple echoserver example.

* possibly a few test-cases are not covered
* currently all logs are written to console, so if you send logs via syslog too and scrape logs of pods - logs might be duplicated
* the watcher is not yet working with new `kube-vip.io/loadbalancerIPs` annotation instead of `service.spec.loadbalancerIP`
* the watcher now also works with the new `kube-vip.io/loadbalancerIPs` annotation instead of depending only on `service.spec.loadbalancerIP`,
  but currently BOTH must be configured in the service to make it work correctly!
* find better way to check if rebalancing is really needed :| - currently we patch the lease in some cases even though it's not really needed

# Libraries for Logging and Locking
Expand Down Expand Up @@ -138,3 +143,6 @@ autodiscover:
* v0.09 - 2023-05-09
- fixed balancing for multiple services pointing to same workload with different VIPs
- improved logging output a little bit
* v0.10 - 2023-05-16
- added fix so it should also work with the new annotation
- updated `workload-examples` to reflect notes from "Known Issues"
111 changes: 71 additions & 40 deletions kube-vip-watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
# place the "admin.conf" from one of the master-nodes as "~/.kube/config" in your home-folder
# if the "server:" is pointing to an IP/hostname managed via haproxy, haproxy might
# disconnect the session after a defined period set in the "haproxy.cfg". If possible you
# can change the "server:" value pointing directly to one of the master-nodes' API-port to
# can change the "server:" value pointing directly to one of the master-nodes' API-port to
# simulate pretty much the same condition, like if the script is running as a pod.
# config.load_kube_config()
#config.load_kube_config()
# for loading config if script is running as pod/container
config.load_incluster_config()

Expand All @@ -57,7 +57,7 @@ def check_node_state(node_name):
logger.error("Exception when calling CoreV1Api->read_node_status: %s\n" % e)
node_conditions = []
# endtry

for condition in node_conditions:
logger.debug("Node %s condition: %s" % (node_name, str(condition).replace("\n", ""))) # output in a single line, which may be easier to parse for log-pattern analyzers
# logger.debug("Node %s condition: %s" % (node_name, condition)) # tried with "pretty=False" but somehow it's still "pretty-printed"
Expand All @@ -78,14 +78,14 @@ def check_container_state(pod_container_statuses):
logger.debug(pod_container_statuses)
number_of_containers = len(pod_container_statuses)
ready_containers = 0

for container_status in pod_container_statuses:
# container_status.ready is a boolean value
if container_status.ready:
ready_containers += 1
# endif
# endfor

if ready_containers == number_of_containers:
return True
else:
Expand All @@ -104,7 +104,7 @@ def get_namespaced_services_with_label(namespace, pod_labels_app, pod_name):
services = []
# endtry
list_of_services = []

# get all services with app-label
for service in services.items:
try:
Expand All @@ -116,7 +116,7 @@ def get_namespaced_services_with_label(namespace, pod_labels_app, pod_name):
continue
# endtry
# endfor

# now check if there is a service named exactly like the pod
single_service = next((item for item in list_of_services if item.metadata.name == pod_name), None)
# we need to return a list for the next function - it can be empty if no service with app-label was found
Expand Down Expand Up @@ -161,7 +161,7 @@ def get_namespaced_pods_with_label_on_node(namespace, pod_labels_app, pod_node_n
pods = []
# endtry
list_of_pods = []

# get all pods with app-label and node_name
for pod in pods.items:
try:
Expand All @@ -175,7 +175,7 @@ def get_namespaced_pods_with_label_on_node(namespace, pod_labels_app, pod_node_n
continue
# endtry
# endfor

logger.info("Number of suitable pods on node %s: %d" % (pod_node_name, len(list_of_pods)))
return list_of_pods
# enddef
Expand All @@ -194,40 +194,68 @@ def balance(list_of_services, pod_container_statuses, pod_node_name, pod_labels_
service_name = service.metadata.name
service_counter += 1
# logger.info("service_count: %d" % service_counter)

try:
# get the value of annotation 'kubeVipBalancePriority' and other values
namespace = service.metadata.namespace
load_balancer_ip = service.spec.load_balancer_ip

# from Kubernetes v1.24+ on "spec.loadBalancerIP" is going to be deprecated and should be changed to
# annotations. But currently both, the annotation "kube-vip.io/loadbalancerIPs" and the spec-parameter,
# must be used in a service-manifest to make it work correctly.
try:
load_balancer_ip = service.spec.load_balancer_ip # see https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ServiceSpec.md
logger.warning("Service: %s - 'spec.loadBalancerIP' is deprecated and will eventually be removed in some future Kubernetes version to be on the save side, use the annotation 'kube-vip.io/loadbalancerIPs' too." % service_name)
except:
load_balancer_ip = None
# endtry

# for kube-vip v0.5.12+ the VIP that should be used for the service is going to be saved in the annotation "kube-vip.io/loadbalancerIPs"
if load_balancer_ip is None:
try:
load_balancer_ip = service.metadata.annotations['kube-vip.io/loadbalancerIPs']
except:
# not really tested - theoretically it should work :)
logger.warning("Service: %s - No dedicated VIPs defined! - trying to get IPs from status" % service_name)

load_balancer_ip_list = []
for vip in service.status.load_balancer.ingress:
load_balancer_ip_list = load_balancer_ip.append(vip.ip)
# endfor

load_balancer_ip = ','.join(load_balancer_ip_list)
# endtry
# endif

try:
traffic_policy = service.spec.external_traffic_policy
except:
traffic_policy = None
# endtry

balance_priority_string = service.metadata.annotations['kubeVipBalancePriority']
balance_priority_order = str(balance_priority_string).split(",")
balance_priority_order = [sub_element.strip() for sub_element in balance_priority_order] # remove whitespace from each element in the list
logger.info("Service: %s - Balance Priority: %s - Traffic Policy: %s - Loadbalancer-IP: %s" % (service_name, balance_priority_order, traffic_policy, load_balancer_ip))
except:
logger.warning("Service: %s - Service missing annotation 'kubeVipBalancePriority'" % service_name)

# check if we need to continue with next service
if number_of_services == service_counter:
return False # end def
else:
continue # check next service
# endif
# endtry

try:
lease_holder = get_namespaced_leases(service_name, namespace)

# now we check if pod is running and if the lease_holder is corresponding to the first node in the priority list
# if not we patch the holder-value in the lease after checking that the node to switch to is available and has a running pod
# TODO: find better way to check if rebalancing is really needed :| - currently we patch the lease in some cases even though it's not really needed
if lease_holder == balance_priority_order[0] and check_container_state(pod_container_statuses) and check_node_state(pod_node_name):
logger.info("Service: %s - Current lease holder OK and pod's containers are ready" % service_name)

# check if we need to continue with next service
if number_of_services == service_counter:
return True # end def
Expand All @@ -236,7 +264,7 @@ def balance(list_of_services, pod_container_statuses, pod_node_name, pod_labels_
# endif
else:
logger.warning("Service: %s - Detected wrong lease holder OR issues with pod's containers OR the node - further checking if VIP must be moved" % service_name)

service_ok = False
for node in balance_priority_order:
# we now go through the nodes until we find a suitable pod
Expand All @@ -260,13 +288,13 @@ def balance(list_of_services, pod_container_statuses, pod_node_name, pod_labels_
logger.debug(item)
pod_deletion_timestamp = None
"""

# if the pod is on the same node where the other failed and the remaining pod is healthy, the VIP doesn't need to be moved
if check_container_state(pod.status.container_statuses) \
and pod.spec.node_name == pod_node_name \
and lease_holder == balance_priority_order[0]:
logger.info("Service: %s - Found healthy pod %s on node %s. Current lease holder OK and no need to move VIP" % (service_name, pod.metadata.name, pod.spec.node_name))

# check if we need to continue with next service
if number_of_services == service_counter:
return True # end def
Expand All @@ -278,31 +306,33 @@ def balance(list_of_services, pod_container_statuses, pod_node_name, pod_labels_
# optional possible to only move if really needed with:
# if traffic_policy == "Local": # and indent code below a little
# We have found another ready pod on another node - we have to update the lease and the service manifest
try:
# we have to use "holderIdentity" instead of "holder_identity"
lease_body_patch = {"spec": {"holderIdentity": node}}
lease_patch_response = v1_coordination.patch_namespaced_lease("kubevip-" + service.metadata.name, namespace, lease_body_patch)
except Exception as e:
logger.error("Exception when calling CoordinationV1Api->patch_namespaced_lease: %s\n" % e)
sys.exit(1)
# endtry

try:
# if we do not patch this value kube-vip removes the VIP sometimes completely for about a minute
service_body_patch = {"metadata": {"annotations": {"kube-vip.io/vipHost": node}}}
service_patch_response = v1_core.patch_namespaced_service(service.metadata.name, namespace, service_body_patch)
logger.info("Service: %s - Patched service with annotation 'kube-vip.io/vipHost' %s" % (service_name, node))
except Exception as e:
logger.error("Exception when calling CoreV1Api->patch_namespaced_service: %s\n" % e)
sys.exit(1)
# endtry


try:
# we have to use "holderIdentity" instead of "holder_identity"
lease_body_patch = {"spec": {"holderIdentity": node}}
lease_patch_response = v1_coordination.patch_namespaced_lease("kubevip-" + service.metadata.name, namespace, lease_body_patch)
logger.info("Service: %s - Patched lease with 'holderIdentity' %s" % (service_name, node))
except Exception as e:
logger.error("Exception when calling CoordinationV1Api->patch_namespaced_lease: %s\n" % e)
sys.exit(1)
# endtry

if node == balance_priority_order[0]:
logger.info("Service: %s - HOLDER CHANGED TO PRIMARY NODE %s and service's annotation 'kube-vip.io/vipHost' updated to %s" % (
service_name,
lease_patch_response.spec.holder_identity,
service_patch_response.metadata.annotations['kube-vip.io/vipHost'])
)

# check if we need to continue with next service
if number_of_services == service_counter:
return True # end def
Expand All @@ -316,7 +346,7 @@ def balance(list_of_services, pod_container_statuses, pod_node_name, pod_labels_
lease_patch_response.spec.holder_identity,
service_patch_response.metadata.annotations['kube-vip.io/vipHost'])
)

# check if we need to continue with next service
if number_of_services == service_counter:
return True # end def
Expand Down Expand Up @@ -370,7 +400,7 @@ def balance(list_of_services, pod_container_statuses, pod_node_name, pod_labels_
def main():
logger_name = "main"
logger = Cplogging(logger_name)

# timeout_seconds=0 ... the connection will be closed by Kubernetes after about 1 hour
for item in w.stream(v1_core.list_pod_for_all_namespaces, timeout_seconds=0):
try:
Expand All @@ -382,22 +412,22 @@ def main():
# logger.debug(str(item))
pod_name = item['object'].metadata.name
namespace = item['object'].metadata.namespace

try:
pod_labels_app = item['object'].metadata.labels['app']
except:
pod_labels_app = None
logger.warning("App-Label not set")
# endtry

if pod_labels_app is not None:
pod_node_name = item['object'].spec.node_name
pod_status_phase = item['object'].status.phase
pod_container_statuses = item['object'].status.container_statuses

logger.info("Namespace %s - Pod: %s - App-Label: %s - Node-Name: %s - Status: %s - Stream-Event-Type: %s" % (namespace, pod_name, pod_labels_app, pod_node_name, pod_status_phase, item['type']))
# in the next step we will check if a rebalance is needed

# then we get the service corresponding to the pod
list_of_services = get_namespaced_services_with_label(namespace, pod_labels_app, pod_name)
if len(list_of_services) >= 1:
Expand All @@ -419,13 +449,13 @@ def main():
if __name__ == '__main__':
logger_name = "if_main"
logger = Cplogging(logger_name)

reconnect_time_threshold = 1 # if a reconnect happens within less than a second
reconnect_count_too_fast = 0 # init counter value
reconnect_max_tries = 5 # maximum amount of reconnects in sequence
reconnect_tries_left = reconnect_max_tries # init variable to determine how many reconnects in sequence are still allowed
reconnect_in_seq = False # init value for determining if reconnect happened in sequence in set time_threshold

try:
# ugly solution for reconnect
# if reconnects happen too fast in sequence, this might indicate that there is some problem. So we exit the script/pod so that we do not hammer the Kubernetes API too much :)
Expand All @@ -434,7 +464,7 @@ def main():
main()
connect_stop = time.time()
connect_duration = connect_stop - connect_start

if connect_duration < reconnect_time_threshold:
reconnect_count_too_fast += 1
reconnect_tries_left -= 1
Expand All @@ -444,11 +474,11 @@ def main():
reconnect_in_seq = False
logger.info("reconnected to Kubernetes-API")
# endif

if reconnect_tries_left <= 0:
logger.error("reconnected too often, too fast in sequence which might indicate a problem. Restarting...")
sys.exit(1)

# if there was a successful reconnect in between, the reconnect_tries_left variable gets reset to the initial value
if not reconnect_in_seq:
if reconnect_count_too_fast > 0:
Expand All @@ -463,3 +493,4 @@ def main():
sys.exit(1)
# endtry
# endif

2 changes: 1 addition & 1 deletion kube-vip-watcher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ spec:
app: kube-vip-watcher
spec:
containers:
- image: my-repo.home.arpa/kube-vip-watcher:v0.08
- image: my-repo.home.arpa/kube-vip-watcher:v0.10
name: kube-vip-watcher
livenessProbe:
failureThreshold: 3
Expand Down
Loading

0 comments on commit bdb0d83

Please sign in to comment.