Skip to content

Commit

Permalink
Merge pull request #216 from tyler-8/threaded
Browse files Browse the repository at this point in the history
Add threading to .all() and .filter() requests
  • Loading branch information
Zach Moody authored Mar 16, 2020
2 parents 2523bd9 + 8301002 commit 5930103
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 0 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,13 @@ nb.dcim.devices.all()
[test1-leaf1, test1-leaf2]
```

### Threading

pynetbox supports multithreaded calls (Python 3 only) for `.filter()` and `.all()` queries. It is **highly recommended** that `MAX_PAGE_SIZE` in your NetBox installation be set to anything *except* `0` or `None`; the default value of `1000` usually works well. To enable threading, pass the `threading=True` keyword argument when creating the `pynetbox.api` object:

```python
nb = pynetbox.api(
'http://localhost:8000',
threading=True,
)
```
7 changes: 7 additions & 0 deletions pynetbox/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys

import requests

from pynetbox.core.endpoint import Endpoint
Expand Down Expand Up @@ -152,6 +154,7 @@ def __init__(
private_key=None,
private_key_file=None,
ssl_verify=True,
threading=False,
):
if private_key and private_key_file:
raise ValueError(
Expand All @@ -165,6 +168,10 @@ def __init__(
self.ssl_verify = ssl_verify
self.session_key = None
self.http_session = requests.Session()
if threading and sys.version_info.major == 2:
raise NotImplementedError("Threaded pynetbox calls not supported \
in Python 2")
self.threading = threading

if self.private_key_file:
with open(self.private_key_file, "r") as kf:
Expand Down
2 changes: 2 additions & 0 deletions pynetbox/core/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def all(self):
session_key=self.session_key,
ssl_verify=self.ssl_verify,
http_session=self.api.http_session,
threading=self.api.threading,
)

return [self._response_loader(i) for i in req.get()]
Expand Down Expand Up @@ -220,6 +221,7 @@ def filter(self, *args, **kwargs):
session_key=self.session_key,
ssl_verify=self.ssl_verify,
http_session=self.api.http_session,
threading=self.api.threading,
)

ret = [self._response_loader(i) for i in req.get()]
Expand Down
50 changes: 50 additions & 0 deletions pynetbox/core/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
try:
import concurrent.futures as cf
except ImportError:
pass
import json
from six.moves.urllib.parse import urlencode

Expand All @@ -28,6 +32,11 @@ def url_param_builder(param_dict):
return "?{}".format(urlencode(param_dict))


def calc_pages(limit, count):
    """ Calculate number of pages required for full results set.

    Equivalent to ``ceil(count / limit)``.

    :arg int limit: Page size (results per request); must be > 0.
    :arg int count: Total number of results reported by the API.
    :returns: Number of pages needed to retrieve all ``count`` results.
    """
    # Integer ceiling division. The previous operand order
    # (``limit % count``) under-counted pages whenever ``count`` was an
    # exact multiple of ``limit`` (or smaller than it), dropping the
    # final page of results.
    return int(count / limit) + (count % limit > 0)


class RequestError(Exception):
"""Basic Request Exception
Expand Down Expand Up @@ -141,6 +150,7 @@ def __init__(
session_key=None,
ssl_verify=True,
url=None,
threading=False,
):
"""
Instantiates a new Request object
Expand All @@ -164,6 +174,7 @@ def __init__(
self.ssl_verify = ssl_verify
self.http_session = http_session
self.url = self.base if not key else "{}{}/".format(self.base, key)
self.threading = threading

def get_version(self):
""" Gets the API version of NetBox.
Expand Down Expand Up @@ -262,6 +273,19 @@ def _make_call(
else:
raise RequestError(req)

def concurrent_get(self, ret, page_size, page_offsets):
    """Fetch result pages in parallel and append their rows to ``ret``.

    Submits one GET per offset to a small thread pool, then drains the
    responses as they finish. Because pages are consumed in completion
    order, row ordering across pages is not guaranteed.

    :arg list ret: Accumulator list; extended in place with the
        ``results`` payload of each fetched page.
    :arg int page_size: Value sent as the ``limit`` query parameter.
    :arg list page_offsets: ``offset`` values, one per page to fetch.
    """
    with cf.ThreadPoolExecutor(max_workers=4) as pool:
        # Kick off every page request up front...
        pending = [
            pool.submit(
                self._make_call,
                add_params={"offset": start, "limit": page_size},
            )
            for start in page_offsets
        ]

        # ...then collect each response as soon as it completes.
        for done in cf.as_completed(pending):
            ret.extend(done.result()["results"])

def get(self, add_params=None):
"""Makes a GET request.
Expand Down Expand Up @@ -297,6 +321,32 @@ def req_all():
else:
return req

def req_all_threaded(add_params):
    # Fetch every page of a paginated endpoint. Page 1 is fetched
    # synchronously to discover the server's page size; remaining
    # pages are then retrieved in parallel via self.concurrent_get().
    if add_params is None:
        # Limit must be 0 to discover the max page size
        add_params = {"limit": 0}
    req = self._make_call(add_params=add_params)
    if isinstance(req, dict) and req.get("results") is not None:
        # Paginated response: seed the accumulator with page 1's rows.
        ret = req["results"]
        if req.get("next"):
            # The server capped this page, so the effective page size
            # is however many rows it actually returned.
            page_size = len(req["results"])
            pages = calc_pages(page_size, req["count"])
            # Offsets for pages 2..N (page 1 is already in ret).
            page_offsets = [
                increment * page_size for increment in range(1, pages)
            ]
            if pages == 1:
                # calc_pages reported a single page even though a
                # "next" link exists; fall back to fetching that URL
                # directly instead of spinning up the thread pool.
                req = self._make_call(url_override=req.get("next"))
                ret.extend(req["results"])
            else:
                self.concurrent_get(ret, page_size, page_offsets)

        return ret
    else:
        # Non-paginated payload (e.g. a single object or a raw
        # detail response); pass it through unchanged.
        return req

if self.threading:
return req_all_threaded(add_params)

return req_all()

def put(self, data):
Expand Down

0 comments on commit 5930103

Please sign in to comment.