Skip to content

Commit

Permalink
Merge branch 'main' into move/675-node-to-node-encryption
Browse files Browse the repository at this point in the history
  • Loading branch information
matofeder authored Nov 12, 2024
2 parents ce8668a + ef46e2d commit 626f3cf
Show file tree
Hide file tree
Showing 9 changed files with 342 additions and 289 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
**/__pycache__/
.venv/
.idea
.sandbox
.DS_Store
node_modules
Tests/kaas/results/
Expand Down
2 changes: 1 addition & 1 deletion Standards/scs-0102-v1-image-metadata.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
title: SCS Image Metadata Standard
title: SCS Image Metadata
type: Standard
stabilized_at: 2022-10-31
status: Stable
Expand Down
2 changes: 1 addition & 1 deletion Standards/scs-0118-v1-taxonomy-of-failsafe-levels.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
title: Taxonomy of Failsafe Levels
title: SCS Taxonomy of Failsafe Levels
type: Decision Record
status: Draft
track: IaaS
Expand Down
13 changes: 8 additions & 5 deletions Tests/iaas/key-manager/check-for-key-manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,16 @@ def main():
# parse cloud name for lookup in clouds.yaml
cloud = args.os_cloud or os.environ.get("OS_CLOUD", None)
if not cloud:
raise RuntimeError(
logger.critical(
"You need to have the OS_CLOUD environment variable set to your cloud "
"name or pass it via --os-cloud"
)
return 2

with openstack.connect(cloud=cloud) as conn:
if not check_for_member_role(conn):
logger.critical("Cannot test key-manager permissions. User has wrong roles")
return 1
return 2
if check_presence_of_key_manager(conn):
return check_key_manager_permissions(conn)
else:
Expand All @@ -145,9 +146,11 @@ def main():

if __name__ == "__main__":
try:
sys.exit(main())
except SystemExit:
sys.exit(main() or 0)
except SystemExit as e:
if e.code < 2:
print("key-manager-check: " + ('PASS', 'FAIL')[min(1, e.code)])
raise
except BaseException:
logger.critical("exception", exc_info=True)
sys.exit(1)
sys.exit(2)
241 changes: 146 additions & 95 deletions Tests/iaas/security-groups/default-security-group-rules.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,130 +1,181 @@
#!/usr/bin/env python3
"""Default Security Group Rules Checker
This script tests the absence of any ingress default security group rule
except for ingress rules from the same Security Group. Furthermore the
presence of default rules for egress traffic is checked.
"""
import argparse
from collections import Counter
import logging
import os
import sys

import openstack
import os
import argparse
from openstack.exceptions import ResourceNotFound

logger = logging.getLogger(__name__)

def connect(cloud_name: str) -> openstack.connection.Connection:
"""Create a connection to an OpenStack cloud
SG_NAME = "scs-test-default-sg"
DESCRIPTION = "scs-test-default-sg"

:param string cloud_name:
The name of the configuration to load from clouds.yaml.

:returns: openstack.connnection.Connection
def check_default_rules(rules, short=False):
"""
return openstack.connect(
cloud=cloud_name,
)
counts all verall ingress rules and egress rules, depending on the requested testing mode

def test_rules(cloud_name: str):
try:
connection = connect(cloud_name)
rules = connection.network.default_security_group_rules()
except Exception as e:
print(str(e))
raise Exception(
f"Connection to cloud '{cloud_name}' was not successfully. "
f"The default Security Group Rules could not be accessed. "
f"Please check your cloud connection and authorization."
)

# count all overall ingress rules and egress rules.
ingress_rules = 0
ingress_from_same_sg = 0
egress_rules = 0
egress_ipv4_default_sg = 0
egress_ipv4_custom_sg = 0
egress_ipv6_default_sg = 0
egress_ipv6_custom_sg = 0
:param bool short
if short is True, the testing mode is set on short for older OpenStack versions
"""
ingress_rules = egress_rules = 0
egress_vars = {'IPv4': {}, 'IPv6': {}}
for key, value in egress_vars.items():
value['default'] = 0
if not short:
value['custom'] = 0
if not rules:
print("No default security group rules defined.")
else:
for rule in rules:
direction = rule.direction
ethertype = rule.ethertype
r_custom_sg = rule.used_in_non_default_sg
r_default_sg = rule.used_in_default_sg
if direction == "ingress":
ingress_rules += 1
logger.info("No default security group rules defined.")
for rule in rules:
direction = rule["direction"]
ethertype = rule["ethertype"]
if direction == "ingress":
if not short:
# we allow ingress from the same security group
# but only for the default security group
r_group_id = rule.remote_group_id
if (r_group_id == "PARENT" and not r_custom_sg):
ingress_from_same_sg += 1
elif direction == "egress" and ethertype == "IPv4":
egress_rules += 1
if rule.remote_ip_prefix:
# this rule does not allow traffic to all external ips
continue
if r_custom_sg:
egress_ipv4_custom_sg += 1
if r_default_sg:
egress_ipv4_default_sg += 1
elif direction == "egress" and ethertype == "IPv6":
egress_rules += 1
if rule.remote_ip_prefix:
# this rule does not allow traffic to all external ips
if rule.remote_group_id == "PARENT" and not rule["used_in_non_default_sg"]:
continue
if r_custom_sg:
egress_ipv6_custom_sg += 1
if r_default_sg:
egress_ipv6_default_sg += 1

# test whether there are no other than the allowed ingress rules
assert ingress_rules == ingress_from_same_sg, (
f"Expected only ingress rules for default security groups, "
f"that allow ingress traffic from the same group. "
f"But there are more - in total {ingress_rules} ingress rules. "
f"There should be only {ingress_from_same_sg} ingress rules.")
assert egress_rules > 0, (
f"Expected to have more than {egress_rules} egress rules present.")
var_list = [egress_ipv4_default_sg, egress_ipv4_custom_sg,
egress_ipv6_default_sg, egress_ipv6_custom_sg]
assert all([var > 0 for var in var_list]), (
"Not all expected egress rules are present. "
"Expected rules for egress for IPv4 and IPv6 "
"both for default and custom security groups.")

result_dict = {
"Ingress Rules": ingress_rules,
"Egress Rules": egress_rules
}
return result_dict
ingress_rules += 1
elif direction == "egress" and ethertype in egress_vars:
egress_rules += 1
if short:
egress_vars[ethertype]['default'] += 1
continue
if rule.remote_ip_prefix:
# this rule does not allow traffic to all external ips
continue
# note: these two are not mutually exclusive
if rule["used_in_default_sg"]:
egress_vars[ethertype]['default'] += 1
if rule["used_in_non_default_sg"]:
egress_vars[ethertype]['custom'] += 1
# test whether there are no unallowed ingress rules
if ingress_rules:
logger.error(f"Expected no default ingress rules, found {ingress_rules}.")
# test whether all expected egress rules are present
missing = [(key, key2) for key, val in egress_vars.items() for key2, val2 in val.items() if not val2]
if missing:
logger.error(
"Expected rules for egress for IPv4 and IPv6 both for default and custom security groups. "
f"Missing rule types: {', '.join(str(x) for x in missing)}"
)
logger.info(str({
"Unallowed Ingress Rules": ingress_rules,
"Egress Rules": egress_rules,
}))


def create_security_group(conn, sg_name: str = SG_NAME, description: str = DESCRIPTION):
"""Create security group in openstack
:returns:
~openstack.network.v2.security_group.SecurityGroup: The new security group or None
"""
sg = conn.network.create_security_group(name=sg_name, description=description)
return sg.id


def delete_security_group(conn, sg_id):
conn.network.delete_security_group(sg_id)
# in case of a successful delete finding the sg will throw an exception
try:
conn.network.find_security_group(name_or_id=sg_id)
except ResourceNotFound:
logger.debug(f"Security group {sg_id} was deleted successfully.")
except Exception:
logger.critical(f"Security group {sg_id} was not deleted successfully")
raise


def altern_test_rules(connection: openstack.connection.Connection):
sg_id = create_security_group(connection)
try:
sg = connection.network.find_security_group(name_or_id=sg_id)
check_default_rules(sg.security_group_rules, short=True)
finally:
delete_security_group(connection, sg_id)


def test_rules(connection: openstack.connection.Connection):
try:
rules = list(connection.network.default_security_group_rules())
except ResourceNotFound:
logger.info(
"API call failed. OpenStack components might not be up to date. "
"Falling back to old-style test method. "
)
logger.debug("traceback", exc_info=True)
altern_test_rules(connection)
else:
check_default_rules(rules)


class CountingHandler(logging.Handler):
def __init__(self, level=logging.NOTSET):
super().__init__(level=level)
self.bylevel = Counter()

def handle(self, record):
self.bylevel[record.levelno] += 1


def main():
parser = argparse.ArgumentParser(
description="SCS Default Security Group Rules Checker")
description="SCS Default Security Group Rules Checker",
)
parser.add_argument(
"--os-cloud", type=str,
"--os-cloud",
type=str,
help="Name of the cloud from clouds.yaml, alternative "
"to the OS_CLOUD environment variable"
"to the OS_CLOUD environment variable",
)
parser.add_argument(
"--debug", action="store_true",
help="Enable OpenStack SDK debug logging"
"--debug", action="store_true", help="Enable debug logging",
)
args = parser.parse_args()
openstack.enable_logging(debug=args.debug)
logging.basicConfig(
format="%(levelname)s: %(message)s",
level=logging.DEBUG if args.debug else logging.INFO,
)

# count the number of log records per level (used for summary and return code)
counting_handler = CountingHandler(level=logging.INFO)
logger.addHandler(counting_handler)

# parse cloud name for lookup in clouds.yaml
cloud = os.environ.get("OS_CLOUD", None)
if args.os_cloud:
cloud = args.os_cloud
assert cloud, (
"You need to have the OS_CLOUD environment variable set to your cloud "
"name or pass it via --os-cloud"
)
cloud = args.os_cloud or os.environ.get("OS_CLOUD", None)
if not cloud:
raise ValueError(
"You need to have the OS_CLOUD environment variable set to your cloud "
"name or pass it via --os-cloud"
)

print(test_rules(cloud))
with openstack.connect(cloud) as conn:
test_rules(conn)

c = counting_handler.bylevel
logger.debug(f"Total critical / error / warning: {c[logging.CRITICAL]} / {c[logging.ERROR]} / {c[logging.WARNING]}")
if not c[logging.CRITICAL]:
print("security-groups-default-rules-check: " + ('PASS', 'FAIL')[min(1, c[logging.ERROR])])
return min(127, c[logging.CRITICAL] + c[logging.ERROR]) # cap at 127 due to OS restrictions


if __name__ == "__main__":
main()
try:
sys.exit(main())
except SystemExit:
raise
except BaseException as exc:
logging.debug("traceback", exc_info=True)
logging.critical(str(exc))
sys.exit(1)
Loading

0 comments on commit 626f3cf

Please sign in to comment.