Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated Watcher to use Bookmark events and restart watch from last observed event #94

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions k8s/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import json
import logging
from urllib.parse import urlencode, urlsplit, urlunsplit, parse_qs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still support Python 2.7, so this fails. You can use six.moves to solve it: https://six.readthedocs.io/index.html?highlight=moves#module-six.moves

from collections import namedtuple

import six
Expand Down Expand Up @@ -117,20 +118,38 @@ def list(cls, namespace="default"):
return [cls.from_dict(item) for item in resp.json()[u"items"]]

@classmethod
def watch_list(cls, namespace=None):
def _watch_url_from_base_resource_url(cls, base_resource_url, start_at_resource_version=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the client.get call goes to requests.request in the end, passing any keyword arguments it gets, I think this whole thing could be replaced by simply passing a suitable params dictionary to the client.get call.
That would be much easier to work with, and easier to read than this method.

scheme, netloc, path, query, fragment = urlsplit(base_resource_url)
url_query_params = parse_qs(query)

url_query_params["watch"] = [1]
url_query_params["allowWatchBookmarks"] = ["true"]

if start_at_resource_version is None:
url_query_params["resourceVersion"] = []
else:
url_query_params["resourceVersion"] = [start_at_resource_version]

query = urlencode(url_query_params, doseq=True)
return urlunsplit((scheme, netloc, path, query, fragment))

@classmethod
def watch_list(cls, namespace=None, start_at_resource_version=None):
"""Return a generator that yields WatchEvents of cls"""
if namespace:
if cls._meta.watch_list_url_template:
url = cls._meta.watch_list_url_template.format(namespace=namespace)
base_resource_url = cls._meta.watch_list_url_template.format(namespace=namespace)
else:
raise NotImplementedError(
"Cannot watch_list with namespace, no watch_list_url_template defined on class {}".format(cls))
else:
url = cls._meta.watch_list_url
if not url:
base_resource_url = cls._meta.watch_list_url
if not base_resource_url:
raise NotImplementedError("Cannot watch_list, no watch_list_url defined on class {}".format(cls))

resp = cls._client.get(url, stream=True, timeout=config.stream_timeout)
watch_url = cls._watch_url_from_base_resource_url(base_resource_url, start_at_resource_version)

resp = cls._client.get(watch_url, stream=True, timeout=config.stream_timeout)
for line in resp.iter_lines(chunk_size=None):
if line:
try:
Expand Down Expand Up @@ -288,14 +307,20 @@ class WatchEvent(object):
ADDED = "ADDED"
MODIFIED = "MODIFIED"
DELETED = "DELETED"
BOOKMARK = "BOOKMARK"

def __init__(self, event_json, cls):
self.type = event_json["type"]
self.object = cls.from_dict(event_json["object"])
self.resourceVersion = event_json["object"]["metadata"]["resourceVersion"]

def __repr__(self):
return "{cls}(type={type}, object={object})".format(cls=self.__class__.__name__, type=self.type,
object=self.object)
return "{cls}(type={type}, object={object}, rv={resourceVersion})".format(
cls=self.__class__.__name__,
type=self.type,
object=self.object,
resourceVersion=self.resourceVersion
)


class LabelSelector(object):
Expand Down
34 changes: 17 additions & 17 deletions k8s/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import cachetools

from .base import WatchEvent

DEFAULT_CAPACITY = 1000
Expand All @@ -27,32 +24,35 @@ class Watcher(object):
"""Higher-level interface to watch for changes in objects

The low-level :py:meth:`~.watch_list` method will stop when the API-server drops the connection.
When reconnecting, the API-server will send a list of :py:const:`~k8s.base.WatchEvent.ADDED`
events for all objects, even if they have been seen before.
When reconnecting, the API-server lets the caller specify which watch event to restart from.

The Watcher will hide this complexity for you, and make sure to reconnect when the
connection drops, and skip events that have already been seen.
connection drops, and restart the watch from the last observed event.

:param Model model: The model class to watch
:param int capacity: How many seen objects to keep track of
"""
def __init__(self, model, capacity=DEFAULT_CAPACITY):
self._seen = cachetools.LRUCache(capacity)
def __init__(self, model):
self._model = model
self._run_forever = True

def watch(self, namespace=None):
def watch(self, namespace=None, start_at_resource_version=None):
"""Watch for events

:param str namespace: the namespace to watch for events in. The default (None) results in
watching for events in all namespaces.
:param int start_at_resource_version: the resource version to begin watching at, excluding
the version number itself. The default (None) will send the most recent event, then
begin watching events occuring thereafter.
:return: a generator that yields :py:class:`~.WatchEvent` objects not seen before
"""

last_seen_version = start_at_resource_version

while self._run_forever:
for event in self._model.watch_list(namespace=namespace):
o = event.object
key = (o.metadata.name, o.metadata.namespace)
if self._seen.get(key) == o.metadata.resourceVersion and event.type != WatchEvent.DELETED:
continue
self._seen[key] = o.metadata.resourceVersion
yield event
for event in self._model.watch_list(
namespace=namespace,
start_at_resource_version=last_seen_version
):
last_seen_version = event.resourceVersion
if event.type != WatchEvent.BOOKMARK:
yield event
14 changes: 11 additions & 3 deletions tests/k8s/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,29 @@ class Meta:


class TestWatchEvent(object):

common_object = {"value": 42, "metadata": {"resourceVersion": 1}}

def test_watch_event_added(self):
watch_event = WatchEvent({"type": "ADDED", "object": {"value": 42}}, Example)
watch_event = WatchEvent({"type": "ADDED", "object": self.common_object}, Example)
assert watch_event.type == WatchEvent.ADDED
assert watch_event.object == Example(value=42)

def test_watch_event_modified(self):
watch_event = WatchEvent({"type": "MODIFIED", "object": {"value": 42}}, Example)
watch_event = WatchEvent({"type": "MODIFIED", "object": self.common_object}, Example)
assert watch_event.type == WatchEvent.MODIFIED
assert watch_event.object == Example(value=42)

def test_watch_event_deleted(self):
watch_event = WatchEvent({"type": "DELETED", "object": {"value": 42}}, Example)
watch_event = WatchEvent({"type": "DELETED", "object": self.common_object}, Example)
assert watch_event.type == WatchEvent.DELETED
assert watch_event.object == Example(value=42)

def test_watch_event_bookmark(self):
watch_event = WatchEvent({"type": "BOOKMARK", "object": self.common_object}, Example)
assert watch_event.type == WatchEvent.BOOKMARK
assert watch_event.object == Example(value=42)


class TestFind(object):
@pytest.fixture
Expand Down
13 changes: 11 additions & 2 deletions tests/k8s/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,22 @@ def test_watch_list_with_namespace_should_raise_exception_when_watch_list_url_te
def test_watch_list(self, session):
list(WatchListExample.watch_list())
session.request.assert_called_once_with(
"GET", _absolute_url("/watch/example"), json=None, timeout=config.stream_timeout, stream=True
"GET", _absolute_url("/watch/example?watch=1&allowWatchBookmarks=true"),
json=None, timeout=config.stream_timeout, stream=True
)

def test_watch_list_with_namespace(self, session):
list(WatchListExample.watch_list(namespace="explicitly-set"))
session.request.assert_called_once_with(
"GET", _absolute_url("/watch/explicitly-set/example"), json=None, timeout=config.stream_timeout, stream=True
"GET", _absolute_url("/watch/explicitly-set/example?watch=1&allowWatchBookmarks=true"),
json=None, timeout=config.stream_timeout, stream=True
)

def test_watch_list_starting_from_resource_version(self, session):
list(WatchListExample.watch_list(start_at_resource_version=10))
session.request.assert_called_once_with(
"GET", _absolute_url("/watch/example?watch=1&allowWatchBookmarks=true&resourceVersion=10"),
json=None, timeout=config.stream_timeout, stream=True
)

def test_list_without_namespace_should_raise_exception_when_list_url_is_not_set_on_metaclass(self, session):
Expand Down
73 changes: 52 additions & 21 deletions tests/k8s/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@


import mock
from mock import call
import pytest
import six

Expand All @@ -28,6 +29,7 @@
ADDED = WatchEvent.ADDED
MODIFIED = WatchEvent.MODIFIED
DELETED = WatchEvent.DELETED
BOOKMARK = WatchEvent.BOOKMARK


@pytest.mark.usefixtures("k8s_config", "logger")
Expand All @@ -49,7 +51,7 @@ def test_multiple_events(self, api_watch_list):
watcher._run_forever = False
assert list(gen) == []

api_watch_list.assert_called_with(namespace=None)
api_watch_list.assert_called_with(namespace=None, start_at_resource_version=None)

def test_handle_reconnect(self, api_watch_list):
events = [_event(0, ADDED, 1)]
Expand All @@ -73,11 +75,11 @@ def test_handle_changes(self, api_watch_list):
watcher._run_forever = False
assert list(gen) == []

def test_complicated(self, api_watch_list):
def test_many_reconnections(self, api_watch_list):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good rename, easier to understand what we are actually testing here. 👍

first = [_event(0, ADDED, 1), _event(1, ADDED, 1), _event(2, ADDED, 1)]
second = [_event(0, ADDED, 1), _event(1, ADDED, 2), _event(2, ADDED, 1), _event(0, MODIFIED, 2)]
third = [_event(0, ADDED, 2), _event(1, DELETED, 2), _event(2, ADDED, 1), _event(2, MODIFIED, 2)]
fourth = [_event(0, ADDED, 2), _event(0, ADDED, 1, "other"), _event(0, MODIFIED, 2, "other")]
second = [_event(3, ADDED, 1), _event(4, ADDED, 2), _event(5, ADDED, 1), _event(6, MODIFIED, 2)]
third = [_event(7, ADDED, 2), _event(8, DELETED, 2), _event(9, ADDED, 1), _event(20, BOOKMARK, 2)]
fourth = [_event(25, ADDED, 2), _event(26, ADDED, 1), _event(27, MODIFIED, 2)]
api_watch_list.side_effect = [first, second, third, fourth]
watcher = Watcher(WatchListExample)
gen = watcher.watch()
Expand All @@ -88,20 +90,44 @@ def test_complicated(self, api_watch_list):
_assert_event(next(gen), 2, ADDED, 1)

# Second batch
_assert_event(next(gen), 1, ADDED, 2)
_assert_event(next(gen), 0, MODIFIED, 2)
_assert_event(next(gen), 3, ADDED, 1)
_assert_event(next(gen), 4, ADDED, 2)
_assert_event(next(gen), 5, ADDED, 1)
_assert_event(next(gen), 6, MODIFIED, 2)

# Third batch
_assert_event(next(gen), 1, DELETED, 2)
_assert_event(next(gen), 2, MODIFIED, 2)
_assert_event(next(gen), 7, ADDED, 2)
_assert_event(next(gen), 8, DELETED, 2)
_assert_event(next(gen), 9, ADDED, 1)

# Fourth batch
_assert_event(next(gen), 0, ADDED, 1, "other")
_assert_event(next(gen), 0, MODIFIED, 2, "other")
_assert_event(next(gen), 25, ADDED, 2)
_assert_event(next(gen), 26, ADDED, 1)
_assert_event(next(gen), 27, MODIFIED, 2)

watcher._run_forever = False
assert list(gen) == []

def test_start_watching_from_bookmark(self, api_watch_list):

bookmark_event = _event(30, BOOKMARK, 1)
first = [_event(0, ADDED, 1), _event(1, ADDED, 1), _event(2, ADDED, 1), bookmark_event]
second = [_event(31, ADDED, 1)]

api_watch_list.side_effect = [first, second]
watcher = Watcher(WatchListExample)
gen = watcher.watch()

_assert_event(next(gen), 0, ADDED, 1)
_assert_event(next(gen), 1, ADDED, 1)
_assert_event(next(gen), 2, ADDED, 1)
_assert_event(next(gen), 31, ADDED, 1)

api_watch_list.assert_has_calls([
call(namespace=None, start_at_resource_version=None),
call(namespace=None, start_at_resource_version=bookmark_event.resourceVersion),
])

def test_namespace(self, api_watch_list):
namespace = "the-namespace"
watcher = Watcher(WatchListExample)
Expand All @@ -115,22 +141,27 @@ def stop_iteration(*args, **kwargs):

assert list(gen) == []

api_watch_list.assert_called_with(namespace=namespace)
api_watch_list.assert_called_with(namespace=namespace, start_at_resource_version=None)


def _event(id, event_type, rv, namespace="default"):
metadict = {"name": "name{}".format(id), "namespace": namespace, "resourceVersion": rv}
metadata = ObjectMeta.from_dict(metadict)
wle = WatchListExample(metadata=metadata, value=(id * 100) + rv)
return mock.NonCallableMagicMock(type=event_type, object=wle)
def _event(id, event_type, rv):
event_json = {
"type": event_type,
"object": {
"value": (id * 100) + rv,
"metadata": {
"resourceVersion": rv
}
},
}
we = WatchEvent(event_json, WatchListExample)
return mock.NonCallableMagicMock(type=event_type, object=we)


def _assert_event(event, id, event_type, rv, namespace="default"):
def _assert_event(event, id, event_type, rv):
assert event.type == event_type
o = event.object
o = event.object.object
assert o.kind == "Example"
assert o.metadata.name == "name{}".format(id)
assert o.metadata.namespace == namespace
assert o.value == (id * 100) + rv


Expand Down