Skip to content

Commit

Permalink
CM-31709 - Migrate to the new filter to fetch detection rules
Browse files Browse the repository at this point in the history
  • Loading branch information
MarshalX committed May 10, 2024
1 parent 9b36b51 commit 299e185
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 34 deletions.
15 changes: 5 additions & 10 deletions cycode/cli/commands/scan/code_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,14 @@ def _should_use_sync_flow(scan_type: str, sync_option: bool, scan_parameters: Op


def _enrich_scan_result_with_data_from_detection_rules(
cycode_client: 'ScanClient', scan_type: str, scan_result: ZippedFileScanResult
cycode_client: 'ScanClient', scan_result: ZippedFileScanResult
) -> None:
# TODO(MarshalX): remove scan_type arg after migration to new backend filter
if scan_type not in {consts.SECRET_SCAN_TYPE, consts.INFRA_CONFIGURATION_SCAN_TYPE}:
# not yet
return

detection_rule_ids = set()
for detections_per_file in scan_result.detections_per_file:
for detection in detections_per_file.detections:
detection_rule_ids.add(detection.detection_rule_id)

detection_rules = cycode_client.get_detection_rules(scan_type, detection_rule_ids)
detection_rules = cycode_client.get_detection_rules(detection_rule_ids)
detection_rules_by_id = {detection_rule.detection_rule_id: detection_rule for detection_rule in detection_rules}

for detections_per_file in scan_result.detections_per_file:
Expand All @@ -138,9 +133,9 @@ def _enrich_scan_result_with_data_from_detection_rules(
# we want to make sure that BE returned it. better to not map data instead of failed scan
continue

if detection_rule.classification_data:
if not detection.severity and detection_rule.classification_data:
# it's fine to take the first one, because:
# - for "secrets" and "iac" there is only one classification rule per detection rule
# - for "secrets" and "iac" there is only one classification rule per-detection rule
# - for "sca" and "sast" we get severity from detection service
detection.severity = detection_rule.classification_data[0].severity

Expand Down Expand Up @@ -187,7 +182,7 @@ def _scan_batch_thread_func(batch: List[Document]) -> Tuple[str, CliError, Local
should_use_sync_flow,
)

_enrich_scan_result_with_data_from_detection_rules(cycode_client, scan_type, scan_result)
_enrich_scan_result_with_data_from_detection_rules(cycode_client, scan_result)

local_scan_result = create_local_scan_result(
scan_result, batch, command_scan_type, scan_type, severity_threshold
Expand Down
3 changes: 1 addition & 2 deletions cycode/cyclient/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ class Meta:

message = fields.String()
type = fields.String()
severity = fields.String(missing='High')
# TODO(MarshalX): Remove "missing" arg when IaC and Secrets scans will have classifications
severity = fields.String(missing=None)
detection_type_id = fields.String()
detection_details = fields.Dict()
detection_rule_id = fields.String()
Expand Down
26 changes: 4 additions & 22 deletions cycode/cyclient/scan_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def get_detection_rules_path(self) -> str:
return (
f'{self.scan_config.get_detections_prefix()}/'
f'{self.POLICIES_SERVICE_CONTROLLER_PATH_V3}/'
f'detection_rules'
f'detection_rules/byIds'
)

@staticmethod
Expand All @@ -181,36 +181,18 @@ def _get_policy_type_by_scan_type(scan_type: str) -> str:

return scan_type_to_policy_type[scan_type]

@staticmethod
def _filter_detection_rules_by_ids(
detection_rules: List[models.DetectionRule], detection_rules_ids: Union[Set[str], List[str]]
) -> List[models.DetectionRule]:
ids = set(detection_rules_ids) # cast to set to perform faster search
return [rule for rule in detection_rules if rule.detection_rule_id in ids]

@staticmethod
def parse_detection_rules_response(response: Response) -> List[models.DetectionRule]:
return models.DetectionRuleSchema().load(response.json(), many=True)

def get_detection_rules(
self, scan_type: str, detection_rules_ids: Union[Set[str], List[str]]
) -> List[models.DetectionRule]:
# TODO(MarshalX): use filter by list of IDs instead of policy_type when BE will be ready
params = {
'include_hidden': False,
'include_only_enabled_detection_rules': True,
'page_number': 0,
'page_size': 5000,
'policy_types_v2': self._get_policy_type_by_scan_type(scan_type),
}
def get_detection_rules(self, detection_rules_ids: Union[Set[str], List[str]]) -> List[models.DetectionRule]:
response = self.scan_cycode_client.get(
url_path=self.get_detection_rules_path(),
params=params,
params={'ids': detection_rules_ids},
hide_response_content_log=self._hide_response_log,
)

# we are filtering rules by ids in-place for smooth migration when backend will be ready
return self._filter_detection_rules_by_ids(self.parse_detection_rules_response(response), detection_rules_ids)
return self.parse_detection_rules_response(response)

def get_scan_detections_path(self, scan_type: str) -> str:
return f'{self.scan_config.get_detections_prefix()}/{self.get_detections_service_controller_path(scan_type)}'
Expand Down

0 comments on commit 299e185

Please sign in to comment.