Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Formating Code base and removed duplicate dict key #295

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 78 additions & 76 deletions pyresttest/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import pycurl
import sys


from . import contenthandling
from .contenthandling import ContentHandler
from . import validators
Expand Down Expand Up @@ -56,6 +55,7 @@
u'POST': pycurl.POST,
u'DELETE': 'DELETE'}


# Parsing helper functions
def coerce_to_string(val):
if isinstance(val, text_type):
Expand All @@ -67,6 +67,7 @@ def coerce_to_string(val):
else:
raise TypeError("Input {0} is not a string or integer, and it needs to be!".format(val))


def coerce_string_to_ascii(val):
if isinstance(val, text_type):
return val.encode('ascii')
Expand All @@ -75,6 +76,7 @@ def coerce_string_to_ascii(val):
else:
raise TypeError("Input {0} is not a string, string expected".format(val))


def coerce_http_method(val):
myval = val
if not isinstance(myval, basestring) or len(val) == 0:
Expand All @@ -83,13 +85,15 @@ def coerce_http_method(val):
myval = myval.decode('utf-8')
return myval.upper()


def coerce_list_of_ints(val):
""" If single value, try to parse as integer, else try to parse as list of integer """
if isinstance(val, list):
return [int(x) for x in val]
else:
return [int(val)]


class Test(object):
""" Describes a REST test """
_url = None
Expand Down Expand Up @@ -181,9 +185,11 @@ def get_url(self, context=None):
if val is None:
val = self._url
return val

url = property(get_url, set_url, None, 'URL fragment for request')

NAME_HEADERS = 'headers'

# Totally different from others

def set_headers(self, value, isTemplate=False):
Expand All @@ -204,6 +210,7 @@ def get_headers(self, context=None):

def template_tuple(tuple_input):
return (string.Template(str(tuple_item)).safe_substitute(vals) for tuple_item in tuple_input)

return dict(map(template_tuple, self._headers.items()))

