diff --git a/.gitignore b/.gitignore index 4595d59ea..2b83a0983 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ **/__pycache__/ .venv/ .idea +.sandbox .DS_Store node_modules Tests/kaas/results/ diff --git a/Tests/iaas/security-groups/default-security-group-rules.py b/Tests/iaas/security-groups/default-security-group-rules.py old mode 100644 new mode 100755 index 773cf0bb8..a71f333e8 --- a/Tests/iaas/security-groups/default-security-group-rules.py +++ b/Tests/iaas/security-groups/default-security-group-rules.py @@ -4,127 +4,177 @@ except for ingress rules from the same Security Group. Furthermore the presence of default rules for egress traffic is checked. """ +import argparse +from collections import Counter +import logging +import os +import sys import openstack -import os -import argparse +from openstack.exceptions import ResourceNotFound +logger = logging.getLogger(__name__) -def connect(cloud_name: str) -> openstack.connection.Connection: - """Create a connection to an OpenStack cloud +SG_NAME = "scs-test-default-sg" +DESCRIPTION = "scs-test-default-sg" - :param string cloud_name: - The name of the configuration to load from clouds.yaml. - :returns: openstack.connnection.Connection +def check_default_rules(rules, short=False): """ - return openstack.connect( - cloud=cloud_name, - ) + counts all verall ingress rules and egress rules, depending on the requested testing mode - -def test_rules(cloud_name: str): - try: - connection = connect(cloud_name) - rules = connection.network.default_security_group_rules() - except Exception as e: - print(str(e)) - raise Exception( - f"Connection to cloud '{cloud_name}' was not successfully. " - f"The default Security Group Rules could not be accessed. " - f"Please check your cloud connection and authorization." - ) - - # count all overall ingress rules and egress rules. - ingress_rules = 0 - ingress_from_same_sg = 0 - egress_rules = 0 - egress_ipv4_default_sg = 0 - egress_ipv4_custom_sg = 0 - egress_ipv6_default_sg = 0 - egress_ipv6_custom_sg = 0 + :param bool short + if short is True, the testing mode is set on short for older OpenStack versions + """ + ingress_rules = egress_rules = 0 + egress_vars = {'IPv4': {}, 'IPv6': {}} + for key, value in egress_vars.items(): + value['default'] = 0 + if not short: + value['custom'] = 0 if not rules: - print("No default security group rules defined.") - else: - for rule in rules: - direction = rule.direction - ethertype = rule.ethertype - r_custom_sg = rule.used_in_non_default_sg - r_default_sg = rule.used_in_default_sg - if direction == "ingress": - ingress_rules += 1 + logger.info("No default security group rules defined.") + for rule in rules: + direction = rule["direction"] + ethertype = rule["ethertype"] + if direction == "ingress": + if not short: # we allow ingress from the same security group # but only for the default security group - r_group_id = rule.remote_group_id - if (r_group_id == "PARENT" and not r_custom_sg): - ingress_from_same_sg += 1 - elif direction == "egress" and ethertype == "IPv4": - egress_rules += 1 - if rule.remote_ip_prefix: - # this rule does not allow traffic to all external ips - continue - if r_custom_sg: - egress_ipv4_custom_sg += 1 - if r_default_sg: - egress_ipv4_default_sg += 1 - elif direction == "egress" and ethertype == "IPv6": - egress_rules += 1 - if rule.remote_ip_prefix: - # this rule does not allow traffic to all external ips + if rule.remote_group_id == "PARENT" and not rule["used_in_non_default_sg"]: continue - if r_custom_sg: - egress_ipv6_custom_sg += 1 - if r_default_sg: - egress_ipv6_default_sg += 1 - - # test whether there are no other than the allowed ingress rules - assert ingress_rules == ingress_from_same_sg, ( - f"Expected only ingress rules for default security groups, " - f"that allow ingress traffic from the same group. " - f"But there are more - in total {ingress_rules} ingress rules. " - f"There should be only {ingress_from_same_sg} ingress rules.") - assert egress_rules > 0, ( - f"Expected to have more than {egress_rules} egress rules present.") - var_list = [egress_ipv4_default_sg, egress_ipv4_custom_sg, - egress_ipv6_default_sg, egress_ipv6_custom_sg] - assert all([var > 0 for var in var_list]), ( - "Not all expected egress rules are present. " - "Expected rules for egress for IPv4 and IPv6 " - "both for default and custom security groups.") - - result_dict = { - "Ingress Rules": ingress_rules, - "Egress Rules": egress_rules - } - return result_dict + ingress_rules += 1 + elif direction == "egress" and ethertype in egress_vars: + egress_rules += 1 + if short: + egress_vars[ethertype]['default'] += 1 + continue + if rule.remote_ip_prefix: + # this rule does not allow traffic to all external ips + continue + # note: these two are not mutually exclusive + if rule["used_in_default_sg"]: + egress_vars[ethertype]['default'] += 1 + if rule["used_in_non_default_sg"]: + egress_vars[ethertype]['custom'] += 1 + # test whether there are no unallowed ingress rules + if ingress_rules: + logger.error(f"Expected no default ingress rules, found {ingress_rules}.") + # test whether all expected egress rules are present + missing = [(key, key2) for key, val in egress_vars.items() for key2, val2 in val.items() if not val2] + if missing: + logger.error( + "Expected rules for egress for IPv4 and IPv6 both for default and custom security groups. " + f"Missing rule types: {', '.join(str(x) for x in missing)}" + ) + logger.info(str({ + "Unallowed Ingress Rules": ingress_rules, + "Egress Rules": egress_rules, + })) + + +def create_security_group(conn, sg_name: str = SG_NAME, description: str = DESCRIPTION): + """Create security group in openstack + + :returns: + ~openstack.network.v2.security_group.SecurityGroup: The new security group or None + """ + sg = conn.network.create_security_group(name=sg_name, description=description) + return sg.id + + +def delete_security_group(conn, sg_id): + conn.network.delete_security_group(sg_id) + # in case of a successful delete finding the sg will throw an exception + try: + conn.network.find_security_group(name_or_id=sg_id) + except ResourceNotFound: + logger.debug(f"Security group {sg_id} was deleted successfully.") + except Exception: + logger.critical(f"Security group {sg_id} was not deleted successfully") + raise + + +def altern_test_rules(connection: openstack.connection.Connection): + sg_id = create_security_group(connection) + try: + sg = connection.network.find_security_group(name_or_id=sg_id) + check_default_rules(sg.security_group_rules, short=True) + finally: + delete_security_group(connection, sg_id) + + +def test_rules(connection: openstack.connection.Connection): + try: + rules = list(connection.network.default_security_group_rules()) + except ResourceNotFound: + logger.info( + "API call failed. OpenStack components might not be up to date. " + "Falling back to old-style test method. " + ) + logger.debug("traceback", exc_info=True) + altern_test_rules(connection) + else: + check_default_rules(rules) + + +class CountingHandler(logging.Handler): + def __init__(self, level=logging.NOTSET): + super().__init__(level=level) + self.bylevel = Counter() + + def handle(self, record): + self.bylevel[record.levelno] += 1 def main(): parser = argparse.ArgumentParser( - description="SCS Default Security Group Rules Checker") + description="SCS Default Security Group Rules Checker", + ) parser.add_argument( - "--os-cloud", type=str, + "--os-cloud", + type=str, help="Name of the cloud from clouds.yaml, alternative " - "to the OS_CLOUD environment variable" + "to the OS_CLOUD environment variable", ) parser.add_argument( - "--debug", action="store_true", - help="Enable OpenStack SDK debug logging" + "--debug", action="store_true", help="Enable debug logging", ) args = parser.parse_args() openstack.enable_logging(debug=args.debug) + logging.basicConfig( + format="%(levelname)s: %(message)s", + level=logging.DEBUG if args.debug else logging.INFO, + ) + + # count the number of log records per level (used for summary and return code) + counting_handler = CountingHandler(level=logging.INFO) + logger.addHandler(counting_handler) # parse cloud name for lookup in clouds.yaml - cloud = os.environ.get("OS_CLOUD", None) - if args.os_cloud: - cloud = args.os_cloud - assert cloud, ( - "You need to have the OS_CLOUD environment variable set to your cloud " - "name or pass it via --os-cloud" - ) + cloud = args.os_cloud or os.environ.get("OS_CLOUD", None) + if not cloud: + raise ValueError( + "You need to have the OS_CLOUD environment variable set to your cloud " + "name or pass it via --os-cloud" + ) - print(test_rules(cloud)) + with openstack.connect(cloud) as conn: + test_rules(conn) + + c = counting_handler.bylevel + logger.debug(f"Total critical / error / warning: {c[logging.CRITICAL]} / {c[logging.ERROR]} / {c[logging.WARNING]}") + if not c[logging.CRITICAL]: + print("security-groups-default-rules-check: " + ('PASS', 'FAIL')[min(1, c[logging.ERROR])]) + return min(127, c[logging.CRITICAL] + c[logging.ERROR]) # cap at 127 due to OS restrictions if __name__ == "__main__": - main() + try: + sys.exit(main()) + except SystemExit: + raise + except BaseException as exc: + logging.debug("traceback", exc_info=True) + logging.critical(str(exc)) + sys.exit(1)