diff --git a/k8s/base.py b/k8s/base.py index 98c78ef..851736f 100644 --- a/k8s/base.py +++ b/k8s/base.py @@ -19,6 +19,7 @@ import json import logging +from urllib.parse import urlencode, urlsplit, urlunsplit, parse_qs from collections import namedtuple import six @@ -117,20 +118,38 @@ def list(cls, namespace="default"): return [cls.from_dict(item) for item in resp.json()[u"items"]] @classmethod - def watch_list(cls, namespace=None): + def _watch_url_from_base_resource_url(cls, base_resource_url, start_at_resource_version=None): + scheme, netloc, path, query, fragment = urlsplit(base_resource_url) + url_query_params = parse_qs(query) + + url_query_params["watch"] = [1] + url_query_params["allowWatchBookmarks"] = ["true"] + + if start_at_resource_version is None: + url_query_params["resourceVersion"] = [] + else: + url_query_params["resourceVersion"] = [start_at_resource_version] + + query = urlencode(url_query_params, doseq=True) + return urlunsplit((scheme, netloc, path, query, fragment)) + + @classmethod + def watch_list(cls, namespace=None, start_at_resource_version=None): """Return a generator that yields WatchEvents of cls""" if namespace: if cls._meta.watch_list_url_template: - url = cls._meta.watch_list_url_template.format(namespace=namespace) + base_resource_url = cls._meta.watch_list_url_template.format(namespace=namespace) else: raise NotImplementedError( "Cannot watch_list with namespace, no watch_list_url_template defined on class {}".format(cls)) else: - url = cls._meta.watch_list_url - if not url: + base_resource_url = cls._meta.watch_list_url + if not base_resource_url: raise NotImplementedError("Cannot watch_list, no watch_list_url defined on class {}".format(cls)) - resp = cls._client.get(url, stream=True, timeout=config.stream_timeout) + watch_url = cls._watch_url_from_base_resource_url(base_resource_url, start_at_resource_version) + + resp = cls._client.get(watch_url, stream=True, timeout=config.stream_timeout) for line in resp.iter_lines(chunk_size=None): if line: try: @@ -288,14 +307,20 @@ class WatchEvent(object): ADDED = "ADDED" MODIFIED = "MODIFIED" DELETED = "DELETED" + BOOKMARK = "BOOKMARK" def __init__(self, event_json, cls): self.type = event_json["type"] self.object = cls.from_dict(event_json["object"]) + self.resourceVersion = event_json["object"]["metadata"]["resourceVersion"] def __repr__(self): - return "{cls}(type={type}, object={object})".format(cls=self.__class__.__name__, type=self.type, - object=self.object) + return "{cls}(type={type}, object={object}, rv={resourceVersion})".format( + cls=self.__class__.__name__, + type=self.type, + object=self.object, + resourceVersion=self.resourceVersion + ) class LabelSelector(object): diff --git a/k8s/watcher.py b/k8s/watcher.py index fe4305c..3b87543 100644 --- a/k8s/watcher.py +++ b/k8s/watcher.py @@ -15,9 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - -import cachetools - from .base import WatchEvent DEFAULT_CAPACITY = 1000 @@ -27,32 +24,35 @@ class Watcher(object): """Higher-level interface to watch for changes in objects The low-level :py:meth:`~.watch_list` method will stop when the API-server drops the connection. - When reconnecting, the API-server will send a list of :py:const:`~k8s.base.WatchEvent.ADDED` - events for all objects, even if they have been seen before. + When reconnecting, the API-server lets the caller specify which watch event to restart from. The Watcher will hide this complexity for you, and make sure to reconnect when the - connection drops, and skip events that have already been seen. + connection drops, and restart the watch from the last observed event. :param Model model: The model class to watch - :param int capacity: How many seen objects to keep track of """ - def __init__(self, model, capacity=DEFAULT_CAPACITY): - self._seen = cachetools.LRUCache(capacity) + def __init__(self, model): self._model = model self._run_forever = True - def watch(self, namespace=None): + def watch(self, namespace=None, start_at_resource_version=None): """Watch for events :param str namespace: the namespace to watch for events in. The default (None) results in watching for events in all namespaces. + :param int start_at_resource_version: the resource version to begin watching at, excluding + the version number itself. The default (None) will send the most recent event, then + begin watching events occuring thereafter. :return: a generator that yields :py:class:`~.WatchEvent` objects not seen before """ + + last_seen_version = start_at_resource_version + while self._run_forever: - for event in self._model.watch_list(namespace=namespace): - o = event.object - key = (o.metadata.name, o.metadata.namespace) - if self._seen.get(key) == o.metadata.resourceVersion and event.type != WatchEvent.DELETED: - continue - self._seen[key] = o.metadata.resourceVersion - yield event + for event in self._model.watch_list( + namespace=namespace, + start_at_resource_version=last_seen_version + ): + last_seen_version = event.resourceVersion + if event.type != WatchEvent.BOOKMARK: + yield event diff --git a/tests/k8s/test_base.py b/tests/k8s/test_base.py index ac7d8f4..fb82672 100644 --- a/tests/k8s/test_base.py +++ b/tests/k8s/test_base.py @@ -31,21 +31,29 @@ class Meta: class TestWatchEvent(object): + + common_object = {"value": 42, "metadata": {"resourceVersion": 1}} + def test_watch_event_added(self): - watch_event = WatchEvent({"type": "ADDED", "object": {"value": 42}}, Example) + watch_event = WatchEvent({"type": "ADDED", "object": self.common_object}, Example) assert watch_event.type == WatchEvent.ADDED assert watch_event.object == Example(value=42) def test_watch_event_modified(self): - watch_event = WatchEvent({"type": "MODIFIED", "object": {"value": 42}}, Example) + watch_event = WatchEvent({"type": "MODIFIED", "object": self.common_object}, Example) assert watch_event.type == WatchEvent.MODIFIED assert watch_event.object == Example(value=42) def test_watch_event_deleted(self): - watch_event = WatchEvent({"type": "DELETED", "object": {"value": 42}}, Example) + watch_event = WatchEvent({"type": "DELETED", "object": self.common_object}, Example) assert watch_event.type == WatchEvent.DELETED assert watch_event.object == Example(value=42) + def test_watch_event_bookmark(self): + watch_event = WatchEvent({"type": "BOOKMARK", "object": self.common_object}, Example) + assert watch_event.type == WatchEvent.BOOKMARK + assert watch_event.object == Example(value=42) + class TestFind(object): @pytest.fixture diff --git a/tests/k8s/test_client.py b/tests/k8s/test_client.py index a0dc552..bfd75b0 100644 --- a/tests/k8s/test_client.py +++ b/tests/k8s/test_client.py @@ -122,13 +122,22 @@ def test_watch_list_with_namespace_should_raise_exception_when_watch_list_url_te def test_watch_list(self, session): list(WatchListExample.watch_list()) session.request.assert_called_once_with( - "GET", _absolute_url("/watch/example"), json=None, timeout=config.stream_timeout, stream=True + "GET", _absolute_url("/watch/example?watch=1&allowWatchBookmarks=true"), + json=None, timeout=config.stream_timeout, stream=True ) def test_watch_list_with_namespace(self, session): list(WatchListExample.watch_list(namespace="explicitly-set")) session.request.assert_called_once_with( - "GET", _absolute_url("/watch/explicitly-set/example"), json=None, timeout=config.stream_timeout, stream=True + "GET", _absolute_url("/watch/explicitly-set/example?watch=1&allowWatchBookmarks=true"), + json=None, timeout=config.stream_timeout, stream=True + ) + + def test_watch_list_starting_from_resource_version(self, session): + list(WatchListExample.watch_list(start_at_resource_version=10)) + session.request.assert_called_once_with( + "GET", _absolute_url("/watch/example?watch=1&allowWatchBookmarks=true&resourceVersion=10"), + json=None, timeout=config.stream_timeout, stream=True ) def test_list_without_namespace_should_raise_exception_when_list_url_is_not_set_on_metaclass(self, session): diff --git a/tests/k8s/test_watcher.py b/tests/k8s/test_watcher.py index f03e500..858c753 100644 --- a/tests/k8s/test_watcher.py +++ b/tests/k8s/test_watcher.py @@ -17,6 +17,7 @@ import mock +from mock import call import pytest import six @@ -28,6 +29,7 @@ ADDED = WatchEvent.ADDED MODIFIED = WatchEvent.MODIFIED DELETED = WatchEvent.DELETED +BOOKMARK = WatchEvent.BOOKMARK @pytest.mark.usefixtures("k8s_config", "logger") @@ -49,7 +51,7 @@ def test_multiple_events(self, api_watch_list): watcher._run_forever = False assert list(gen) == [] - api_watch_list.assert_called_with(namespace=None) + api_watch_list.assert_called_with(namespace=None, start_at_resource_version=None) def test_handle_reconnect(self, api_watch_list): events = [_event(0, ADDED, 1)] @@ -73,11 +75,11 @@ def test_handle_changes(self, api_watch_list): watcher._run_forever = False assert list(gen) == [] - def test_complicated(self, api_watch_list): + def test_many_reconnections(self, api_watch_list): first = [_event(0, ADDED, 1), _event(1, ADDED, 1), _event(2, ADDED, 1)] - second = [_event(0, ADDED, 1), _event(1, ADDED, 2), _event(2, ADDED, 1), _event(0, MODIFIED, 2)] - third = [_event(0, ADDED, 2), _event(1, DELETED, 2), _event(2, ADDED, 1), _event(2, MODIFIED, 2)] - fourth = [_event(0, ADDED, 2), _event(0, ADDED, 1, "other"), _event(0, MODIFIED, 2, "other")] + second = [_event(3, ADDED, 1), _event(4, ADDED, 2), _event(5, ADDED, 1), _event(6, MODIFIED, 2)] + third = [_event(7, ADDED, 2), _event(8, DELETED, 2), _event(9, ADDED, 1), _event(20, BOOKMARK, 2)] + fourth = [_event(25, ADDED, 2), _event(26, ADDED, 1), _event(27, MODIFIED, 2)] api_watch_list.side_effect = [first, second, third, fourth] watcher = Watcher(WatchListExample) gen = watcher.watch() @@ -88,20 +90,44 @@ def test_complicated(self, api_watch_list): _assert_event(next(gen), 2, ADDED, 1) # Second batch - _assert_event(next(gen), 1, ADDED, 2) - _assert_event(next(gen), 0, MODIFIED, 2) + _assert_event(next(gen), 3, ADDED, 1) + _assert_event(next(gen), 4, ADDED, 2) + _assert_event(next(gen), 5, ADDED, 1) + _assert_event(next(gen), 6, MODIFIED, 2) # Third batch - _assert_event(next(gen), 1, DELETED, 2) - _assert_event(next(gen), 2, MODIFIED, 2) + _assert_event(next(gen), 7, ADDED, 2) + _assert_event(next(gen), 8, DELETED, 2) + _assert_event(next(gen), 9, ADDED, 1) # Fourth batch - _assert_event(next(gen), 0, ADDED, 1, "other") - _assert_event(next(gen), 0, MODIFIED, 2, "other") + _assert_event(next(gen), 25, ADDED, 2) + _assert_event(next(gen), 26, ADDED, 1) + _assert_event(next(gen), 27, MODIFIED, 2) watcher._run_forever = False assert list(gen) == [] + def test_start_watching_from_bookmark(self, api_watch_list): + + bookmark_event = _event(30, BOOKMARK, 1) + first = [_event(0, ADDED, 1), _event(1, ADDED, 1), _event(2, ADDED, 1), bookmark_event] + second = [_event(31, ADDED, 1)] + + api_watch_list.side_effect = [first, second] + watcher = Watcher(WatchListExample) + gen = watcher.watch() + + _assert_event(next(gen), 0, ADDED, 1) + _assert_event(next(gen), 1, ADDED, 1) + _assert_event(next(gen), 2, ADDED, 1) + _assert_event(next(gen), 31, ADDED, 1) + + api_watch_list.assert_has_calls([ + call(namespace=None, start_at_resource_version=None), + call(namespace=None, start_at_resource_version=bookmark_event.resourceVersion), + ]) + def test_namespace(self, api_watch_list): namespace = "the-namespace" watcher = Watcher(WatchListExample) @@ -115,22 +141,27 @@ def stop_iteration(*args, **kwargs): assert list(gen) == [] - api_watch_list.assert_called_with(namespace=namespace) + api_watch_list.assert_called_with(namespace=namespace, start_at_resource_version=None) -def _event(id, event_type, rv, namespace="default"): - metadict = {"name": "name{}".format(id), "namespace": namespace, "resourceVersion": rv} - metadata = ObjectMeta.from_dict(metadict) - wle = WatchListExample(metadata=metadata, value=(id * 100) + rv) - return mock.NonCallableMagicMock(type=event_type, object=wle) +def _event(id, event_type, rv): + event_json = { + "type": event_type, + "object": { + "value": (id * 100) + rv, + "metadata": { + "resourceVersion": rv + } + }, + } + we = WatchEvent(event_json, WatchListExample) + return mock.NonCallableMagicMock(type=event_type, object=we) -def _assert_event(event, id, event_type, rv, namespace="default"): +def _assert_event(event, id, event_type, rv): assert event.type == event_type - o = event.object + o = event.object.object assert o.kind == "Example" - assert o.metadata.name == "name{}".format(id) - assert o.metadata.namespace == namespace assert o.value == (id * 100) + rv