headers = property(get_headers, set_headers, None,
Expand Down Expand Up @@ -293,14 +300,14 @@ def configure_curl(self, timeout=DEFAULT_TIMEOUT, context=None, curl_handle=None
curl = curl_handle

try: # Check the curl handle isn't closed, and reuse it if possible
curl.getinfo(curl.HTTP_CODE)
curl.getinfo(curl.HTTP_CODE)
# Below clears the cookies & curl options for clean run
# But retains the DNS cache and connection pool
curl.reset()
curl.setopt(curl.COOKIELIST, "ALL")
except pycurl.error:
curl = pycurl.Curl()

else:
curl = pycurl.Curl()

Expand All @@ -319,9 +326,9 @@ def configure_curl(self, timeout=DEFAULT_TIMEOUT, context=None, curl_handle=None
curl.setopt(curl.READFUNCTION, MyIO(bod).read)

if self.auth_username and self.auth_password:
curl.setopt(pycurl.USERPWD,
parsing.encode_unicode_bytes(self.auth_username) + b':' +
parsing.encode_unicode_bytes(self.auth_password))
curl.setopt(pycurl.USERPWD,
parsing.encode_unicode_bytes(self.auth_username) + b':' +
parsing.encode_unicode_bytes(self.auth_password))
if self.auth_type:
curl.setopt(pycurl.HTTPAUTH, self.auth_type)

Expand Down Expand Up @@ -407,79 +414,75 @@ def parse_test(cls, base_url, node, input_test=None, test_path=None):

Accepted structure must be a single dictionary of key-value pairs for test configuration """

mytest = input_test
if not mytest:
mytest = Test()
if not input_test:
input_test = Test()

# Clean up for easy parsing
node = lowercase_keys(flatten_dictionaries(node))



# Simple table of variable name, coerce function, and optionally special store function
CONFIG_ELEMENTS = {
# Simple variables
u'auth_username': [coerce_string_to_ascii],
u'auth_password': [coerce_string_to_ascii],
u'method': [coerce_http_method], # HTTP METHOD
u'delay': [lambda x: int(x)], # Delay before running
u'group': [coerce_to_string], # Test group name
u'method': [coerce_http_method], # HTTP METHOD
u'delay': [lambda x: int(x)], # Delay before running
u'group': [coerce_to_string], # Test group name
u'name': [coerce_to_string], # Test name
u'expected_status': [coerce_list_of_ints],
u'delay': [lambda x: int(x)],
u'stop_on_failure': [safe_to_bool],

# Templated / special handling
#u'url': [coerce_templatable, set_templated), # TODO: special handling for templated content, sigh
# u'url': [coerce_templatable, set_templated), # TODO: special handling for templated content, sigh
u'body': [ContentHandler.parse_content]
#u'headers': [],
# u'headers': [],

# COMPLEX PARSE OPTIONS
#u'extract_binds':[], # Context variable-to-extractor output binding
#u'variable_binds': [], # Context variable to value binding
#u'generator_binds': [], # Context variable to generator output binding
#u'validators': [], # Validation functions to run
# u'extract_binds':[], # Context variable-to-extractor output binding
# u'variable_binds': [], # Context variable to value binding
# u'generator_binds': [], # Context variable to generator output binding
# u'validators': [], # Validation functions to run
}

def use_config_parser(configobject, configelement, configvalue):
def use_config_parser(configobject, config_element, config_value):
""" Try to use parser bindings to find an option for parsing and storing config element
:configobject: Object to store configuration
:configelement: Configuratione element name
:configvalue: Value to use to set configuration
:config_element: Configuratione element name
:config_value: Value to use to set configuration
:returns: True if found match for config element, False if didn't
"""

myparsing = CONFIG_ELEMENTS.get(configelement)
if myparsing:
converted = myparsing[0](configvalue)
setattr(configobject, configelement, converted)
parsed_value = CONFIG_ELEMENTS.get(config_element)
if parsed_value:
converted = parsed_value[0](config_value)
setattr(configobject, config_element, converted)
return True
return False

# Copy/convert input elements into appropriate form for a test object
for configelement, configvalue in node.items():
if use_config_parser(mytest, configelement, configvalue):
for config_element, config_value in node.items():
if use_config_parser(input_test, config_element, config_value):
continue

# Configure test using configuration elements
if configelement == u'url':
temp = configvalue
if isinstance(configvalue, dict):
if config_element == u'url':
temp = config_value
if isinstance(config_value, dict):
# Template is used for URL
val = lowercase_keys(configvalue)[u'template']
val = lowercase_keys(config_value)[u'template']
assert isinstance(val, basestring) or isinstance(val, int)
url = urlparse.urljoin(base_url, coerce_to_string(val))
mytest.set_url(url, isTemplate=True)
input_test.set_url(url, isTemplate=True)
else:
assert isinstance(configvalue, basestring) or isinstance(
configvalue, int)
mytest.url = urlparse.urljoin(base_url, coerce_to_string(configvalue))
elif configelement == u'extract_binds':
assert isinstance(config_value, basestring) or isinstance(
config_value, int)
input_test.url = urlparse.urljoin(base_url, coerce_to_string(config_value))
elif config_element == u'extract_binds':
# Add a list of extractors, of format:
# {variable_name: {extractor_type: extractor_config}, ... }
binds = flatten_dictionaries(configvalue)
if mytest.extract_binds is None:
mytest.extract_binds = dict()
binds = flatten_dictionaries(config_value)
if input_test.extract_binds is None:
input_test.extract_binds = dict()

for variable_name, extractor in binds.items():
if not isinstance(extractor, dict) or len(extractor) == 0:
Expand All @@ -491,59 +494,58 @@ def use_config_parser(configobject, configelement, configvalue):

# Safe because length can only be 1
for extractor_type, extractor_config in extractor.items():
mytest.extract_binds[variable_name] = validators.parse_extractor(extractor_type, extractor_config)

input_test.extract_binds[variable_name] = validators.parse_extractor(extractor_type,
extractor_config)

elif configelement == u'validators':
elif config_element == u'validators':
# Add a list of validators
if not isinstance(configvalue, list):
if not isinstance(config_value, list):
raise Exception(
'Misconfigured validator section, must be a list of validators')
if mytest.validators is None:
mytest.validators = list()
if input_test.validators is None:
input_test.validators = list()

# create validator and add to list of validators
for var in configvalue:
for var in config_value:
if not isinstance(var, dict):
raise TypeError(
"Validators must be defined as validatorType:{configs} ")
for validator_type, validator_config in var.items():
validator = validators.parse_validator(
validator_type, validator_config)
mytest.validators.append(validator)
input_test.validators.append(validator)

elif configelement == 'headers': # HTTP headers to use, flattened to a single string-string dictionary
mytest.headers
configvalue = flatten_dictionaries(configvalue)
elif config_element == 'headers': # HTTP headers to use, flattened to a single string-string dictionary
config_value = flatten_dictionaries(config_value)

if isinstance(configvalue, dict):
filterfunc = lambda x: str(x[0]).lower() == 'template' # Templated items
templates = [x for x in ifilter(filterfunc, configvalue.items())]
if isinstance(config_value, dict):
filterfunc = lambda x: str(x[0]).lower() == 'template' # Templated items
templates = [x for x in ifilter(filterfunc, config_value.items())]
else:
templates = None

if templates:
# Should have single entry in dictionary keys
mytest.set_headers(templates[0][1], isTemplate=True)
elif isinstance(configvalue, dict):
mytest.headers = configvalue
input_test.set_headers(templates[0][1], isTemplate=True)
elif isinstance(config_value, dict):
input_test.headers = config_value
else:
raise TypeError(
"Illegal header type: headers must be a dictionary or list of dictionary keys")
elif configelement == 'variable_binds':
mytest.variable_binds = flatten_dictionaries(configvalue)
elif configelement == 'generator_binds':
output = flatten_dictionaries(configvalue)
elif config_element == 'variable_binds':
input_test.variable_binds = flatten_dictionaries(config_value)
elif config_element == 'generator_binds':
output = flatten_dictionaries(config_value)
output2 = dict()
for key, value in output.items():
output2[str(key)] = str(value)
mytest.generator_binds = output2
elif configelement.startswith('curl_option_'):
curlopt = configelement[12:].upper()
input_test.generator_binds = output2
elif config_element.startswith('curl_option_'):
curlopt = config_element[12:].upper()
if hasattr(BASECURL, curlopt):
if not mytest.curl_options:
mytest.curl_options = dict()
mytest.curl_options[curlopt] = configvalue
if not input_test.curl_options:
input_test.curl_options = dict()
input_test.curl_options[curlopt] = config_value
else:
raise ValueError(
"Illegal curl option: {0}".format(curlopt))
Expand All @@ -553,11 +555,11 @@ def use_config_parser(configobject, configelement, configvalue):
# this is per HTTP spec:
# http://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html#sec9.5
if 'expected_status' not in node.keys():
if mytest.method == 'POST':
mytest.expected_status = [200, 201, 204]
elif mytest.method == 'PUT':
mytest.expected_status = [200, 201, 204]
elif mytest.method == 'DELETE':
mytest.expected_status = [200, 202, 204]
if input_test.method == 'POST':
input_test.expected_status = [200, 201, 204]
elif input_test.method == 'PUT':
input_test.expected_status = [200, 201, 204]
elif input_test.method == 'DELETE':
input_test.expected_status = [200, 202, 204]
# Fallthrough default is simply [200]
return mytest
return input_test