From c10cef873b491d98a87c8d7f0e878d99fcecff14 Mon Sep 17 00:00:00 2001 From: "Jonas Brand (8R0WNI3)" Date: Mon, 22 Apr 2024 18:02:03 +0200 Subject: [PATCH] Include sprint information in rescoring proposal --- app.py | 2 ++ rescore.py | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/app.py b/app.py index cdcd296f..7018c01d 100755 --- a/app.py +++ b/app.py @@ -440,6 +440,8 @@ def handle_exception(req, resp, ex, params, ws=None): component_descriptor_lookup=component_descriptor_lookup, namespace_callback=namespace_callback, kubernetes_api_callback=kubernetes_api_callback, + sprints_repo_callback=sprints_repo_callback, + sprints_relpath_callback=sprints_relpath_callback, ), ) diff --git a/rescore.py b/rescore.py index 13ebf965..d8a4fa23 100644 --- a/rescore.py +++ b/rescore.py @@ -1,5 +1,6 @@ import collections.abc import dataclasses +import datetime import enum import logging import typing @@ -23,9 +24,11 @@ import deliverydb.model as dm import deliverydb.util as du import k8s.backlog +import k8s.model import k8s.util import middleware.auth import util +import yp logger = logging.getLogger(__name__) @@ -81,6 +84,8 @@ class RescoringProposal: severity: Severity matching_rules: list[str] applicable_rescorings: tuple[dso.model.ArtefactMetadata, ...] # "..." for dacite.from_dict + discovery_date: str + sprint: yp.Sprint | None def _find_cve_rescoring_rule_set( @@ -424,11 +429,35 @@ def filesystem_paths_for_finding( }) +def sprint_for_finding( + finding: dm.ArtefactMetaData, + severity: gcm.Severity | None, + max_processing_days: gcm.MaxProcessingTimesDays | None, + sprints: list[yp.Sprint], +) -> yp.Sprint | None: + if not severity or not max_processing_days or not sprints: + return None + + max_days = max_processing_days.for_severity(severity=severity) + date = finding.discovery_date + datetime.timedelta(days=max_days) + + for sprint in sorted(sprints, key=lambda sprint: sprint.end_date): + if sprint.end_date.date() > date: + break + else: + logger.warning(f'could not determine target sprint for {finding=} with {severity=}') + return None + + return sprint + + def _iter_rescoring_proposals( artefact_metadata: list[dm.ArtefactMetaData], rescorings: tuple[dso.model.ArtefactMetadata], rescoring_rules: tuple[dso.cvss.RescoringRule] | None, categorisation: dso.cvss.CveCategorisation | None, + max_processing_days: gcm.MaxProcessingTimesDays | None=None, + sprints: list[yp.Sprint]=[], ) -> collections.abc.Generator[RescoringProposal, None, None]: seen_ids = set() @@ -458,6 +487,22 @@ def _iter_rescoring_proposals( rescorings=rescorings, ) + if current_rescorings: + current_severity = gcm.Severity[current_rescorings[0].data.severity] + elif am.type != dso.model.Datatype.STRUCTURE_INFO: + # structure info does not have any severity but is just retrieved to evaluate + # whether a scan exists for the given artefacts (if no finding is found) + current_severity = gcm.Severity[am.data.get('severity')] + else: + current_severity = None + + sprint = sprint_for_finding( + finding=am, + severity=current_severity, + max_processing_days=max_processing_days, + sprints=sprints, + ) + if ( am.type == dso.model.Datatype.VULNERABILITY and am.data.get('id').get('source') == dso.model.Datasource.BDBA @@ -519,6 +564,8 @@ def _iter_rescoring_proposals( ) ] if rescoring_rules and categorisation else ['original-severity'], 'applicable_rescorings': current_rescorings, + 'discovery_date': am.discovery_date.isoformat(), + 'sprint': sprint, }, ) @@ -566,6 +613,8 @@ def _iter_rescoring_proposals( 'severity': severity, 'matching_rules': ['original-severity'], 'applicable_rescorings': current_rescorings, + 'discovery_date': am.discovery_date.isoformat(), + 'sprint': sprint, }, ) @@ -692,12 +741,16 @@ def __init__( component_descriptor_lookup: cnudie.retrieve.ComponentDescriptorLookupById, namespace_callback, kubernetes_api_callback, + sprints_repo_callback, + sprints_relpath_callback, ): self.cve_rescoring_rule_set_lookup = cve_rescoring_rule_set_lookup self.default_rule_set_callback = default_rule_set_callback self.component_descriptor_lookup = component_descriptor_lookup self.namespace_callback = namespace_callback self.kubernetes_api_callback = kubernetes_api_callback + self.sprints_repo_callback = sprints_repo_callback + self.sprints_relpath_callback = sprints_relpath_callback def on_post(self, req: falcon.Request, resp: falcon.Response): ''' @@ -891,11 +944,43 @@ def on_get(self, req: falcon.Request, resp: falcon.Response): type_filter=type_filter, ) + scan_configs = tuple(k8s.util.iter_scan_configurations( + namespace=self.namespace_callback(), + kubernetes_api=self.kubernetes_api_callback(), + )) + + # only if there is one scan config we can assume for sure that this config should be used + if len(scan_configs) != 1: + max_processing_days = None + sprints = [] + else: + scan_config = scan_configs[0] + issue_replicator_config = config.deserialise_issue_replicator_config( + spec_config=scan_config.config, + ) + if issue_replicator_config: + max_processing_days = issue_replicator_config.max_processing_days + else: + max_processing_days = gcm.MaxProcessingTimesDays() + + if ( + (repo := self.sprints_repo_callback()) + and (relpath := self.sprints_relpath_callback()) + ): + sprints = yp._sprints( + repo=repo, + sprints_file_relpath=relpath, + ) + else: + sprints = [] + rescoring_proposals = _iter_rescoring_proposals( artefact_metadata=artefact_metadata, rescorings=rescorings, rescoring_rules=cve_rescoring_rule_set.rules if cve_rescoring_rule_set else None, categorisation=categorisation, + max_processing_days=max_processing_days, + sprints=sprints, ) resp.media = tuple(rescoring_proposals)