-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathRIPEAtlas.py
276 lines (247 loc) · 11.5 KB
/
RIPEAtlas.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" A module to perform measurements on the RIPE Atlas
<http://atlas.ripe.net/> probes using the UDM (User Defined
Measurements) creation API.
The authorization key is expected in $HOME/.atlas/auth or has to be
provided in the constructor's arguments.
Stéphane Bortzmeyer <[email protected]>
"""
import os
import json
import time
import urllib2
import random
# Default location of the file holding the API key. The home directory
# is resolved with os.path.expanduser() rather than os.environ['HOME']
# so that merely importing this module does not raise KeyError when the
# HOME environment variable is not set.
authfile = os.path.join(os.path.expanduser("~"), ".atlas", "auth")

# Root URL of the measurements API; every other URL is derived from it.
base_url = "https://atlas.ripe.net/api/v2/measurements"

# The following parameters are currently not settable. Anyway, be
# careful when changing these, you may get inconsistent results if you
# do not wait long enough. Other warning: the time to wait depends on
# the number of probes.

# All in seconds:
# Initial delay between two queries of the measurement's fields
# (base value plus factor times the number of requested probes).
fields_delay_base = 6
fields_delay_factor = 0.2
# Initial delay between two attempts to retrieve the results
# (base value plus factor times the number of allocated probes).
results_delay_base = 3
results_delay_factor = 0.15
# Upper bound on the total time spent waiting for results
# (base value plus factor times the number of allocated probes).
maximum_time_for_results_base = 30
maximum_time_for_results_factor = 5

# The basic problem is that there is no easy way in Atlas to know when
# it is over, either for retrieving the list of the probes, or for
# retrieving the results themselves. The only solution is to wait
# "long enough". The time to wait is not documented, so the values
# above have been found mostly by trial and error.
class RIPEAtlasError(Exception):
    """Common base class of every exception defined by this module.

    Catching it is enough to handle any error raised by the module's
    own code; each subclass remains an ordinary Exception, so existing
    handlers naming a specific class keep working.
    """


class AuthFileNotFound(RIPEAtlasError):
    """No API key was given and the authentication file does not exist."""


class AuthFileEmpty(RIPEAtlasError):
    """The authentication file exists but no key could be read from it."""


class RequestSubmissionError(RIPEAtlasError):
    """The measurement request is unusable or was rejected by the server."""


class FieldsQueryError(RIPEAtlasError):
    """Querying the fields (probes, status) of a new measurement failed."""


class MeasurementNotFound(RIPEAtlasError):
    """The server answered 404 for the requested measurement ID."""


class MeasurementAccessError(RIPEAtlasError):
    """An existing measurement could not be read or has an invalid status."""


class ResultError(RIPEAtlasError):
    """The results of a measurement could not be retrieved."""


class IncompatibleArguments(RIPEAtlasError):
    """Arguments that cannot be used together were given.

    NOTE(review): not raised anywhere in this part of the module;
    presumably used by callers -- confirm.
    """


class InternalError(RIPEAtlasError):
    """The server reported a measurement status this module does not expect."""


# Result JSON file does not have the expected fields/members
class WrongAssumption(RIPEAtlasError):
    """A result does not have the expected fields/members."""
class JsonRequest(urllib2.Request):
    """A urllib2 request carrying the headers used for every Atlas API call."""

    def __init__(self, url):
        """Build a request for "url" that sends and accepts JSON."""
        # The parent initialiser is called explicitly, as the original does.
        urllib2.Request.__init__(self, url)
        json_headers = (
            ("Content-Type", "application/json"),
            ("Accept", "application/json"),
            ("User-Agent", "RIPEAtlas.py"),
        )
        for header_name, header_value in json_headers:
            self.add_header(header_name, header_value)
class Measurement(object):
    """An Atlas measurement, identified by its ID (such as #1010569) in the field "id".

    Attributes set by the constructor:
      id           -- the measurement ID
      status       -- last status name seen for the measurement (None if
                      it was never queried)
      time         -- creation time (new measurement) or the server's
                      "start_time" (existing measurement), as a struct_time
      num_probes   -- number of probes allocated to the measurement; not
                      set when a new measurement is created with wait=False
      notification -- the sleep-notification callable, or None
      description, interval -- only set when attaching to an existing
                      measurement
    """

    def __init__(self, data, wait=True, sleep_notification=None, key=None, id=None):
        """Create a measurement, or attach to an existing one.

        "data" must be a dictionary (*not* a JSON string) having the
        members requested by the Atlas documentation. "wait" should be
        set to False for periodic (not one-off) measurements.
        "sleep_notification" is a callable taking one parameter, the
        sleep delay: when the module has to sleep, it calls it, allowing
        you to be informed of the delay. "key" is the API key. If None
        (or empty), it is read from the configuration file.

        If "data" is None and "id" is not, a dummy measurement is
        created, mapped to the existing measurement having this ID.

        Raises RequestSubmissionError (no data and no ID, or submission
        rejected), AuthFileNotFound / AuthFileEmpty (no usable key),
        FieldsQueryError / InternalError (while waiting for the probes
        to be allocated), MeasurementNotFound / MeasurementAccessError
        (while attaching to an existing measurement).
        """
        if data is None and id is None:
            raise RequestSubmissionError("No data and no measurement ID")
        # TODO: when creating a dummy measurement, a key may not be necessary if the measurement is public
        if not key:
            key = self._read_key()
        self.url = base_url + "/?key=%s" % key
        self.url_probes = base_url + "/%s/?fields=probes,status"
        self.url_status = base_url + "/%s/?fields=status"
        self.url_results = base_url + "/%s/results/"
        self.url_all = base_url + "/%s/"
        self.url_latest = base_url + "-latest/%s/?versions=%s"
        self.status = None
        if data is not None:
            self.notification = sleep_notification
            self._submit(data)
            if wait:
                # Find out how many probes were actually allocated to this measurement
                self._wait_for_probes(data["probes"][0]["requested"])
        else:
            self.id = id
            self.notification = None
            self._attach()

    @staticmethod
    def _read_key():
        """Return the API key stored on the first line of the authentication file."""
        if not os.path.exists(authfile):
            raise AuthFileNotFound("Authentication file %s not found" % authfile)
        # "with" guarantees the file is closed even when the key is rejected.
        with open(authfile) as auth:
            # strip() also removes a "\r" left by CRLF line endings.
            key = auth.readline().strip()
        if not key:
            raise AuthFileEmpty("Authentication file %s empty or missing a end-of-line at the end" % authfile)
        return key

    @staticmethod
    def _load_json(request, data=None):
        """Send "request", decode the JSON answer and always close the connection."""
        conn = urllib2.urlopen(request, data)
        try:
            return json.load(conn)
        finally:
            conn.close()

    def _load_existing(self, url):
        """Fetch "url" for an existing measurement, mapping HTTP errors to module exceptions."""
        try:
            return self._load_json(JsonRequest(url))
        except urllib2.HTTPError as e:
            if e.code == 404:
                raise MeasurementNotFound
            raise MeasurementAccessError("%s" % e.read())

    def _submit(self, data):
        """Send the measurement request and record the ID the server allocated."""
        self.json_data = json.dumps(data)
        try:
            # Start the measurement and parse the answer
            results = self._load_json(JsonRequest(self.url), self.json_data)
            self.id = results["measurements"][0]
        except urllib2.HTTPError as e:
            raise RequestSubmissionError("Status %s, reason \"%s\"" % \
                                         (e.code, e.read()))
        self.gen = random.Random()
        self.time = time.gmtime()

    def _wait_for_probes(self, requested):
        """Poll the measurement until it is "Ongoing", then record the number of allocated probes."""
        left = 30  # Maximum number of tests
        fields_delay = fields_delay_base + (requested * fields_delay_factor)
        while True:
            # Let's be patient
            if self.notification is not None:
                self.notification(fields_delay)
            time.sleep(fields_delay)
            fields_delay *= 2  # Wait twice as long before the next query
            # A random component is necessary to defeat caching (even
            # Cache-Control seems ignored)
            request = JsonRequest((self.url_probes % self.id) + \
                                  ("&defeatcaching=dc%s" % self.gen.randint(1, 10000)))
            try:
                meta = self._load_json(request)
            except urllib2.HTTPError as e:
                raise FieldsQueryError("%s" % e.read())
            self.status = meta["status"]["name"]
            if self.status in ("Specified", "Scheduled"):
                # Not done, loop
                left -= 1
                if left <= 0:
                    raise FieldsQueryError("Maximum number of status queries reached")
            elif self.status == "Ongoing":
                self.num_probes = len(meta["probes"])
                return
            else:
                raise InternalError("Internal error in #%s, unexpected status when querying the measurement fields: \"%s\"" % (self.id, meta["status"]))

    def _attach(self):
        """Load status, probe count and metadata of the existing measurement self.id."""
        status_info = self._load_existing(self.url_status % self.id)
        status = status_info["status"]["name"]
        self.status = status
        if status != "Ongoing" and status != "Stopped":
            raise MeasurementAccessError("Invalid status \"%s\"" % status)
        probes_info = self._load_existing(self.url_probes % self.id)
        self.num_probes = len(probes_info["probes"])
        details = self._load_existing(self.url_all % self.id)
        self.time = time.gmtime(details["start_time"])
        self.description = details["description"]
        self.interval = details["interval"]

    def results(self, wait=True, percentage_required=0.9, latest=None):
        """Retrieve and return the decoded JSON results of the measurement.

        "wait" indicates if you are willing to wait until the
        measurement is over (otherwise, you'll get partial results).
        "percentage_required" is meaningful only when you wait and it
        indicates the proportion of the allocated probes that have to
        report before the function returns (warning: the measurement
        may stop even if not enough probes reported, and the waiting
        time is bounded, so you always have to check the actual number
        of reporting probes in the result). "latest" indicates that you
        want to retrieve only the last N results (by default, you get
        all the results); it implies wait=False.

        Raises ResultError if the results cannot be retrieved and
        InternalError if the measurement has an unexpected status.
        """
        if latest is None:
            request = JsonRequest(self.url_results % self.id)
        else:
            wait = False
            request = JsonRequest(self.url_latest % (self.id, latest))
        if not wait:
            try:
                return self._load_json(request)
            except urllib2.HTTPError as e:
                raise ResultError(e.read())
        results_delay = results_delay_base + (self.num_probes * results_delay_factor)
        maximum_time_for_results = maximum_time_for_results_base + \
                                   (self.num_probes * maximum_time_for_results_factor)
        start = time.time()
        elapsed = 0
        result_data = None
        enough = False
        while not enough and elapsed < maximum_time_for_results:
            if self.notification is not None:
                self.notification(results_delay)
            time.sleep(results_delay)
            results_delay *= 2  # Wait twice as long before the next attempt
            elapsed = time.time() - start
            try:
                result_data = self._load_json(request)
                if len(result_data) >= self.num_probes * percentage_required:
                    # Requesting a strict equality may be too
                    # strict: if an allocated probe does not
                    # respond, we will have to wait for the stop
                    # of the measurement (many minutes). Anyway,
                    # there is also the problem that a probe may
                    # have sent only a part of its measurements.
                    enough = True
                else:
                    result_status = self._load_json(JsonRequest(self.url_status % self.id))
                    status = result_status["status"]["name"]
                    if status == "Stopped":
                        enough = True  # Even if not enough probes
                    elif status != "Ongoing":
                        # Report the status just read: result_data is the
                        # list of results and has no "status" member.
                        raise InternalError("Unexpected status when retrieving the measurement: \"%s\"" % \
                                            status)
                    # Otherwise ("Ongoing"), wait a bit more
            except urllib2.HTTPError as e:
                if e.code != 404:  # Yes, we may have no result file at
                                   # all for some time
                    raise ResultError(str(e.code) + " " + e.reason)
        if result_data is None:
            raise ResultError("No results retrieved")
        return result_data