diff --git a/pyresttest/tests.py b/pyresttest/tests.py index 56365fce..e9f29dc4 100644 --- a/pyresttest/tests.py +++ b/pyresttest/tests.py @@ -5,7 +5,6 @@ import pycurl import sys - from . import contenthandling from .contenthandling import ContentHandler from . import validators @@ -56,6 +55,7 @@ u'POST': pycurl.POST, u'DELETE': 'DELETE'} + # Parsing helper functions def coerce_to_string(val): if isinstance(val, text_type): @@ -67,6 +67,7 @@ def coerce_to_string(val): else: raise TypeError("Input {0} is not a string or integer, and it needs to be!".format(val)) + def coerce_string_to_ascii(val): if isinstance(val, text_type): return val.encode('ascii') @@ -75,6 +76,7 @@ def coerce_string_to_ascii(val): else: raise TypeError("Input {0} is not a string, string expected".format(val)) + def coerce_http_method(val): myval = val if not isinstance(myval, basestring) or len(val) == 0: @@ -83,6 +85,7 @@ def coerce_http_method(val): myval = myval.decode('utf-8') return myval.upper() + def coerce_list_of_ints(val): """ If single value, try to parse as integer, else try to parse as list of integer """ if isinstance(val, list): @@ -90,6 +93,7 @@ def coerce_list_of_ints(val): else: return [int(val)] + class Test(object): """ Describes a REST test """ _url = None @@ -181,9 +185,11 @@ def get_url(self, context=None): if val is None: val = self._url return val + url = property(get_url, set_url, None, 'URL fragment for request') NAME_HEADERS = 'headers' + # Totally different from others def set_headers(self, value, isTemplate=False): @@ -204,6 +210,7 @@ def get_headers(self, context=None): def template_tuple(tuple_input): return (string.Template(str(tuple_item)).safe_substitute(vals) for tuple_item in tuple_input) + return dict(map(template_tuple, self._headers.items())) headers = property(get_headers, set_headers, None, @@ -293,14 +300,14 @@ def configure_curl(self, timeout=DEFAULT_TIMEOUT, context=None, curl_handle=None curl = curl_handle try: # Check the curl handle isn't closed, and reuse it if possible - curl.getinfo(curl.HTTP_CODE) + curl.getinfo(curl.HTTP_CODE) # Below clears the cookies & curl options for clean run # But retains the DNS cache and connection pool curl.reset() curl.setopt(curl.COOKIELIST, "ALL") except pycurl.error: curl = pycurl.Curl() - + else: curl = pycurl.Curl() @@ -319,9 +326,9 @@ def configure_curl(self, timeout=DEFAULT_TIMEOUT, context=None, curl_handle=None curl.setopt(curl.READFUNCTION, MyIO(bod).read) if self.auth_username and self.auth_password: - curl.setopt(pycurl.USERPWD, - parsing.encode_unicode_bytes(self.auth_username) + b':' + - parsing.encode_unicode_bytes(self.auth_password)) + curl.setopt(pycurl.USERPWD, + parsing.encode_unicode_bytes(self.auth_username) + b':' + + parsing.encode_unicode_bytes(self.auth_password)) if self.auth_type: curl.setopt(pycurl.HTTPAUTH, self.auth_type) @@ -407,79 +414,75 @@ def parse_test(cls, base_url, node, input_test=None, test_path=None): Accepted structure must be a single dictionary of key-value pairs for test configuration """ - mytest = input_test - if not mytest: - mytest = Test() + if not input_test: + input_test = Test() # Clean up for easy parsing node = lowercase_keys(flatten_dictionaries(node)) - - # Simple table of variable name, coerce function, and optionally special store function CONFIG_ELEMENTS = { # Simple variables u'auth_username': [coerce_string_to_ascii], u'auth_password': [coerce_string_to_ascii], - u'method': [coerce_http_method], # HTTP METHOD - u'delay': [lambda x: int(x)], # Delay before running - u'group': [coerce_to_string], # Test group name + u'method': [coerce_http_method], # HTTP METHOD + u'delay': [lambda x: int(x)], # Delay before running + u'group': [coerce_to_string], # Test group name u'name': [coerce_to_string], # Test name u'expected_status': [coerce_list_of_ints], - u'delay': [lambda x: int(x)], u'stop_on_failure': [safe_to_bool], # Templated / special handling - #u'url': [coerce_templatable, set_templated), # TODO: special handling for templated content, sigh + # u'url': [coerce_templatable, set_templated), # TODO: special handling for templated content, sigh u'body': [ContentHandler.parse_content] - #u'headers': [], + # u'headers': [], # COMPLEX PARSE OPTIONS - #u'extract_binds':[], # Context variable-to-extractor output binding - #u'variable_binds': [], # Context variable to value binding - #u'generator_binds': [], # Context variable to generator output binding - #u'validators': [], # Validation functions to run + # u'extract_binds':[], # Context variable-to-extractor output binding + # u'variable_binds': [], # Context variable to value binding + # u'generator_binds': [], # Context variable to generator output binding + # u'validators': [], # Validation functions to run } - def use_config_parser(configobject, configelement, configvalue): + def use_config_parser(configobject, config_element, config_value): """ Try to use parser bindings to find an option for parsing and storing config element :configobject: Object to store configuration - :configelement: Configuratione element name - :configvalue: Value to use to set configuration + :config_element: Configuratione element name + :config_value: Value to use to set configuration :returns: True if found match for config element, False if didn't """ - myparsing = CONFIG_ELEMENTS.get(configelement) - if myparsing: - converted = myparsing[0](configvalue) - setattr(configobject, configelement, converted) + parsed_value = CONFIG_ELEMENTS.get(config_element) + if parsed_value: + converted = parsed_value[0](config_value) + setattr(configobject, config_element, converted) return True return False # Copy/convert input elements into appropriate form for a test object - for configelement, configvalue in node.items(): - if use_config_parser(mytest, configelement, configvalue): + for config_element, config_value in node.items(): + if use_config_parser(input_test, config_element, config_value): continue # Configure test using configuration elements - if configelement == u'url': - temp = configvalue - if isinstance(configvalue, dict): + if config_element == u'url': + temp = config_value + if isinstance(config_value, dict): # Template is used for URL - val = lowercase_keys(configvalue)[u'template'] + val = lowercase_keys(config_value)[u'template'] assert isinstance(val, basestring) or isinstance(val, int) url = urlparse.urljoin(base_url, coerce_to_string(val)) - mytest.set_url(url, isTemplate=True) + input_test.set_url(url, isTemplate=True) else: - assert isinstance(configvalue, basestring) or isinstance( - configvalue, int) - mytest.url = urlparse.urljoin(base_url, coerce_to_string(configvalue)) - elif configelement == u'extract_binds': + assert isinstance(config_value, basestring) or isinstance( + config_value, int) + input_test.url = urlparse.urljoin(base_url, coerce_to_string(config_value)) + elif config_element == u'extract_binds': # Add a list of extractors, of format: # {variable_name: {extractor_type: extractor_config}, ... } - binds = flatten_dictionaries(configvalue) - if mytest.extract_binds is None: - mytest.extract_binds = dict() + binds = flatten_dictionaries(config_value) + if input_test.extract_binds is None: + input_test.extract_binds = dict() for variable_name, extractor in binds.items(): if not isinstance(extractor, dict) or len(extractor) == 0: @@ -491,59 +494,58 @@ def use_config_parser(configobject, configelement, configvalue): # Safe because length can only be 1 for extractor_type, extractor_config in extractor.items(): - mytest.extract_binds[variable_name] = validators.parse_extractor(extractor_type, extractor_config) - + input_test.extract_binds[variable_name] = validators.parse_extractor(extractor_type, + extractor_config) - elif configelement == u'validators': + elif config_element == u'validators': # Add a list of validators - if not isinstance(configvalue, list): + if not isinstance(config_value, list): raise Exception( 'Misconfigured validator section, must be a list of validators') - if mytest.validators is None: - mytest.validators = list() + if input_test.validators is None: + input_test.validators = list() # create validator and add to list of validators - for var in configvalue: + for var in config_value: if not isinstance(var, dict): raise TypeError( "Validators must be defined as validatorType:{configs} ") for validator_type, validator_config in var.items(): validator = validators.parse_validator( validator_type, validator_config) - mytest.validators.append(validator) + input_test.validators.append(validator) - elif configelement == 'headers': # HTTP headers to use, flattened to a single string-string dictionary - mytest.headers - configvalue = flatten_dictionaries(configvalue) + elif config_element == 'headers': # HTTP headers to use, flattened to a single string-string dictionary + config_value = flatten_dictionaries(config_value) - if isinstance(configvalue, dict): - filterfunc = lambda x: str(x[0]).lower() == 'template' # Templated items - templates = [x for x in ifilter(filterfunc, configvalue.items())] + if isinstance(config_value, dict): + filterfunc = lambda x: str(x[0]).lower() == 'template' # Templated items + templates = [x for x in ifilter(filterfunc, config_value.items())] else: templates = None if templates: # Should have single entry in dictionary keys - mytest.set_headers(templates[0][1], isTemplate=True) - elif isinstance(configvalue, dict): - mytest.headers = configvalue + input_test.set_headers(templates[0][1], isTemplate=True) + elif isinstance(config_value, dict): + input_test.headers = config_value else: raise TypeError( "Illegal header type: headers must be a dictionary or list of dictionary keys") - elif configelement == 'variable_binds': - mytest.variable_binds = flatten_dictionaries(configvalue) - elif configelement == 'generator_binds': - output = flatten_dictionaries(configvalue) + elif config_element == 'variable_binds': + input_test.variable_binds = flatten_dictionaries(config_value) + elif config_element == 'generator_binds': + output = flatten_dictionaries(config_value) output2 = dict() for key, value in output.items(): output2[str(key)] = str(value) - mytest.generator_binds = output2 - elif configelement.startswith('curl_option_'): - curlopt = configelement[12:].upper() + input_test.generator_binds = output2 + elif config_element.startswith('curl_option_'): + curlopt = config_element[12:].upper() if hasattr(BASECURL, curlopt): - if not mytest.curl_options: - mytest.curl_options = dict() - mytest.curl_options[curlopt] = configvalue + if not input_test.curl_options: + input_test.curl_options = dict() + input_test.curl_options[curlopt] = config_value else: raise ValueError( "Illegal curl option: {0}".format(curlopt)) @@ -553,11 +555,11 @@ def use_config_parser(configobject, configelement, configvalue): # this is per HTTP spec: # http://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html#sec9.5 if 'expected_status' not in node.keys(): - if mytest.method == 'POST': - mytest.expected_status = [200, 201, 204] - elif mytest.method == 'PUT': - mytest.expected_status = [200, 201, 204] - elif mytest.method == 'DELETE': - mytest.expected_status = [200, 202, 204] + if input_test.method == 'POST': + input_test.expected_status = [200, 201, 204] + elif input_test.method == 'PUT': + input_test.expected_status = [200, 201, 204] + elif input_test.method == 'DELETE': + input_test.expected_status = [200, 202, 204] # Fallthrough default is simply [200] - return mytest + return input_test