Skip to content

Commit

Permalink
code refactoring and implementation of startTimes status field
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgarcia committed Mar 20, 2019
1 parent 343c800 commit 7d1096b
Show file tree
Hide file tree
Showing 14 changed files with 91 additions and 92 deletions.
10 changes: 8 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@ dep:
docker-build:
docker build . -t ${IMG}

# Install CRDs and RBACs into a cluster
install:
crds:
kubectl apply -f config/crds

permissions:
kubectl apply -f config/rbac

run: crds
python3 src/main.py --kubeconfig ~/.kube/config

install: crds permissions

# Deploy controller in the configured Kubernetes cluster in ~/.kube/config
deploy: install
kubectl apply -f config/default --namespace=system
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ INFO:controller:Controller starting
Once the operator is running you can create immortal containers using a custom resource like this one:

```yaml
apiVersion: exampleoperator.flugel.it/v1alpha1
apiVersion: immortalcontainer.flugel.it/v1alpha1
kind: ImmortalContainer
metadata:
name: example-immortal-container
Expand Down
33 changes: 11 additions & 22 deletions config/crds/exampleoperator_v1alpha1_immortalcontainer.yaml
Original file line number Diff line number Diff line change
@@ -1,34 +1,30 @@
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
creationTimestamp: null
labels:
controller-tools.k8s.io: "1.0"
name: immortalcontainers.exampleoperator.flugel.it
name: immortalcontainers.immortalcontainer.flugel.it
spec:
group: exampleoperator.flugel.it
group: immortalcontainer.flugel.it
names:
kind: ImmortalContainer
listKind: ImmortalContainerList
plural: immortalcontainers
singular: immortalcontainer
scope: Namespaced
subresources:
status: {}
validation:
openAPIV3Schema:
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
properties:
image:
minLength: 1
type: string
required:
- image
Expand All @@ -37,15 +33,8 @@ spec:
properties:
currentPod:
type: string
required:
- currentPod
startTimes:
format: int64
type: integer
type: object
required:
- spec
version: v1alpha1
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
version: v1alpha1
3 changes: 2 additions & 1 deletion config/default/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ spec:
labels:
app: exampleoperatorpy-controller
spec:
serviceAccountName: immortalcontainer-operator
containers:
- image: exampleoperatorpy:dev
- image: flugelit/immortalcontainer-operator-py
name: exampleoperatorpy-controller
2 changes: 1 addition & 1 deletion config/example-use.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
apiVersion: exampleoperator.flugel.it/v1alpha1
apiVersion: immortalcontainer.flugel.it/v1alpha1
kind: ImmortalContainer
metadata:
name: example-immortal-container
Expand Down
4 changes: 2 additions & 2 deletions config/rbac/rbac_role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
creationTimestamp: null
name: manager-role
name: immortalcontainer-operator
rules:
- apiGroups:
- ""
Expand All @@ -29,7 +29,7 @@ rules:
- patch
- delete
- apiGroups:
- exampleoperator.flugel.it
- immortalcontainer.flugel.it
resources:
- immortalcontainers
verbs:
Expand Down
7 changes: 3 additions & 4 deletions config/rbac/rbac_role_binding.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
creationTimestamp: null
name: manager-rolebinding
name: immortalcontainer-operator-role-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: manager-role
name: immortalcontainer-operator
subjects:
- kind: ServiceAccount
name: default
name: immortalcontainer-operator
namespace: system
4 changes: 4 additions & 0 deletions config/rbac/service_account.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: immortalcontainer-operator
Binary file modified docs/components_diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/components_diagram.xml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
<mxfile modified="2019-02-26T19:39:42.196Z" host="www.draw.io" agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.92 Safari/537.36" etag="1Zw0wu8jTkUS1dCgj4mi" version="10.2.8" type="device"><diagram name="Page-1" id="e56a1550-8fbb-45ad-956c-1786394a9013">7Vpbc9soFP4t++DHeITufoyTuO1sdyaz2Zlt92WHSNhigoWKcOL01y8gsC7ItpJYSZtuMpOIAxwdzuXjHNDEu1hvPzBYZH/QFJGJ66TbiXc5cd0oDsVfSXisCGHsVIQVw2lFAjXhBn9HmmiGbXCKytZATinhuGgTE5rnKOEtGmSMPrSHLSlpv7WAK2QRbhJIbOrfOOWZpgLHqTs+IrzK9KvjQHfcwuRuxegm1++buN5S/VTda2h46fFlBlP60CB5VxPvglHKq6f19gIRqVqjtmreYk/vTm6Gcj5kQpZeBdD5XqJ/fkefr+B8cfnh5swLKjb3kGyQWYeSlj8aDak1IsnFmXjzhwxzdFPARPY+CJcQtIyviWgB8bhbpRxbckbv0AUllClWXpjE6HYpepaYkAY9hSheJoKupUGMo+3edYKd9oRTIrpGnD2KIXpCEGqFa4d0jUM+1OYNfU3LmpbVNKg9arVjXWtVPGjFPkXJ/rtXMojeXMneu1dy4AbDlAyiYCQtA0vJ1zQtpUiQJxliEzckQoj5rXxa8Z0enmuEU2gx7rhqDx4At0eL4Viu6lpK/LReU8ahcKScQ5wj1lLpD6ZAN/bfWIEDNi25Yix2+c/wFpFrWmKOaS66binndN3WkRl7TvBKjuFUKhHqViLUJKzQxYCc5sgOf0f9VuhRSFHW25VMm6bw+4ah6bcN2qB/V0hYGEuQEH0pFvw7TFvQsyS4+Hg6a0Zu25p+D6iAPlDxR8OUXlARBHQv1ySkgHlKegLB6BhtJdDUYQHa5u2LkMqWJuHzTqPa2O8iTY9q+wIlGCtQgJ15pLgsJLBY2jwQELugQXl6LnNuSSM0uZOkLeZf9Dz5/FW67TTQrcut9mLVeDSNXCzuS7PRmCWb9TTVMvMqmVFqZfQdC4l10Q1L0CHNVOM4ZCvEjzqnbfKGSQ08MkQgx/dt2fpMqrldU6zce09g7iokw6Jak57VTPQ7jGLnCKNq0RYj5WG7Jb7A6Wx4tva3nzC6o6i7Dc6s6HZfN7qj/6P7YIZ1PLqDV4tu0PEe339mdAdHGI0d3XFP8lUl/UuqBK29Lvy2oabjrFRHP+diAAiLbd1pCoUHyu5Ep0qRDEchYcXU1BJHqwtewcXRXE2TGmEhX6SPp1zftDXjvrBZ4zSVnT2JYjN9OwXydEzu9xTbu/OqVso2WgVjlzCWJRpokhBYlirhbej7ZdAydVzQhhcQxUcARrWuReotlCANdfKcIhgIO8dRJziwj7wQibrlXNTdnwYjUei23dILByGR8Ar42BhWyAHlfoFns47766Oe2oErjieFOXfAcdLYHu4EbQ+fRW/t4EOz5vfi4B3HC71RHBw43eQAOAcFsyb44ewVQsIuJv9ECc0TLAJCBgbkaMAO/aPl9UEHXoI3r9rdAedbz8QecBh8RKMLHw1AasPR62XzQ7fVykF/etgJvQ4jZ1j9/lTYCbtppR+PDyJmbT3lAyXPLx4MD4INRbqK6/QdPEig2sgDdpinslmgpFFv1Bxsnp+Wevz0YsOkH1fnlL8peZ2WVOZJyVzJUsjBw9+jhjs5lVNFYJbyP2W9Asj3HxEhYUjB8xOk2BRpNeWQEg9wEkRp0pMXan07w97ijIqeJVE4mQkayseozzonQ4EfWjuI5/UgD/DG2kK8AQWa2YjxWn0icfxG5OidCpEd8913EY17Ef1lhDdXLzuXLsy1AaBpLPFWJgJzLc9lxrn8BuRcasJdJGnuT7FIN5ZYJAxsmog3ugvhoVD8k/RSNtE9IrRA7KxE/ExkawvgClBblAk4E0/TIl917W/d7DrqZ7L/amfPpc0JHCk0H9MccCS1IMuRDPH0jhRajiRhgFGizmydvzIBLOm+6G5epw8MdOs0ZW9o9+FA+2701IEOnFnHQF6PgfpOYtzRAt0+A+7eyP/iNgJR9NY2mlk2Onzh/4tbzI3HiyrRrD+5q5La+rNG7+o/</diagram></mxfile>
<mxfile modified="2019-03-20T16:37:01.258Z" host="www.draw.io" agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36" etag="1QIccmz3M2DotoC2LbBq" version="10.5.2" type="device"><diagram name="Page-1" id="e56a1550-8fbb-45ad-956c-1786394a9013">7VpZU+M4EP41eSTlO84jCYSZhamilt2a42VKsRVbi2x5bIWE+fWrtuX4UhIDDswyG6ooqyW1pD4+dUsamfNoe5WiJPzEfExHhuZvR+bFyDAmtiX+A+FREjSzIAQp8QuSXhHuyE8siZqkromPs0ZDzhjlJGkSPRbH2OMNGkpTtmk2WzHaHDVBAe4Q7jxEu9TPxOehpOqaVlV8wCQI5dCuLSuWyLsPUraO5Xgjw1zlv6I6QiUv2T4Lkc82NZJ5OTLnKWO8+Iq2c0xBtKXYin6LPbW7eac45n06hP6ljbSfGf52jW8u0WxxcXV3ZtoFmwdE17hcRz5b/lhKKF8jBi7ayJxtQsLxXYI8qN0IkxC0kEdUlHTxuVsltM14yu7xnFGW5qxMx3PxciVqVoTSGt1H2F15gt5dlFznA0453tZIcpFXmEWYp4+iiay1HSlwaZCGK8ubSr2OJWlhXbOShqRFBTvWlVTFhxTsU4RsvXsh65M3F7L57oVsG3Y/IetT+0RS1jtCvmV+BlNC3AtxOjIcKiYxW8JXwHdyeK4ShpCi2zJVBR7ohkKKzqlM1egI8WMUsZQjYUgxRyTGaUOkv5gADdd6YwH22LRgeUTs8jdoiektywgnLBZVS8Y5i5oyKtueUxJAG85AiEiWPCE5oYU2BsQsxl331/K/Aj0SmEq0DSBsGqOf6xSPf6zxGn8PsNAwAZAQdT4R/FtMG9CzoiT5MJw2J0ZTm5YCVHQVqFgnwxQlqAgCfoBlilmg2KcKRyhljLcANJVb6E31qjyk0GUZ8JnDiNa12kijEK3KUexTOYrejTx8kiUALB1pHnCIndPg2D+HmBtolHn3QNoS/kX2g++vYLZjW5YuttKK88JjWYjF4r7UC7VeUKy65aWy314dZWydeviQIIp2HKUB5j1sEfuNrKGr8ppKS3hMMUWcPDTzCpVKJbdbRnLz3uOYuwypZFEsUvaqB/otRq52hFEhhQ6j3MJ2S3yB0XXhubO//Qe9ezJpb4PTjncbr+vdk/+9ux5QHfdu++28W29Zj2U907vtI4xO7d2uIvgqgv4VyydaWZ3zY83KirMsP/o5Fw10J9lWlWWisGHpvajMQ6SSo5hhwbTMJY5mF7yAi6OxmiTV3AIGksdThlWWJWOV20TE96FSESjWw7chkKelckuRbO/Oqxoh28kymG4K09FEDU08irIsD3hr8n4ZtIw1Q2/Ciz5xjwBMXroVobcQAijqpTGF3RN2noE69oF95IVI1E7nJu39qTcSOUbTLE2nFxIJq0CPtWYJNMj2T3g6bZm/POqpDLjgOCjMGT2Ok05t4ZrdtPDp5JUNvG/U/G4NvGV4jnkSA9e1dnCgawcn1ulgOdNXcIluMvkn9ljsEeEQ4BiI4x479K8W19steLHfPGs3epxvPRN79MPgIwpt+KgBUhOOThbN991WS3t8b7DjmC1GWr/8/amw47TDSss9PYiUa1OkD4w+P3koeVBSUhY4T4M1HgI4qQ4g2PIfuFA25tCK5QaAfJg/CE3LEuyBMGNfYts6G9fSkmqg7tAfV6JLDBwpi4PisGNLMmA7z4fh6zTuy2wuZgW4KheS5CejPl6RuDjSfsKU0K5/BGcEJA52bOsMh5gsDBXjTTlgr85/J37RWaWsQgO5M3NQ63ydArIUB8WFku6EX/C/BHhlBwcURLC0wfNH1Ya1N2dkomZFc/gOBQ3Hp0gbJ3pzYzPczsZmaU4XEHW3FecMd0nbI28s4wMS5S83jl/UHL3qoVAx2z3XqF3XyAcb5iwf7Bx8nksFoLKwIluIT2ZyPhch5/A05RwkYSw8P7bGRERBwn98nI49MaKxEIYsPGAB9AyK+AFTluD0LMP8TAh3oYMuFpmnn4mvcSI88diFs5b/RvtvnPbcJQ1gSM7EaRpSmejVIyTHUhiSY73ckCJf/5qG139cG9bN9/jim/bpJutxDy2ArcBztYPXL/p7+nrnnGevd6ugoHlrWz9cMgZQka61b7lVZ0SqKNYYIIpVqqibQB++5f7dFNZ5pqS4VbfMV1TY4fSOMgG2v5+SnLHd2kRNhZqG8StRrJ4DFgF39eTSvPwX</diagram></mxfile>
96 changes: 47 additions & 49 deletions src/controller.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
import queue
import threading
from pprint import pprint

from kubernetes.client.rest import ApiException
from kubernetes.client import models
Expand Down Expand Up @@ -62,19 +61,19 @@ def _handle_pod_event(self, event):

def _handle_immortalcontainer_event(self, event):
"""Handle an event from the immortalcontainers watcher putting the
resource name in the `workqueue`."""
object name in the `workqueue`."""
self._queue_work(event['object']['metadata']['namespace'] +
"/"+event['object']['metadata']['name'])

def _queue_work(self, resource_key):
"""Add a resource name to the work queue."""
if len(resource_key.split("/")) != 2:
logger.error("Invalid resource key: {:s}".format(resource_key))
def _queue_work(self, object_key):
"""Add a object name to the work queue."""
if len(object_key.split("/")) != 2:
logger.error("Invalid object key: {:s}".format(object_key))
return
self.workqueue.put(resource_key)
self.workqueue.put(object_key)

def run(self):
"""Dequeue and process resources from the `workqueue`. This method
"""Dequeue and process objects from the `workqueue`. This method
should not be called directly, but using `start()"""
self.running = True
logger.info('Controller starting')
Expand All @@ -88,76 +87,75 @@ def run(self):
self.workqueue.task_done()
except Exception as ex:
logger.error(
"Error _reconcile state {:s} {:s}".format(e, str(ex)))
"Error _reconcile state {:s}".format(e),
exc_info=True)

def stop(self):
"""Stops this controller thread"""
self.running = False
self.workqueue.put(None)

def _reconcile_state(self, resource_key):
"""Make changes to go from current state to desired state and update
resource status."""
logger.info("Reconcile state: {:s}".format(resource_key))
ns, name = resource_key.split("/")
def _reconcile_state(self, object_key):
"""Make changes to go from current state to desired state and updates
object status."""
logger.info("Reconcile state: {:s}".format(object_key))
ns, name = object_key.split("/")

# Get resource if it exists
# Get object if it exists
try:
immortalcontainer = self.customsapi.get_namespaced_custom_object(
self.custom_group, self.custom_version, ns, self.custom_plural, name)
except ApiException as e:
if e.status == 404:
logger.info(
"Element {:s} in workqueue no longel exist".format(resource_key))
"Element {:s} in workqueue no longer exist".format(object_key))
return
raise e

# Get resource status
status = self._get_status(immortalcontainer)

# Get associated pod
# Create pod definition
pod_definition = self._new_pod(immortalcontainer)
pod = None
if status['currentPod'] != "":
try:
pod = self.corev1api.read_namespaced_pod(
status['currentPod'], ns)
except ApiException as e:
if e.status != 404:
logger.info("Error retrieving pod {:s} for immortalcontainer {:s}".format(
status['currentPod'], resource_key))
raise e
try:
pod = self.corev1api.read_namespaced_pod(
pod_definition.metadata.name, ns)
except ApiException as e:
if e.status != 404:
logger.info("Error retrieving pod {:s} for immortalcontainer {:s}".format(
pod_definition.metadata.name, object_key))
raise e

if pod is None:
# If no pod exists create one
pod_request = self._new_pod(immortalcontainer)
pod = self.corev1api.create_namespaced_pod(ns, pod_request)

# update status
self._update_status(immortalcontainer, pod)
pod = self.corev1api.create_namespaced_pod(ns, pod_definition)
# update status
self._update_status(immortalcontainer, pod)

def _update_status(self, immortalcontainer, pod):
"""Updates an ImmortalContainer status"""
new_status = self._calculate_status(immortalcontainer, pod)
self.customsapi.patch_namespaced_custom_object(
self.custom_group, self.custom_version,
immortalcontainer['metadata']['namespace'],
self.custom_plural, immortalcontainer['metadata']['name'],
new_status
)
try:
self.customsapi.patch_namespaced_custom_object_status(
self.custom_group, self.custom_version,
immortalcontainer['metadata']['namespace'],
self.custom_plural, immortalcontainer['metadata']['name'],
new_status
)
except Exception as e:
logger.error("Error updating status for ImmortalContainer {:s}/{:s}".format(
immortalcontainer['metadata']['namespace'], immortalcontainer['metadata']['name']))

def _calculate_status(self, immortalcontainer, pod):
"""Calculates what the status of an ImmortalContainer should be """
new_status = copy.deepcopy(immortalcontainer)
new_status['status'] = dict(currentPod=pod.metadata.name)
return new_status

def _get_status(self, immortalcontainer):
"""Get the status from an ImmortalContainer. If `immortalcontainer`
has no status, returns a default status."""
if 'status' in immortalcontainer:
return immortalcontainer['status']
if 'status' in immortalcontainer and 'startTimes' in immortalcontainer['status']:
startTimes = immortalcontainer['status']['startTimes']+1
else:
return dict(currentPod='')
startTimes = 1
new_status['status'] = dict(
currentPod=pod.metadata.name,
startTimes=startTimes
)
return new_status

def _new_pod(self, immortalcontainer):
"""Returns the pod definition to create the pod for an ImmortalContainer"""
Expand Down
3 changes: 1 addition & 2 deletions src/defs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
CUSTOM_GROUP = 'exampleoperator.flugel.it'
CUSTOM_GROUP = 'immortalcontainer.flugel.it'
CUSTOM_VERSION = 'v1alpha1'
CUSTOM_PLURAL = 'immortalcontainers'
CUSTOM_KIND = 'ImmortalContainer'

8 changes: 4 additions & 4 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import defs
from controller import Controller
from threadedwatch import ThreadedWatchStream
from threadedwatch import ThreadedWatcher

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
Expand All @@ -15,7 +15,7 @@
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
'--kubeconfig', help='path tu kubeconfig file, only required if running outside of a cluster')
'--kubeconfig', help='path to kubeconfig file, only required if running outside of a cluster')
args = parser.parse_args()
if args.kubeconfig is not None:
config.load_kube_config()
Expand All @@ -26,8 +26,8 @@ def main():
customsapi = client.CustomObjectsApi()

# Changing this it's possible to work on all the namespaces or choose only one
pods_watcher = ThreadedWatchStream(corev1api.list_pod_for_all_namespaces)
immortalcontainers_watcher = ThreadedWatchStream(
pods_watcher = ThreadedWatcher(corev1api.list_pod_for_all_namespaces)
immortalcontainers_watcher = ThreadedWatcher(
customsapi.list_cluster_custom_object, defs.CUSTOM_GROUP,
defs.CUSTOM_VERSION, defs.CUSTOM_PLURAL
)
Expand Down
9 changes: 6 additions & 3 deletions src/threadedwatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
logger = logging.getLogger('threadedwatch')


class ThreadedWatchStream(threading.Thread):
class ThreadedWatcher(threading.Thread):
"""Watches Kubernetes resources event in a separate thread. Handlers for
events can be registered using `add_handler`.
Example:
v1 = kubernetes.client.CoreV1Api()
watcher = ThreadedWatchStream(v1.list_pod_for_all_namespaces)
watcher = ThreadedWatcher(v1.list_pod_for_all_namespaces)
def on_event(event):
print(event)
watcher.add_handler(on_event)
Expand Down Expand Up @@ -46,7 +46,10 @@ def run(self):
self.func, *self.func_args, **self.func_kwargs)
for event in stream:
for handler in self.handlers:
handler(event)
try:
handler(event)
except:
logger.error("Error in event handler", exc_info=True)

def stop(self):
"""Stops listening and dispatching events."""
Expand Down

0 comments on commit 7d1096b

Please sign in to comment.