From ebbca2c880420dc60750826dea28799df499a9f9 Mon Sep 17 00:00:00 2001 From: Ilya Siamionau Date: Mon, 30 Oct 2023 14:50:57 +0100 Subject: [PATCH] CM-22319 - Run secrets scanning asynchronously using polling mechanism (#136) --- cycode/cli/code_scanner.py | 12 +- cycode/cyclient/scan_client.py | 45 +++-- cycode/cyclient/scan_config_base.py | 16 ++ tests/cli/test_main.py | 11 +- tests/conftest.py | 2 + tests/cyclient/mocked_responses/__init__.py | 0 .../mocked_responses/data/detections.json | 83 +++++++++ .../cyclient/mocked_responses/scan_client.py | 158 ++++++++++++++++++ tests/cyclient/test_scan_client.py | 67 +------- 9 files changed, 311 insertions(+), 83 deletions(-) create mode 100644 tests/cyclient/mocked_responses/__init__.py create mode 100644 tests/cyclient/mocked_responses/data/detections.json create mode 100644 tests/cyclient/mocked_responses/scan_client.py diff --git a/cycode/cli/code_scanner.py b/cycode/cli/code_scanner.py index 1801d63c..5920fc80 100644 --- a/cycode/cli/code_scanner.py +++ b/cycode/cli/code_scanner.py @@ -546,13 +546,13 @@ def perform_scan( is_commit_range: bool, scan_parameters: dict, ) -> ZippedFileScanResult: - if scan_type in (consts.SCA_SCAN_TYPE, consts.SAST_SCAN_TYPE): - return perform_scan_async(cycode_client, zipped_documents, scan_type, scan_parameters) - if is_commit_range: return cycode_client.commit_range_zipped_file_scan(scan_type, zipped_documents, scan_id) - return cycode_client.zipped_file_scan(scan_type, zipped_documents, scan_id, scan_parameters, is_git_diff) + if scan_type == consts.INFRA_CONFIGURATION_SCAN_TYPE: + return cycode_client.zipped_file_scan(scan_type, zipped_documents, scan_id, scan_parameters, is_git_diff) + + return perform_scan_async(cycode_client, zipped_documents, scan_type, scan_parameters) def perform_scan_async( @@ -1025,6 +1025,10 @@ def _map_detections_per_file(detections: List[dict]) -> List[DetectionsPerFile]: def _get_file_name_from_detection(detection: dict) -> str: if detection['category'] == 'SAST': return detection['detection_details']['file_path'] + if detection['category'] == 'SecretDetection': + file_path = detection['detection_details']['file_path'] + file_name = detection['detection_details']['file_name'] + return os.path.join(file_path, file_name) return detection['detection_details']['file_name'] diff --git a/cycode/cyclient/scan_client.py b/cycode/cyclient/scan_client.py index 5830e9dc..50ee0e79 100644 --- a/cycode/cyclient/scan_client.py +++ b/cycode/cyclient/scan_client.py @@ -31,14 +31,16 @@ def content_scan(self, scan_type: str, file_name: str, content: str, is_git_diff ) return self.parse_scan_response(response) + def get_zipped_file_scan_url_path(self, scan_type: str) -> str: + return f'{self.scan_config.get_service_name(scan_type)}/{self.SCAN_CONTROLLER_PATH}/zipped-file' + def zipped_file_scan( self, scan_type: str, zip_file: InMemoryZip, scan_id: str, scan_parameters: dict, is_git_diff: bool = False ) -> models.ZippedFileScanResult: - url_path = f'{self.scan_config.get_service_name(scan_type)}/{self.SCAN_CONTROLLER_PATH}/zipped-file' files = {'file': ('multiple_files_scan.zip', zip_file.read())} response = self.scan_cycode_client.post( - url_path=url_path, + url_path=self.get_zipped_file_scan_url_path(scan_type), data={'scan_id': scan_id, 'is_git_diff': is_git_diff, 'scan_parameters': json.dumps(scan_parameters)}, files=files, hide_response_content_log=self._hide_response_log, @@ -46,13 +48,19 @@ def zipped_file_scan( return self.parse_zipped_file_scan_response(response) + def get_zipped_file_scan_async_url_path(self, scan_type: str) -> str: + async_scan_type = self.scan_config.get_async_scan_type(scan_type) + async_entity_type = self.scan_config.get_async_entity_type(scan_type) + + url_prefix = self.scan_config.get_scans_prefix() + return f'{url_prefix}/{self.SCAN_CONTROLLER_PATH}/{async_scan_type}/{async_entity_type}' + def zipped_file_scan_async( self, zip_file: InMemoryZip, scan_type: str, scan_parameters: dict, is_git_diff: bool = False ) -> models.ScanInitializationResponse: - url_path = f'{self.scan_config.get_scans_prefix()}/{self.SCAN_CONTROLLER_PATH}/{scan_type}/repository' files = {'file': ('multiple_files_scan.zip', zip_file.read())} response = self.scan_cycode_client.post( - url_path=url_path, + url_path=self.get_zipped_file_scan_async_url_path(scan_type), data={'is_git_diff': is_git_diff, 'scan_parameters': json.dumps(scan_parameters)}, files=files, ) @@ -80,13 +88,17 @@ def multiple_zipped_file_scan_async( ) return models.ScanInitializationResponseSchema().load(response.json()) + def get_scan_details_path(self, scan_id: str) -> str: + return f'{self.scan_config.get_scans_prefix()}/{self.SCAN_CONTROLLER_PATH}/{scan_id}' + def get_scan_details(self, scan_id: str) -> models.ScanDetailsResponse: - url_path = f'{self.scan_config.get_scans_prefix()}/{self.SCAN_CONTROLLER_PATH}/{scan_id}' - response = self.scan_cycode_client.get(url_path=url_path) + response = self.scan_cycode_client.get(url_path=self.get_scan_details_path(scan_id)) return models.ScanDetailsResponseSchema().load(response.json()) + def get_scan_detections_path(self) -> str: + return f'{self.scan_config.get_detections_prefix()}/{self.DETECTIONS_SERVICE_CONTROLLER_PATH}' + def get_scan_detections(self, scan_id: str) -> List[dict]: - url_path = f'{self.scan_config.get_detections_prefix()}/{self.DETECTIONS_SERVICE_CONTROLLER_PATH}' params = {'scan_id': scan_id} page_size = 200 @@ -100,7 +112,9 @@ def get_scan_detections(self, scan_id: str) -> List[dict]: params['page_number'] = page_number response = self.scan_cycode_client.get( - url_path=url_path, params=params, hide_response_content_log=self._hide_response_log + url_path=self.get_scan_detections_path(), + params=params, + hide_response_content_log=self._hide_response_log, ).json() detections.extend(response) @@ -109,9 +123,13 @@ def get_scan_detections(self, scan_id: str) -> List[dict]: return detections + def get_get_scan_detections_count_path(self) -> str: + return f'{self.scan_config.get_detections_prefix()}/{self.DETECTIONS_SERVICE_CONTROLLER_PATH}/count' + def get_scan_detections_count(self, scan_id: str) -> int: - url_path = f'{self.scan_config.get_detections_prefix()}/{self.DETECTIONS_SERVICE_CONTROLLER_PATH}/count' - response = self.scan_cycode_client.get(url_path=url_path, params={'scan_id': scan_id}) + response = self.scan_cycode_client.get( + url_path=self.get_get_scan_detections_count_path(), params={'scan_id': scan_id} + ) return response.json().get('count', 0) def commit_range_zipped_file_scan( @@ -126,9 +144,11 @@ def commit_range_zipped_file_scan( ) return self.parse_zipped_file_scan_response(response) + def get_report_scan_status_path(self, scan_type: str, scan_id: str) -> str: + return f'{self.scan_config.get_service_name(scan_type)}/{self.SCAN_CONTROLLER_PATH}/{scan_id}/status' + def report_scan_status(self, scan_type: str, scan_id: str, scan_status: dict) -> None: - url_path = f'{self.scan_config.get_service_name(scan_type)}/{self.SCAN_CONTROLLER_PATH}/{scan_id}/status' - self.scan_cycode_client.post(url_path=url_path, body=scan_status) + self.scan_cycode_client.post(url_path=self.get_report_scan_status_path(scan_type, scan_id), body=scan_status) @staticmethod def parse_scan_response(response: Response) -> models.ScanResult: @@ -140,6 +160,7 @@ def parse_zipped_file_scan_response(response: Response) -> models.ZippedFileScan @staticmethod def get_service_name(scan_type: str) -> Optional[str]: + # TODO(MarshalX): get_service_name should be removed from ScanClient? Because it exists in ScanConfig if scan_type == 'secret': return 'secret' if scan_type == 'iac': diff --git a/cycode/cyclient/scan_config_base.py b/cycode/cyclient/scan_config_base.py index 976008bb..0fffa1a2 100644 --- a/cycode/cyclient/scan_config_base.py +++ b/cycode/cyclient/scan_config_base.py @@ -6,6 +6,22 @@ class ScanConfigBase(ABC): def get_service_name(self, scan_type: str) -> str: ... + @staticmethod + def get_async_scan_type(scan_type: str) -> str: + if scan_type == 'secret': + return 'Secrets' + if scan_type == 'iac': + return 'InfraConfiguration' + + return scan_type.upper() + + @staticmethod + def get_async_entity_type(scan_type: str) -> str: + if scan_type == 'secret': + return 'zippedfile' + + return 'repository' + @abstractmethod def get_scans_prefix(self) -> str: ... diff --git a/tests/cli/test_main.py b/tests/cli/test_main.py index 8c3c6246..aa196c0e 100644 --- a/tests/cli/test_main.py +++ b/tests/cli/test_main.py @@ -1,13 +1,14 @@ import json from typing import TYPE_CHECKING +from uuid import UUID import pytest import responses from click.testing import CliRunner from cycode.cli.main import main_cli -from tests.conftest import CLI_ENV_VARS, TEST_FILES_PATH -from tests.cyclient.test_scan_client import get_zipped_file_scan_response, get_zipped_file_scan_url +from tests.conftest import CLI_ENV_VARS, TEST_FILES_PATH, ZIP_CONTENT_PATH +from tests.cyclient.mocked_responses.scan_client import mock_scan_async_responses _PATH_TO_SCAN = TEST_FILES_PATH.joinpath('zip_content').absolute() @@ -27,12 +28,10 @@ def _is_json(plain: str) -> bool: @pytest.mark.parametrize('output', ['text', 'json']) def test_passing_output_option(output: str, scan_client: 'ScanClient', api_token_response: responses.Response) -> None: scan_type = 'secret' + scan_id = UUID('12345678-418f-47ee-abb0-012345678901') - responses.add(get_zipped_file_scan_response(get_zipped_file_scan_url(scan_type, scan_client))) responses.add(api_token_response) - # Scan report is not mocked. - # This raises connection error on the attempt to report scan. - # It doesn't perform real request + mock_scan_async_responses(responses, scan_type, scan_client, scan_id, ZIP_CONTENT_PATH) args = ['--output', output, 'scan', '--soft-fail', 'path', str(_PATH_TO_SCAN)] result = CliRunner().invoke(main_cli, args, env=CLI_ENV_VARS) diff --git a/tests/conftest.py b/tests/conftest.py index fdb02ec2..dc1a84fe 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -15,6 +15,8 @@ CLI_ENV_VARS = {'CYCODE_CLIENT_ID': _CLIENT_ID, 'CYCODE_CLIENT_SECRET': _CLIENT_SECRET} TEST_FILES_PATH = Path(__file__).parent.joinpath('test_files').absolute() +MOCKED_RESPONSES_PATH = Path(__file__).parent.joinpath('cyclient/mocked_responses/data').absolute() +ZIP_CONTENT_PATH = TEST_FILES_PATH.joinpath('zip_content').absolute() @pytest.fixture(scope='session') diff --git a/tests/cyclient/mocked_responses/__init__.py b/tests/cyclient/mocked_responses/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/cyclient/mocked_responses/data/detections.json b/tests/cyclient/mocked_responses/data/detections.json new file mode 100644 index 00000000..89196489 --- /dev/null +++ b/tests/cyclient/mocked_responses/data/detections.json @@ -0,0 +1,83 @@ +[ + { + "source_policy_name": "Secrets detection", + "source_policy_type": "SensitiveContent", + "source_entity_name": null, + "source_entity_id": null, + "detection_type_id": "7dff932a-418f-47ee-abb0-703e0f6592cd", + "root_id": null, + "status": "Open", + "status_updated_at": null, + "status_reason": null, + "status_change_message": null, + "source_entity_type": "Audit", + "detection_details": { + "organization_name": null, + "organization_id": "", + "sha512": "6e6c867188c04340d9ecfa1b7e56a356e605f2a70fbda865f11b4a57eb07e634", + "provider": "CycodeCli", + "concrete_provider": "CycodeCli", + "length": 55, + "start_position": 19, + "line": 0, + "commit_id": null, + "member_id": "", + "member_name": "", + "member_email": "", + "author_name": "", + "author_email": "", + "branch_name": "", + "committer_name": "", + "committed_at": "0001-01-01T00:00:00+00:00", + "file_path": "%FILEPATH%", + "file_name": "secrets.py", + "file_extension": ".py", + "url": null, + "should_resolve_upon_branch_deletion": false, + "position_in_line": 19, + "repository_name": null, + "repository_id": null, + "old_detection_id": "f35c42f99d3712d4593d5ee16a9ceb36ca9fb20b33e68edd0e00847e6a02a7b6" + }, + "severity": "Medium", + "remediable": false, + "correlation_message": "Secret of type 'Slack Token' was found in filename 'secrets.py' within '' repository", + "provider": "CycodeCli", + "scan_id": "%SCAN_ID%", + "assignee_id": null, + "type": "slack-token", + "is_hidden": false, + "tags": [], + "detection_rule_id": "26ab3395-2522-4061-a50a-c69c2d622ca1", + "classification": null, + "priority": 0, + "metadata": null, + "labels": [], + "detection_id": "f35c42f99d3712d4593d5ee16a9ceb36ca9fb20b33e68edd0e00847e6a02a7b6", + "internal_note": null, + "sdlc_stages": [ + "Code", + "Container Registry", + "Productivity Tools", + "Cloud", + "Build" + ], + "policy_labels": [], + "category": "SecretDetection", + "sub_category": "SensitiveContent", + "sub_category_v2": "Messaging Systems", + "policy_tags": [], + "remediations": [], + "instruction_details": { + "instruction_name_to_single_id_map": null, + "instruction_name_to_multiple_ids_map": null, + "instruction_tags": null + }, + "external_detection_references": [], + "project_ids": [], + "tenant_id": "123456-663f-4e27-9170-e559c2379292", + "id": "123456-895a-4830-b6c1-b948e99b71a4", + "created_date": "2023-10-25T11:07:14.7516793+00:00", + "updated_date": "2023-10-25T11:07:14.7616063+00:00" + } +] \ No newline at end of file diff --git a/tests/cyclient/mocked_responses/scan_client.py b/tests/cyclient/mocked_responses/scan_client.py new file mode 100644 index 00000000..8518e2b9 --- /dev/null +++ b/tests/cyclient/mocked_responses/scan_client.py @@ -0,0 +1,158 @@ +import json +from pathlib import Path +from typing import Optional +from uuid import UUID, uuid4 + +import responses + +from cycode.cyclient.scan_client import ScanClient +from tests.conftest import MOCKED_RESPONSES_PATH + + +def get_zipped_file_scan_url(scan_type: str, scan_client: ScanClient) -> str: + api_url = scan_client.scan_cycode_client.api_url + service_url = scan_client.get_zipped_file_scan_url_path(scan_type) + return f'{api_url}/{service_url}' + + +def get_zipped_file_scan_response( + url: str, zip_content_path: Path, scan_id: Optional[UUID] = None +) -> responses.Response: + if not scan_id: + scan_id = uuid4() + + json_response = { + 'did_detect': True, + 'scan_id': str(scan_id), # not always as expected due to _get_scan_id and passing scan_id to cxt of CLI + 'detections_per_file': [ + { + 'file_name': str(zip_content_path.joinpath('secrets.py')), + 'commit_id': None, + 'detections': [ + { + 'detection_type_id': '12345678-418f-47ee-abb0-012345678901', + 'detection_rule_id': '12345678-aea1-4304-a6e9-012345678901', + 'message': "Secret of type 'Slack Token' was found in filename 'secrets.py'", + 'type': 'slack-token', + 'is_research': False, + 'detection_details': { + 'sha512': 'sha hash', + 'length': 55, + 'start_position': 19, + 'line': 0, + 'committed_at': '0001-01-01T00:00:00+00:00', + 'file_path': str(zip_content_path), + 'file_name': 'secrets.py', + 'file_extension': '.py', + 'should_resolve_upon_branch_deletion': False, + }, + } + ], + } + ], + 'report_url': None, + } + + return responses.Response(method=responses.POST, url=url, json=json_response, status=200) + + +def get_zipped_file_scan_async_url(scan_type: str, scan_client: ScanClient) -> str: + api_url = scan_client.scan_cycode_client.api_url + service_url = scan_client.get_zipped_file_scan_async_url_path(scan_type) + return f'{api_url}/{service_url}' + + +def get_zipped_file_scan_async_response(url: str, scan_id: Optional[UUID] = None) -> responses.Response: + if not scan_id: + scan_id = uuid4() + + json_response = { + 'scan_id': str(scan_id), # not always as expected due to _get_scan_id and passing scan_id to cxt of CLI + } + + return responses.Response(method=responses.POST, url=url, json=json_response, status=200) + + +def get_scan_details_url(scan_id: Optional[UUID], scan_client: ScanClient) -> str: + api_url = scan_client.scan_cycode_client.api_url + service_url = scan_client.get_scan_details_path(str(scan_id)) + return f'{api_url}/{service_url}' + + +def get_scan_details_response(url: str, scan_id: Optional[UUID] = None) -> responses.Response: + if not scan_id: + scan_id = uuid4() + + json_response = { + 'id': str(scan_id), + 'scan_type': 'Secrets', + 'metadata': f'Path: {scan_id}, Folder: scans, Size: 465', + 'entity_type': 'ZippedFile', + 'entity_id': 'Repository', + 'parent_entity_id': 'Organization', + 'organization_id': 'Organization', + 'scm_provider': 'CycodeCli', + 'started_at': '2023-10-25T10:02:23.048282+00:00', + 'finished_at': '2023-10-25T10:02:26.867082+00:00', + 'scan_status': 'Completed', # mark as completed to avoid mocking repeated requests + 'message': None, + 'results_count': 1, + 'scan_update_at': '2023-10-25T10:02:26.867216+00:00', + 'duration': {'days': 0, 'hours': 0, 'minutes': 0, 'seconds': 3, 'milliseconds': 818}, + 'is_hidden': False, + 'is_initial_scan': False, + 'detection_messages': [], + } + + return responses.Response(method=responses.GET, url=url, json=json_response, status=200) + + +def get_scan_detections_count_url(scan_client: ScanClient) -> str: + api_url = scan_client.scan_cycode_client.api_url + service_url = scan_client.get_get_scan_detections_count_path() + return f'{api_url}/{service_url}' + + +def get_scan_detections_count_response(url: str) -> responses.Response: + json_response = {'count': 1} + + return responses.Response(method=responses.GET, url=url, json=json_response, status=200) + + +def get_scan_detections_url(scan_client: ScanClient) -> str: + api_url = scan_client.scan_cycode_client.api_url + service_url = scan_client.get_scan_detections_path() + return f'{api_url}/{service_url}' + + +def get_scan_detections_response(url: str, scan_id: UUID, zip_content_path: Path) -> responses.Response: + with open(MOCKED_RESPONSES_PATH.joinpath('detections.json'), encoding='UTF-8') as f: + content = f.read() + content = content.replace('%FILEPATH%', str(zip_content_path.absolute().as_posix())) + content = content.replace('%SCAN_ID%', str(scan_id)) + + json_response = json.loads(content) + + return responses.Response(method=responses.GET, url=url, json=json_response, status=200) + + +def get_report_scan_status_url(scan_type: str, scan_id: UUID, scan_client: ScanClient) -> str: + api_url = scan_client.scan_cycode_client.api_url + service_url = scan_client.get_report_scan_status_path(scan_type, str(scan_id)) + return f'{api_url}/{service_url}' + + +def get_report_scan_status_response(url: str) -> responses.Response: + return responses.Response(method=responses.POST, url=url, status=200) + + +def mock_scan_async_responses( + responses_module: responses, scan_type: str, scan_client: ScanClient, scan_id: UUID, zip_content_path: Path +) -> None: + responses_module.add( + get_zipped_file_scan_async_response(get_zipped_file_scan_async_url(scan_type, scan_client), scan_id) + ) + responses_module.add(get_scan_details_response(get_scan_details_url(scan_id, scan_client), scan_id)) + responses_module.add(get_scan_detections_count_response(get_scan_detections_count_url(scan_client))) + responses_module.add(get_scan_detections_response(get_scan_detections_url(scan_client), scan_id, zip_content_path)) + responses_module.add(get_report_scan_status_response(get_report_scan_status_url(scan_type, scan_id, scan_client))) diff --git a/tests/cyclient/test_scan_client.py b/tests/cyclient/test_scan_client.py index 2ca374b2..1f8ed462 100644 --- a/tests/cyclient/test_scan_client.py +++ b/tests/cyclient/test_scan_client.py @@ -1,6 +1,6 @@ import os -from typing import List, Optional, Tuple -from uuid import UUID, uuid4 +from typing import List, Tuple +from uuid import uuid4 import pytest import requests @@ -14,9 +14,8 @@ from cycode.cli.files_collector.models.in_memory_zip import InMemoryZip from cycode.cli.models import Document from cycode.cyclient.scan_client import ScanClient -from tests.conftest import TEST_FILES_PATH - -_ZIP_CONTENT_PATH = TEST_FILES_PATH.joinpath('zip_content').absolute() +from tests.conftest import ZIP_CONTENT_PATH +from tests.cyclient.mocked_responses.scan_client import get_zipped_file_scan_response, get_zipped_file_scan_url def zip_scan_resources(scan_type: str, scan_client: ScanClient) -> Tuple[str, InMemoryZip]: @@ -26,17 +25,10 @@ def zip_scan_resources(scan_type: str, scan_client: ScanClient) -> Tuple[str, In return url, zip_file -def get_zipped_file_scan_url(scan_type: str, scan_client: ScanClient) -> str: - api_url = scan_client.scan_cycode_client.api_url - # TODO(MarshalX): create method in the scan client to build this url - service_url = f'{api_url}/{scan_client.scan_config.get_service_name(scan_type)}' - return f'{service_url}/{scan_client.SCAN_CONTROLLER_PATH}/zipped-file' - - def get_test_zip_file(scan_type: str) -> InMemoryZip: # TODO(MarshalX): refactor scan_disk_files in code_scanner.py to reuse method here instead of this test_documents: List[Document] = [] - for root, _, files in os.walk(_ZIP_CONTENT_PATH): + for root, _, files in os.walk(ZIP_CONTENT_PATH): for name in files: path = os.path.join(root, name) with open(path, 'r', encoding='UTF-8') as f: @@ -45,53 +37,6 @@ def get_test_zip_file(scan_type: str) -> InMemoryZip: return zip_documents(scan_type, test_documents) -def get_zipped_file_scan_response(url: str, scan_id: Optional[UUID] = None) -> responses.Response: - if not scan_id: - scan_id = uuid4() - - json_response = { - 'did_detect': True, - 'scan_id': str(scan_id), # not always as expected due to _get_scan_id and passing scan_id to cxt of CLI - 'detections_per_file': [ - { - 'file_name': str(_ZIP_CONTENT_PATH.joinpath('secrets.py')), - 'commit_id': None, - 'detections': [ - { - 'detection_type_id': '12345678-418f-47ee-abb0-012345678901', - 'detection_rule_id': '12345678-aea1-4304-a6e9-012345678901', - 'message': "Secret of type 'Slack Token' was found in filename 'secrets.py'", - 'type': 'slack-token', - 'is_research': False, - 'detection_details': { - 'sha512': 'sha hash', - 'length': 55, - 'start_position': 19, - 'line': 0, - 'committed_at': '0001-01-01T00:00:00+00:00', - 'file_path': str(_ZIP_CONTENT_PATH), - 'file_name': 'secrets.py', - 'file_extension': '.py', - 'should_resolve_upon_branch_deletion': False, - }, - } - ], - } - ], - 'report_url': None, - } - - return responses.Response(method=responses.POST, url=url, json=json_response, status=200) - - -def test_get_service_name(scan_client: ScanClient) -> None: - # TODO(MarshalX): get_service_name should be removed from ScanClient? Because it exists in ScanConfig - assert scan_client.get_service_name('secret') == 'secret' - assert scan_client.get_service_name('iac') == 'iac' - assert scan_client.get_service_name('sca') == 'scans' - assert scan_client.get_service_name('sast') == 'scans' - - @pytest.mark.parametrize('scan_type', config['scans']['supported_scans']) @responses.activate def test_zipped_file_scan(scan_type: str, scan_client: ScanClient, api_token_response: responses.Response) -> None: @@ -99,7 +44,7 @@ def test_zipped_file_scan(scan_type: str, scan_client: ScanClient, api_token_res expected_scan_id = uuid4() responses.add(api_token_response) # mock token based client - responses.add(get_zipped_file_scan_response(url, expected_scan_id)) + responses.add(get_zipped_file_scan_response(url, ZIP_CONTENT_PATH, expected_scan_id)) zipped_file_scan_response = scan_client.zipped_file_scan( scan_type, zip_file, scan_id=str(expected_scan_id), scan_parameters={}