-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Updated Watcher to use Bookmark events and restart watch from last observed event #94
Changes from all commits
d9ff459
9953966
597e8f7
b0d535c
24a4794
fbaa07e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
|
||
import json | ||
import logging | ||
from urllib.parse import urlencode, urlsplit, urlunsplit, parse_qs | ||
from collections import namedtuple | ||
|
||
import six | ||
|
@@ -117,20 +118,38 @@ def list(cls, namespace="default"): | |
return [cls.from_dict(item) for item in resp.json()[u"items"]] | ||
|
||
@classmethod | ||
def watch_list(cls, namespace=None): | ||
def _watch_url_from_base_resource_url(cls, base_resource_url, start_at_resource_version=None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the |
||
scheme, netloc, path, query, fragment = urlsplit(base_resource_url) | ||
url_query_params = parse_qs(query) | ||
|
||
url_query_params["watch"] = [1] | ||
url_query_params["allowWatchBookmarks"] = ["true"] | ||
|
||
if start_at_resource_version is None: | ||
url_query_params["resourceVersion"] = [] | ||
else: | ||
url_query_params["resourceVersion"] = [start_at_resource_version] | ||
|
||
query = urlencode(url_query_params, doseq=True) | ||
return urlunsplit((scheme, netloc, path, query, fragment)) | ||
|
||
@classmethod | ||
def watch_list(cls, namespace=None, start_at_resource_version=None): | ||
"""Return a generator that yields WatchEvents of cls""" | ||
if namespace: | ||
if cls._meta.watch_list_url_template: | ||
url = cls._meta.watch_list_url_template.format(namespace=namespace) | ||
base_resource_url = cls._meta.watch_list_url_template.format(namespace=namespace) | ||
else: | ||
raise NotImplementedError( | ||
"Cannot watch_list with namespace, no watch_list_url_template defined on class {}".format(cls)) | ||
else: | ||
url = cls._meta.watch_list_url | ||
if not url: | ||
base_resource_url = cls._meta.watch_list_url | ||
if not base_resource_url: | ||
raise NotImplementedError("Cannot watch_list, no watch_list_url defined on class {}".format(cls)) | ||
|
||
resp = cls._client.get(url, stream=True, timeout=config.stream_timeout) | ||
watch_url = cls._watch_url_from_base_resource_url(base_resource_url, start_at_resource_version) | ||
|
||
resp = cls._client.get(watch_url, stream=True, timeout=config.stream_timeout) | ||
for line in resp.iter_lines(chunk_size=None): | ||
if line: | ||
try: | ||
|
@@ -288,14 +307,20 @@ class WatchEvent(object): | |
ADDED = "ADDED" | ||
MODIFIED = "MODIFIED" | ||
DELETED = "DELETED" | ||
BOOKMARK = "BOOKMARK" | ||
|
||
def __init__(self, event_json, cls): | ||
self.type = event_json["type"] | ||
self.object = cls.from_dict(event_json["object"]) | ||
self.resourceVersion = event_json["object"]["metadata"]["resourceVersion"] | ||
|
||
def __repr__(self): | ||
return "{cls}(type={type}, object={object})".format(cls=self.__class__.__name__, type=self.type, | ||
object=self.object) | ||
return "{cls}(type={type}, object={object}, rv={resourceVersion})".format( | ||
cls=self.__class__.__name__, | ||
type=self.type, | ||
object=self.object, | ||
resourceVersion=self.resourceVersion | ||
) | ||
|
||
|
||
class LabelSelector(object): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
|
||
|
||
import mock | ||
from mock import call | ||
import pytest | ||
import six | ||
|
||
|
@@ -28,6 +29,7 @@ | |
ADDED = WatchEvent.ADDED | ||
MODIFIED = WatchEvent.MODIFIED | ||
DELETED = WatchEvent.DELETED | ||
BOOKMARK = WatchEvent.BOOKMARK | ||
|
||
|
||
@pytest.mark.usefixtures("k8s_config", "logger") | ||
|
@@ -49,7 +51,7 @@ def test_multiple_events(self, api_watch_list): | |
watcher._run_forever = False | ||
assert list(gen) == [] | ||
|
||
api_watch_list.assert_called_with(namespace=None) | ||
api_watch_list.assert_called_with(namespace=None, start_at_resource_version=None) | ||
|
||
def test_handle_reconnect(self, api_watch_list): | ||
events = [_event(0, ADDED, 1)] | ||
|
@@ -73,11 +75,11 @@ def test_handle_changes(self, api_watch_list): | |
watcher._run_forever = False | ||
assert list(gen) == [] | ||
|
||
def test_complicated(self, api_watch_list): | ||
def test_many_reconnections(self, api_watch_list): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good rename, easier to understand what we are actually testing here. 👍 |
||
first = [_event(0, ADDED, 1), _event(1, ADDED, 1), _event(2, ADDED, 1)] | ||
second = [_event(0, ADDED, 1), _event(1, ADDED, 2), _event(2, ADDED, 1), _event(0, MODIFIED, 2)] | ||
third = [_event(0, ADDED, 2), _event(1, DELETED, 2), _event(2, ADDED, 1), _event(2, MODIFIED, 2)] | ||
fourth = [_event(0, ADDED, 2), _event(0, ADDED, 1, "other"), _event(0, MODIFIED, 2, "other")] | ||
second = [_event(3, ADDED, 1), _event(4, ADDED, 2), _event(5, ADDED, 1), _event(6, MODIFIED, 2)] | ||
third = [_event(7, ADDED, 2), _event(8, DELETED, 2), _event(9, ADDED, 1), _event(20, BOOKMARK, 2)] | ||
fourth = [_event(25, ADDED, 2), _event(26, ADDED, 1), _event(27, MODIFIED, 2)] | ||
api_watch_list.side_effect = [first, second, third, fourth] | ||
watcher = Watcher(WatchListExample) | ||
gen = watcher.watch() | ||
|
@@ -88,20 +90,44 @@ def test_complicated(self, api_watch_list): | |
_assert_event(next(gen), 2, ADDED, 1) | ||
|
||
# Second batch | ||
_assert_event(next(gen), 1, ADDED, 2) | ||
_assert_event(next(gen), 0, MODIFIED, 2) | ||
_assert_event(next(gen), 3, ADDED, 1) | ||
_assert_event(next(gen), 4, ADDED, 2) | ||
_assert_event(next(gen), 5, ADDED, 1) | ||
_assert_event(next(gen), 6, MODIFIED, 2) | ||
|
||
# Third batch | ||
_assert_event(next(gen), 1, DELETED, 2) | ||
_assert_event(next(gen), 2, MODIFIED, 2) | ||
_assert_event(next(gen), 7, ADDED, 2) | ||
_assert_event(next(gen), 8, DELETED, 2) | ||
_assert_event(next(gen), 9, ADDED, 1) | ||
|
||
# Fourth batch | ||
_assert_event(next(gen), 0, ADDED, 1, "other") | ||
_assert_event(next(gen), 0, MODIFIED, 2, "other") | ||
_assert_event(next(gen), 25, ADDED, 2) | ||
_assert_event(next(gen), 26, ADDED, 1) | ||
_assert_event(next(gen), 27, MODIFIED, 2) | ||
|
||
watcher._run_forever = False | ||
assert list(gen) == [] | ||
|
||
def test_start_watching_from_bookmark(self, api_watch_list): | ||
|
||
bookmark_event = _event(30, BOOKMARK, 1) | ||
first = [_event(0, ADDED, 1), _event(1, ADDED, 1), _event(2, ADDED, 1), bookmark_event] | ||
second = [_event(31, ADDED, 1)] | ||
|
||
api_watch_list.side_effect = [first, second] | ||
watcher = Watcher(WatchListExample) | ||
gen = watcher.watch() | ||
|
||
_assert_event(next(gen), 0, ADDED, 1) | ||
_assert_event(next(gen), 1, ADDED, 1) | ||
_assert_event(next(gen), 2, ADDED, 1) | ||
_assert_event(next(gen), 31, ADDED, 1) | ||
|
||
api_watch_list.assert_has_calls([ | ||
call(namespace=None, start_at_resource_version=None), | ||
call(namespace=None, start_at_resource_version=bookmark_event.resourceVersion), | ||
]) | ||
|
||
def test_namespace(self, api_watch_list): | ||
namespace = "the-namespace" | ||
watcher = Watcher(WatchListExample) | ||
|
@@ -115,22 +141,27 @@ def stop_iteration(*args, **kwargs): | |
|
||
assert list(gen) == [] | ||
|
||
api_watch_list.assert_called_with(namespace=namespace) | ||
api_watch_list.assert_called_with(namespace=namespace, start_at_resource_version=None) | ||
|
||
|
||
def _event(id, event_type, rv, namespace="default"): | ||
metadict = {"name": "name{}".format(id), "namespace": namespace, "resourceVersion": rv} | ||
metadata = ObjectMeta.from_dict(metadict) | ||
wle = WatchListExample(metadata=metadata, value=(id * 100) + rv) | ||
return mock.NonCallableMagicMock(type=event_type, object=wle) | ||
def _event(id, event_type, rv): | ||
event_json = { | ||
"type": event_type, | ||
"object": { | ||
"value": (id * 100) + rv, | ||
"metadata": { | ||
"resourceVersion": rv | ||
} | ||
}, | ||
} | ||
we = WatchEvent(event_json, WatchListExample) | ||
return mock.NonCallableMagicMock(type=event_type, object=we) | ||
|
||
|
||
def _assert_event(event, id, event_type, rv, namespace="default"): | ||
def _assert_event(event, id, event_type, rv): | ||
assert event.type == event_type | ||
o = event.object | ||
o = event.object.object | ||
assert o.kind == "Example" | ||
assert o.metadata.name == "name{}".format(id) | ||
assert o.metadata.namespace == namespace | ||
assert o.value == (id * 100) + rv | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still support Python 2.7, so this fails. You can use
six.moves
to solve it: https://six.readthedocs.io/index.html?highlight=moves#module-six.moves