diff --git a/qc_otx/checks/state_machine_checker/__init__.py b/qc_otx/checks/state_machine_checker/__init__.py index b172790..185f159 100644 --- a/qc_otx/checks/state_machine_checker/__init__.py +++ b/qc_otx/checks/state_machine_checker/__init__.py @@ -2,3 +2,6 @@ from . import state_machine_checker as state_machine_checker from . import no_procedure_realization as no_procedure_realization from . import mandatory_target_state as mandatory_target_state +from . import no_target_state_for_completed_state as no_target_state_for_completed_state +from . import mandatory_transition as mandatory_transition +from . import mandatory_trigger as mandatory_trigger diff --git a/qc_otx/checks/state_machine_checker/mandatory_transition.py b/qc_otx/checks/state_machine_checker/mandatory_transition.py new file mode 100644 index 0000000..a0d8191 --- /dev/null +++ b/qc_otx/checks/state_machine_checker/mandatory_transition.py @@ -0,0 +1,79 @@ +import logging + +from qc_baselib import IssueSeverity + +from qc_otx import constants +from qc_otx.checks import models, utils + +from qc_otx.checks.state_machine_checker import state_machine_constants + + +def check_rule(checker_data: models.CheckerData) -> None: + """ + Rule ID: asam.net:otx:1.0.0:state_machine.chk_005.mandatory_transition + + Criterion: Each state except the completed state shall have at least one transition. + Severity: Critical + + Version range: [1.0.0, ) + + Remark: + None + + """ + logging.info("Executing mandatory_transition check") + + issue_severity = IssueSeverity.ERROR + + rule_uid = checker_data.result.register_rule( + checker_bundle_name=constants.BUNDLE_NAME, + checker_id=state_machine_constants.CHECKER_ID, + emanating_entity="asam.net", + standard="otx", + definition_setting="1.0.0", + rule_full_name="state_machine.chk_005.mandatory_transition", + ) + + tree = checker_data.input_file_xml_root + nsmap = utils.get_namespace_map(tree) + + if "smp" not in nsmap: + logging.error( + 'No state machine procedure prefix "smp" found in document namespaces. Abort state machine procedure checks...' + ) + return + + state_machine_procedures = utils.get_state_machine_procedures(tree, nsmap) + + if state_machine_procedures is None: + return + + logging.debug(f"state_machine_procedures: {state_machine_procedures}") + + # smp = "state machine procedure" + for state_machine_procedure in state_machine_procedures: + + state_machine = utils.get_state_machine(state_machine_procedure, nsmap) + + if state_machine is None: + return + + for sm_state in state_machine.states: + has_issue = not sm_state.is_completed and len(sm_state.transitions) == 0 + if has_issue: + current_xpath = tree.getelementpath(sm_state.xml_element) + issue_id = checker_data.result.register_issue( + checker_bundle_name=constants.BUNDLE_NAME, + checker_id=state_machine_constants.CHECKER_ID, + description="Issue flagging when a non completed state has no transition", + level=issue_severity, + rule_uid=rule_uid, + ) + + checker_data.result.add_xml_location( + checker_bundle_name=constants.BUNDLE_NAME, + checker_id=state_machine_constants.CHECKER_ID, + issue_id=issue_id, + xpath=current_xpath, + description=f"State {sm_state.name} with id {sm_state.id} does not have any transition", + ) diff --git a/qc_otx/checks/state_machine_checker/mandatory_trigger.py b/qc_otx/checks/state_machine_checker/mandatory_trigger.py new file mode 100644 index 0000000..b8780a8 --- /dev/null +++ b/qc_otx/checks/state_machine_checker/mandatory_trigger.py @@ -0,0 +1,79 @@ +import logging + +from qc_baselib import IssueSeverity + +from qc_otx import constants +from qc_otx.checks import models, utils + +from qc_otx.checks.state_machine_checker import state_machine_constants + + +def check_rule(checker_data: models.CheckerData) -> None: + """ + Rule ID: asam.net:otx:1.0.0:state_machine.chk_004.mandatory_trigger + + Criterion: Each state except the completed state shall have at least one trigger. + Severity: Critical + + Version range: [1.0.0, ) + + Remark: + None + + """ + logging.info("Executing mandatory_trigger check") + + issue_severity = IssueSeverity.ERROR + + rule_uid = checker_data.result.register_rule( + checker_bundle_name=constants.BUNDLE_NAME, + checker_id=state_machine_constants.CHECKER_ID, + emanating_entity="asam.net", + standard="otx", + definition_setting="1.0.0", + rule_full_name="state_machine.chk_004.mandatory_trigger", + ) + + tree = checker_data.input_file_xml_root + nsmap = utils.get_namespace_map(tree) + + if "smp" not in nsmap: + logging.error( + 'No state machine procedure prefix "smp" found in document namespaces. Abort state machine procedure checks...' + ) + return + + state_machine_procedures = utils.get_state_machine_procedures(tree, nsmap) + + if state_machine_procedures is None: + return + + logging.debug(f"state_machine_procedures: {state_machine_procedures}") + + # smp = "state machine procedure" + for state_machine_procedure in state_machine_procedures: + + state_machine = utils.get_state_machine(state_machine_procedure, nsmap) + + if state_machine is None: + return + + for sm_state in state_machine.states: + has_issue = not sm_state.is_completed and len(sm_state.triggers) == 0 + if has_issue: + current_xpath = tree.getelementpath(sm_state.xml_element) + issue_id = checker_data.result.register_issue( + checker_bundle_name=constants.BUNDLE_NAME, + checker_id=state_machine_constants.CHECKER_ID, + description="Issue flagging when a non completed state has no triggers", + level=issue_severity, + rule_uid=rule_uid, + ) + + checker_data.result.add_xml_location( + checker_bundle_name=constants.BUNDLE_NAME, + checker_id=state_machine_constants.CHECKER_ID, + issue_id=issue_id, + xpath=current_xpath, + description=f"State {sm_state.name} with id {sm_state.id} does not have any trigger", + ) diff --git a/qc_otx/checks/state_machine_checker/no_target_state_for_completed_state.py b/qc_otx/checks/state_machine_checker/no_target_state_for_completed_state.py new file mode 100644 index 0000000..c8480bc --- /dev/null +++ b/qc_otx/checks/state_machine_checker/no_target_state_for_completed_state.py @@ -0,0 +1,88 @@ +import logging + +from qc_baselib import IssueSeverity + +from qc_otx import constants +from qc_otx.checks import models, utils + +from qc_otx.checks.state_machine_checker import state_machine_constants + + +def check_rule(checker_data: models.CheckerData) -> None: + """ + Rule ID: asam.net:otx:1.0.0:state_machine.chk_003.no_target_state_for_completed_state + + Criterion: After finishing the completed state the procedure is finished and shall return to + the caller. Therefore the completed state shall not have a target state. + Severity: Warning + + Version range: [1.0.0, ) + + Remark: + None + + """ + logging.info("Executing no_target_state_for_completed_state check") + + issue_severity = IssueSeverity.WARNING + + rule_uid = checker_data.result.register_rule( + checker_bundle_name=constants.BUNDLE_NAME, + checker_id=state_machine_constants.CHECKER_ID, + emanating_entity="asam.net", + standard="otx", + definition_setting="1.0.0", + rule_full_name="state_machine.chk_003.no_target_state_for_completed_state", + ) + + tree = checker_data.input_file_xml_root + nsmap = utils.get_namespace_map(tree) + + if "smp" not in nsmap: + logging.error( + 'No state machine procedure prefix "smp" found in document namespaces. Abort state machine procedure checks...' + ) + return + + state_machine_procedures = utils.get_state_machine_procedures(tree, nsmap) + + if state_machine_procedures is None: + return + + logging.debug(f"state_machine_procedures: {state_machine_procedures}") + + # smp = "state machine procedure" + for state_machine_procedure in state_machine_procedures: + + state_machine = utils.get_state_machine(state_machine_procedure, nsmap) + + if state_machine is None: + return + + completed_state = None + for sm_state in state_machine.states: + if sm_state.is_completed: + completed_state = sm_state + break + + if completed_state is None: + return + + has_issue = len(completed_state.target_state_ids) != 0 + if has_issue: + current_xpath = tree.getelementpath(completed_state.xml_element) + issue_id = checker_data.result.register_issue( + checker_bundle_name=constants.BUNDLE_NAME, + checker_id=state_machine_constants.CHECKER_ID, + description="Issue flagging when a completed state has a target state", + level=issue_severity, + rule_uid=rule_uid, + ) + + checker_data.result.add_xml_location( + checker_bundle_name=constants.BUNDLE_NAME, + checker_id=state_machine_constants.CHECKER_ID, + issue_id=issue_id, + xpath=current_xpath, + description=f"Completed state {completed_state.name} with id {completed_state.id} has a target state but it should not", + ) diff --git a/qc_otx/checks/state_machine_checker/state_machine_checker.py b/qc_otx/checks/state_machine_checker/state_machine_checker.py index 573f33a..37b4b43 100644 --- a/qc_otx/checks/state_machine_checker/state_machine_checker.py +++ b/qc_otx/checks/state_machine_checker/state_machine_checker.py @@ -11,6 +11,9 @@ state_machine_constants, no_procedure_realization, mandatory_target_state, + no_target_state_for_completed_state, + mandatory_transition, + mandatory_trigger, ) @@ -27,6 +30,9 @@ def run_checks(checker_data: models.CheckerData) -> None: rule_list = [ no_procedure_realization.check_rule, # Chk001 mandatory_target_state.check_rule, # Chk002 + no_target_state_for_completed_state.check_rule, # Chk003 + mandatory_transition.check_rule, # Chk004 + mandatory_trigger.check_rule, # Chk005 ] for rule in rule_list: diff --git a/tests/data/StateMachine_Chk003/negative.otx b/tests/data/StateMachine_Chk003/negative.otx new file mode 100644 index 0000000..24bbf42 --- /dev/null +++ b/tests/data/StateMachine_Chk003/negative.otx @@ -0,0 +1,150 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + This is a new state machine procedure + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/data/StateMachine_Chk003/positive.otx b/tests/data/StateMachine_Chk003/positive.otx new file mode 100644 index 0000000..74f879a --- /dev/null +++ b/tests/data/StateMachine_Chk003/positive.otx @@ -0,0 +1,132 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + This is a new state machine procedure + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/data/StateMachine_Chk004/negative.otx b/tests/data/StateMachine_Chk004/negative.otx new file mode 100644 index 0000000..16a218e --- /dev/null +++ b/tests/data/StateMachine_Chk004/negative.otx @@ -0,0 +1,121 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + This is a new state machine procedure + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/data/StateMachine_Chk004/positive.otx b/tests/data/StateMachine_Chk004/positive.otx new file mode 100644 index 0000000..74f879a --- /dev/null +++ b/tests/data/StateMachine_Chk004/positive.otx @@ -0,0 +1,132 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + This is a new state machine procedure + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/data/StateMachine_Chk005/negative.otx b/tests/data/StateMachine_Chk005/negative.otx new file mode 100644 index 0000000..26025fd --- /dev/null +++ b/tests/data/StateMachine_Chk005/negative.otx @@ -0,0 +1,119 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + This is a new state machine procedure + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/data/StateMachine_Chk005/positive.otx b/tests/data/StateMachine_Chk005/positive.otx new file mode 100644 index 0000000..74f879a --- /dev/null +++ b/tests/data/StateMachine_Chk005/positive.otx @@ -0,0 +1,132 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + This is a new state machine procedure + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/test_state_machine_checks.py b/tests/test_state_machine_checks.py index b415cb5..80df187 100644 --- a/tests/test_state_machine_checks.py +++ b/tests/test_state_machine_checks.py @@ -102,3 +102,150 @@ def test_chk002_negative( assert state_machine_issues[0].level == IssueSeverity.ERROR test_utils.cleanup_files() + + +def test_chk003_positive( + monkeypatch, +) -> None: + base_path = "tests/data/StateMachine_Chk003/" + target_file_name = f"positive.otx" + target_file_path = os.path.join(base_path, target_file_name) + + test_utils.create_test_config(target_file_path) + + test_utils.launch_main(monkeypatch) + + result = Result() + result.load_from_file(test_utils.REPORT_FILE_PATH) + + assert ( + len( + result.get_issues_by_rule_uid( + "asam.net:otx:1.0.0:state_machine.chk_003.no_target_state_for_completed_state" + ) + ) + == 0 + ) + + test_utils.cleanup_files() + + +def test_chk003_negative( + monkeypatch, +) -> None: + base_path = "tests/data/StateMachine_Chk003/" + target_file_name = f"negative.otx" + target_file_path = os.path.join(base_path, target_file_name) + + test_utils.create_test_config(target_file_path) + + test_utils.launch_main(monkeypatch) + + result = Result() + result.load_from_file(test_utils.REPORT_FILE_PATH) + + state_machine_issues = result.get_issues_by_rule_uid( + "asam.net:otx:1.0.0:state_machine.chk_003.no_target_state_for_completed_state" + ) + assert len(state_machine_issues) == 1 + assert state_machine_issues[0].level == IssueSeverity.WARNING + + test_utils.cleanup_files() + + +def test_chk004_positive( + monkeypatch, +) -> None: + base_path = "tests/data/StateMachine_Chk004/" + target_file_name = f"positive.otx" + target_file_path = os.path.join(base_path, target_file_name) + + test_utils.create_test_config(target_file_path) + + test_utils.launch_main(monkeypatch) + + result = Result() + result.load_from_file(test_utils.REPORT_FILE_PATH) + + assert ( + len( + result.get_issues_by_rule_uid( + "asam.net:otx:1.0.0:state_machine.chk_004.mandatory_trigger" + ) + ) + == 0 + ) + + test_utils.cleanup_files() + + +def test_chk004_negative( + monkeypatch, +) -> None: + base_path = "tests/data/StateMachine_Chk004/" + target_file_name = f"negative.otx" + target_file_path = os.path.join(base_path, target_file_name) + + test_utils.create_test_config(target_file_path) + + test_utils.launch_main(monkeypatch) + + result = Result() + result.load_from_file(test_utils.REPORT_FILE_PATH) + + state_machine_issues = result.get_issues_by_rule_uid( + "asam.net:otx:1.0.0:state_machine.chk_004.mandatory_trigger" + ) + assert len(state_machine_issues) == 1 + assert state_machine_issues[0].level == IssueSeverity.ERROR + + test_utils.cleanup_files() + + +def test_chk005_positive( + monkeypatch, +) -> None: + base_path = "tests/data/StateMachine_Chk005/" + target_file_name = f"positive.otx" + target_file_path = os.path.join(base_path, target_file_name) + + test_utils.create_test_config(target_file_path) + + test_utils.launch_main(monkeypatch) + + result = Result() + result.load_from_file(test_utils.REPORT_FILE_PATH) + + assert ( + len( + result.get_issues_by_rule_uid( + "asam.net:otx:1.0.0:state_machine.chk_005.mandatory_transition" + ) + ) + == 0 + ) + + test_utils.cleanup_files() + + +def test_chk005_negative( + monkeypatch, +) -> None: + base_path = "tests/data/StateMachine_Chk005/" + target_file_name = f"negative.otx" + target_file_path = os.path.join(base_path, target_file_name) + + test_utils.create_test_config(target_file_path) + + test_utils.launch_main(monkeypatch) + + result = Result() + result.load_from_file(test_utils.REPORT_FILE_PATH) + + state_machine_issues = result.get_issues_by_rule_uid( + "asam.net:otx:1.0.0:state_machine.chk_005.mandatory_transition" + ) + assert len(state_machine_issues) == 1 + assert state_machine_issues[0].level == IssueSeverity.ERROR + + test_utils.cleanup_files()