From ce30a69bb68b7971d5d49f51b3ba7660ea572896 Mon Sep 17 00:00:00 2001 From: Sara Montefiore Date: Wed, 22 May 2024 08:51:06 +0300 Subject: [PATCH 1/9] CM-34882-support one report url --- cycode/cli/commands/scan/code_scanner.py | 38 +++++++++++++++---- cycode/cli/printers/console_printer.py | 7 +++- cycode/cli/printers/json_printer.py | 5 ++- cycode/cli/printers/printer_base.py | 5 ++- .../cli/printers/tables/sca_table_printer.py | 6 +-- cycode/cli/printers/tables/table_printer.py | 4 +- .../cli/printers/tables/table_printer_base.py | 16 +++++--- cycode/cli/printers/text_printer.py | 5 ++- cycode/cyclient/scan_client.py | 26 +++++++++++-- 9 files changed, 84 insertions(+), 28 deletions(-) diff --git a/cycode/cli/commands/scan/code_scanner.py b/cycode/cli/commands/scan/code_scanner.py index fa47c6e3..1b0f51c1 100644 --- a/cycode/cli/commands/scan/code_scanner.py +++ b/cycode/cli/commands/scan/code_scanner.py @@ -148,14 +148,14 @@ def _enrich_scan_result_with_data_from_detection_rules( def _get_scan_documents_thread_func( context: click.Context, is_git_diff: bool, is_commit_range: bool, scan_parameters: dict -) -> Callable[[List[Document]], Tuple[str, CliError, LocalScanResult]]: +) -> tuple[Callable[[list[Document]], tuple[str, CliError, LocalScanResult]], str]: cycode_client = context.obj['client'] scan_type = context.obj['scan_type'] severity_threshold = context.obj['severity_threshold'] sync_option = context.obj['sync'] command_scan_type = context.info_name - - scan_parameters['aggregation_id'] = str(_generate_unique_id()) + aggregation_id = str(_generate_unique_id()) + scan_parameters['aggregation_id'] = aggregation_id def _scan_batch_thread_func(batch: List[Document]) -> Tuple[str, CliError, LocalScanResult]: local_scan_result = error = error_message = None @@ -224,7 +224,7 @@ def _scan_batch_thread_func(batch: List[Document]) -> Tuple[str, CliError, Local return scan_id, error, local_scan_result - return _scan_batch_thread_func + return _scan_batch_thread_func, aggregation_id def scan_commit_range( @@ -312,17 +312,36 @@ def scan_documents( ) return - scan_batch_thread_func = _get_scan_documents_thread_func(context, is_git_diff, is_commit_range, scan_parameters) + scan_batch_thread_func, aggregation_id = _get_scan_documents_thread_func( + context, is_git_diff, is_commit_range, scan_parameters + ) errors, local_scan_results = run_parallel_batched_scan( scan_batch_thread_func, documents_to_scan, progress_bar=progress_bar ) + aggregation_report_url = _try_get_aggregation_report_url_if_needed( + scan_parameters, context.obj['client'], context.obj['scan_type'], aggregation_id + ) progress_bar.set_section_length(ScanProgressBarSection.GENERATE_REPORT, 1) progress_bar.update(ScanProgressBarSection.GENERATE_REPORT) progress_bar.stop() set_issue_detected_by_scan_results(context, local_scan_results) - print_results(context, local_scan_results, errors) + print_results(context, local_scan_results, errors, aggregation_report_url) + + +def _try_get_aggregation_report_url_if_needed( + scan_parameters: dict, cycode_client: 'ScanClient', scan_type: str, aggregation_id: str = '' +) -> Optional[str]: + if not scan_parameters.get('report'): + return None + if aggregation_id is None: + return None + try: + report_url_response = cycode_client.get_scan_aggregation_report_url(aggregation_id, scan_type) + return report_url_response.report_url + except Exception as e: + logger.debug('Failed to get aggregation report url: %s', str(e)) def scan_commit_range_documents( @@ -560,10 +579,13 @@ def print_debug_scan_details(scan_details_response: 'ScanDetailsResponse') -> No def print_results( - context: click.Context, local_scan_results: List[LocalScanResult], errors: Optional[Dict[str, 'CliError']] = None + context: click.Context, + local_scan_results: List[LocalScanResult], + errors: Optional[Dict[str, 'CliError']] = None, + aggregation_report_url: str = '', ) -> None: printer = ConsolePrinter(context) - printer.print_scan_results(local_scan_results, errors) + printer.print_scan_results(local_scan_results, errors, aggregation_report_url=aggregation_report_url) def get_document_detections( diff --git a/cycode/cli/printers/console_printer.py b/cycode/cli/printers/console_printer.py index 667cdd6a..9075b21a 100644 --- a/cycode/cli/printers/console_printer.py +++ b/cycode/cli/printers/console_printer.py @@ -34,10 +34,13 @@ def __init__(self, context: click.Context) -> None: raise CycodeError(f'"{self.output_type}" output type is not supported.') def print_scan_results( - self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None + self, + local_scan_results: List['LocalScanResult'], + errors: Optional[Dict[str, 'CliError']] = None, + aggregation_report_url: str = '', ) -> None: printer = self._get_scan_printer() - printer.print_scan_results(local_scan_results, errors) + printer.print_scan_results(local_scan_results, errors, aggregation_report_url=aggregation_report_url) def _get_scan_printer(self) -> 'PrinterBase': printer_class = self._printer_class diff --git a/cycode/cli/printers/json_printer.py b/cycode/cli/printers/json_printer.py index 44ec9c85..330e6f2f 100644 --- a/cycode/cli/printers/json_printer.py +++ b/cycode/cli/printers/json_printer.py @@ -23,7 +23,10 @@ def print_error(self, error: CliError) -> None: click.echo(self.get_data_json(result)) def print_scan_results( - self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None + self, + local_scan_results: List['LocalScanResult'], + errors: Optional[Dict[str, 'CliError']] = None, + aggregation_report_url: str = '', ) -> None: scan_ids = [] report_urls = [] diff --git a/cycode/cli/printers/printer_base.py b/cycode/cli/printers/printer_base.py index cc354082..f23c1bc3 100644 --- a/cycode/cli/printers/printer_base.py +++ b/cycode/cli/printers/printer_base.py @@ -21,7 +21,10 @@ def __init__(self, context: click.Context) -> None: @abstractmethod def print_scan_results( - self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None + self, + local_scan_results: List['LocalScanResult'], + errors: Optional[Dict[str, 'CliError']] = None, + aggregation_report_url: str = '', ) -> None: pass diff --git a/cycode/cli/printers/tables/sca_table_printer.py b/cycode/cli/printers/tables/sca_table_printer.py index 268f8614..77324b96 100644 --- a/cycode/cli/printers/tables/sca_table_printer.py +++ b/cycode/cli/printers/tables/sca_table_printer.py @@ -13,7 +13,6 @@ if TYPE_CHECKING: from cycode.cli.models import LocalScanResult - column_builder = ColumnInfoBuilder() # Building must have strict order. Represents the order of the columns in the table (from left to right) @@ -29,7 +28,6 @@ DIRECT_DEPENDENCY_COLUMN = column_builder.build(name='Direct Dependency') DEVELOPMENT_DEPENDENCY_COLUMN = column_builder.build(name='Development Dependency') - COLUMN_WIDTHS_CONFIG: ColumnWidths = { REPOSITORY_COLUMN: 2, CODE_PROJECT_COLUMN: 2, @@ -41,7 +39,7 @@ class ScaTablePrinter(TablePrinterBase): - def _print_results(self, local_scan_results: List['LocalScanResult']) -> None: + def _print_results(self, local_scan_results: List['LocalScanResult'], aggregation_report_url: str = '') -> None: detections_per_policy_id = self._extract_detections_per_policy_id(local_scan_results) for policy_id, detections in detections_per_policy_id.items(): table = self._get_table(policy_id) @@ -53,7 +51,7 @@ def _print_results(self, local_scan_results: List['LocalScanResult']) -> None: self._print_summary_issues(len(detections), self._get_title(policy_id)) self._print_table(table) - self._print_report_urls(local_scan_results) + self._print_report_urls(local_scan_results, aggregation_report_url) @staticmethod def _get_title(policy_id: str) -> str: diff --git a/cycode/cli/printers/tables/table_printer.py b/cycode/cli/printers/tables/table_printer.py index 6afd9e66..daa76024 100644 --- a/cycode/cli/printers/tables/table_printer.py +++ b/cycode/cli/printers/tables/table_printer.py @@ -51,7 +51,7 @@ class TablePrinter(TablePrinterBase): - def _print_results(self, local_scan_results: List['LocalScanResult']) -> None: + def _print_results(self, local_scan_results: List['LocalScanResult'], aggregation_report_url: str = '') -> None: table = self._get_table() if self.scan_type in COLUMN_WIDTHS_CONFIG: table.set_cols_width(COLUMN_WIDTHS_CONFIG[self.scan_type]) @@ -63,7 +63,7 @@ def _print_results(self, local_scan_results: List['LocalScanResult']) -> None: self._enrich_table_with_values(table, detection, document_detections.document) self._print_table(table) - self._print_report_urls(local_scan_results) + self._print_report_urls(local_scan_results, aggregation_report_url) def _get_table(self) -> Table: table = Table() diff --git a/cycode/cli/printers/tables/table_printer_base.py b/cycode/cli/printers/tables/table_printer_base.py index 9b6e8ac7..ba9bcfc9 100644 --- a/cycode/cli/printers/tables/table_printer_base.py +++ b/cycode/cli/printers/tables/table_printer_base.py @@ -25,13 +25,16 @@ def print_error(self, error: CliError) -> None: TextPrinter(self.context).print_error(error) def print_scan_results( - self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None + self, + local_scan_results: List['LocalScanResult'], + errors: Optional[Dict[str, 'CliError']] = None, + aggregation_report_url: str = '', ) -> None: if not errors and all(result.issue_detected == 0 for result in local_scan_results): click.secho('Good job! No issues were found!!! 👏👏👏', fg=self.GREEN_COLOR_NAME) return - self._print_results(local_scan_results) + self._print_results(local_scan_results, aggregation_report_url=aggregation_report_url) if not errors: return @@ -49,7 +52,7 @@ def _is_git_repository(self) -> bool: return self.context.obj.get('remote_url') is not None @abc.abstractmethod - def _print_results(self, local_scan_results: List['LocalScanResult']) -> None: + def _print_results(self, local_scan_results: List['LocalScanResult'], aggregation_report_url: str = '') -> None: raise NotImplementedError @staticmethod @@ -58,9 +61,12 @@ def _print_table(table: 'Table') -> None: click.echo(table.get_table().draw()) @staticmethod - def _print_report_urls(local_scan_results: List['LocalScanResult']) -> None: + def _print_report_urls(local_scan_results: List['LocalScanResult'], aggregation_report_url: str = '') -> None: report_urls = [scan_result.report_url for scan_result in local_scan_results if scan_result.report_url] - if not report_urls: + if not report_urls and not aggregation_report_url: + return + if aggregation_report_url: + click.echo(f'Aggregation report URL: {aggregation_report_url}') return click.echo('Report URLs:') diff --git a/cycode/cli/printers/text_printer.py b/cycode/cli/printers/text_printer.py index 2bbab6a3..8633237a 100644 --- a/cycode/cli/printers/text_printer.py +++ b/cycode/cli/printers/text_printer.py @@ -31,7 +31,10 @@ def print_error(self, error: CliError) -> None: click.secho(error.message, fg=self.RED_COLOR_NAME) def print_scan_results( - self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None + self, + local_scan_results: List['LocalScanResult'], + errors: Optional[Dict[str, 'CliError']] = None, + aggregation_report_url: str = '', ) -> None: if not errors and all(result.issue_detected == 0 for result in local_scan_results): click.secho('Good job! No issues were found!!! 👏👏👏', fg=self.GREEN_COLOR_NAME) diff --git a/cycode/cyclient/scan_client.py b/cycode/cyclient/scan_client.py index e3e2c85b..34910835 100644 --- a/cycode/cyclient/scan_client.py +++ b/cycode/cyclient/scan_client.py @@ -30,11 +30,13 @@ def __init__( self._hide_response_log = hide_response_log - def get_scan_controller_path(self, scan_type: str, should_use_scan_service: bool = False) -> str: + def get_scan_controller_path( + self, scan_type: str, should_use_scan_service: bool = False, should_use_cli_path: bool = False + ) -> str: if scan_type == consts.INFRA_CONFIGURATION_SCAN_TYPE: # we don't use async flow for IaC scan yet return self._SCAN_SERVICE_CONTROLLER_PATH - if not should_use_scan_service and scan_type == consts.SECRET_SCAN_TYPE: + if not should_use_scan_service and scan_type == consts.SECRET_SCAN_TYPE and not should_use_cli_path: # if a secret scan goes to detector directly, we should not use CLI controller. # CLI controller belongs to the scan service only return self._SCAN_SERVICE_CONTROLLER_PATH @@ -56,10 +58,14 @@ def get_scan_flow_type(should_use_sync_flow: bool = False) -> str: return '' def get_scan_service_url_path( - self, scan_type: str, should_use_scan_service: bool = False, should_use_sync_flow: bool = False + self, + scan_type: str, + should_use_scan_service: bool = False, + should_use_sync_flow: bool = False, + should_use_cli_path: bool = False, ) -> str: service_path = self.scan_config.get_service_name(scan_type, should_use_scan_service) - controller_path = self.get_scan_controller_path(scan_type) + controller_path = self.get_scan_controller_path(scan_type, should_use_cli_path=should_use_cli_path) flow_type = self.get_scan_flow_type(should_use_sync_flow) return f'{service_path}/{controller_path}{flow_type}' @@ -92,6 +98,12 @@ def get_scan_report_url(self, scan_id: str, scan_type: str) -> models.ScanReport response = self.scan_cycode_client.get(url_path=self.get_scan_report_url_path(scan_id, scan_type)) return models.ScanReportUrlResponseSchema().build_dto(response.json()) + def get_scan_aggregation_report_url(self, aggregation_id: str, scan_type: str) -> models.ScanReportUrlResponse: + response = self.scan_cycode_client.get( + url_path=self.get_scan_aggregation_report_url_path(aggregation_id, scan_type) + ) + return models.ScanReportUrlResponseSchema().build_dto(response.json()) + def get_zipped_file_scan_async_url_path(self, scan_type: str, should_use_sync_flow: bool = False) -> str: async_scan_type = self.scan_config.get_async_scan_type(scan_type) async_entity_type = self.scan_config.get_async_entity_type(scan_type) @@ -155,6 +167,12 @@ def get_scan_details_path(self, scan_type: str, scan_id: str) -> str: def get_scan_report_url_path(self, scan_id: str, scan_type: str) -> str: return f'{self.get_scan_service_url_path(scan_type, should_use_scan_service=True)}/reportUrl/{scan_id}' + def get_scan_aggregation_report_url_path(self, aggregation_id: str, scan_type: str) -> str: + return ( + f'{self.get_scan_service_url_path(scan_type, should_use_scan_service=True, should_use_cli_path=True)}' + f'/reportUrlByAggregationId/{aggregation_id}' + ) + def get_scan_details(self, scan_type: str, scan_id: str) -> models.ScanDetailsResponse: path = self.get_scan_details_path(scan_type, scan_id) response = self.scan_cycode_client.get(url_path=path) From a2587d00853d26aa3aa3a523bf5dcd4b5820a31c Mon Sep 17 00:00:00 2001 From: Sara Montefiore Date: Wed, 22 May 2024 09:45:03 +0300 Subject: [PATCH 2/9] CM-34882-add tests --- .../cyclient/mocked_responses/scan_client.py | 14 ++++++ tests/test_code_scanner.py | 48 ++++++++++++++++++- 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/tests/cyclient/mocked_responses/scan_client.py b/tests/cyclient/mocked_responses/scan_client.py index a3117f55..f6b20194 100644 --- a/tests/cyclient/mocked_responses/scan_client.py +++ b/tests/cyclient/mocked_responses/scan_client.py @@ -85,6 +85,12 @@ def get_scan_report_url(scan_id: Optional[UUID], scan_client: ScanClient, scan_t return f'{api_url}/{service_url}' +def get_scan_aggregation_report_url(aggregation_id: Optional[UUID], scan_client: ScanClient, scan_type: str) -> str: + api_url = scan_client.scan_cycode_client.api_url + service_url = scan_client.get_scan_aggregation_report_url_path(str(aggregation_id), scan_type) + return f'{api_url}/{service_url}' + + def get_scan_report_url_response(url: str, scan_id: Optional[UUID] = None) -> responses.Response: if not scan_id: scan_id = uuid4() @@ -93,6 +99,14 @@ def get_scan_report_url_response(url: str, scan_id: Optional[UUID] = None) -> re return responses.Response(method=responses.GET, url=url, json=json_response, status=200) +def get_scan_aggregation_report_url_response(url: str, aggregation_id: Optional[UUID] = None) -> responses.Response: + if not aggregation_id: + aggregation_id = uuid4() + json_response = {'report_url': f'https://app.domain/cli-logs-aggregation/{aggregation_id}'} + + return responses.Response(method=responses.GET, url=url, json=json_response, status=200) + + def get_scan_details_response(url: str, scan_id: Optional[UUID] = None) -> responses.Response: if not scan_id: scan_id = uuid4() diff --git a/tests/test_code_scanner.py b/tests/test_code_scanner.py index f18e5c02..7ae7a259 100644 --- a/tests/test_code_scanner.py +++ b/tests/test_code_scanner.py @@ -4,12 +4,20 @@ import pytest import responses -from cycode.cli.commands.scan.code_scanner import _try_get_report_url_if_needed +from cycode.cli.commands.scan.code_scanner import ( + _try_get_aggregation_report_url_if_needed, + _try_get_report_url_if_needed, +) from cycode.cli.config import config from cycode.cli.files_collector.excluder import _is_relevant_file_to_scan from cycode.cyclient.scan_client import ScanClient from tests.conftest import TEST_FILES_PATH -from tests.cyclient.mocked_responses.scan_client import get_scan_report_url, get_scan_report_url_response +from tests.cyclient.mocked_responses.scan_client import ( + get_scan_aggregation_report_url, + get_scan_aggregation_report_url_response, + get_scan_report_url, + get_scan_report_url_response, +) def test_is_relevant_file_to_scan_sca() -> None: @@ -37,3 +45,39 @@ def test_try_get_report_url_if_needed_return_result( scan_report_url_response = scan_client.get_scan_report_url(str(scan_id), scan_type) result = _try_get_report_url_if_needed(scan_client, True, str(scan_id), scan_type) assert result == scan_report_url_response.report_url + + +@pytest.mark.parametrize('scan_type', config['scans']['supported_scans']) +def test_try_get_aggregation_report_url_if_no_report_command_needed_return_none( + scan_type: str, scan_client: ScanClient +) -> None: + aggregation_id = uuid4().hex + scan_parameter = {} + result = _try_get_aggregation_report_url_if_needed(scan_parameter, scan_client, scan_type, aggregation_id) + assert result is None + + +@pytest.mark.parametrize('scan_type', config['scans']['supported_scans']) +def test_try_get_aggregation_report_url_if_no_aggregation_id_needed_return_none( + scan_type: str, scan_client: ScanClient +) -> None: + scan_parameter = {'report': True} + result = _try_get_aggregation_report_url_if_needed(scan_parameter, scan_client, scan_type, '') + assert result is None + + +@pytest.mark.parametrize('scan_type', config['scans']['supported_scans']) +@responses.activate +def test_try_get_aggregation_report_url_if_needed_return_result( + scan_type: str, scan_client: ScanClient, api_token_response: responses.Response +) -> None: + aggregation_id = uuid4() + scan_parameter = {'report': True} + url = get_scan_aggregation_report_url(aggregation_id, scan_client, scan_type) + responses.add(api_token_response) # mock token based client + responses.add(get_scan_aggregation_report_url_response(url, aggregation_id)) + + scan_aggregation_report_url_response = scan_client.get_scan_aggregation_report_url(str(aggregation_id), scan_type) + + result = _try_get_aggregation_report_url_if_needed(scan_parameter, scan_client, scan_type, str(aggregation_id)) + assert result == scan_aggregation_report_url_response.report_url From 8659e7a1c39838ac59bc2bca37dbe091eb78e0d9 Mon Sep 17 00:00:00 2001 From: Sara Montefiore Date: Wed, 22 May 2024 14:04:38 +0300 Subject: [PATCH 3/9] CM-34882-fix --- cycode/cli/commands/scan/code_scanner.py | 7 ++++--- cycode/cli/printers/tables/table_printer_base.py | 2 +- cycode/cyclient/scan_client.py | 11 +++++------ 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cycode/cli/commands/scan/code_scanner.py b/cycode/cli/commands/scan/code_scanner.py index 1b0f51c1..67d0c65e 100644 --- a/cycode/cli/commands/scan/code_scanner.py +++ b/cycode/cli/commands/scan/code_scanner.py @@ -148,7 +148,7 @@ def _enrich_scan_result_with_data_from_detection_rules( def _get_scan_documents_thread_func( context: click.Context, is_git_diff: bool, is_commit_range: bool, scan_parameters: dict -) -> tuple[Callable[[list[Document]], tuple[str, CliError, LocalScanResult]], str]: +) -> List[Tuple[Callable[[List[Document]], Tuple[str, CliError, LocalScanResult]], str]]: cycode_client = context.obj['client'] scan_type = context.obj['scan_type'] severity_threshold = context.obj['severity_threshold'] @@ -319,7 +319,7 @@ def scan_documents( scan_batch_thread_func, documents_to_scan, progress_bar=progress_bar ) aggregation_report_url = _try_get_aggregation_report_url_if_needed( - scan_parameters, context.obj['client'], context.obj['scan_type'], aggregation_id + scan_parameters, context.obj['client'], context.obj['scan_type'] ) progress_bar.set_section_length(ScanProgressBarSection.GENERATE_REPORT, 1) @@ -331,8 +331,9 @@ def scan_documents( def _try_get_aggregation_report_url_if_needed( - scan_parameters: dict, cycode_client: 'ScanClient', scan_type: str, aggregation_id: str = '' + scan_parameters: dict, cycode_client: 'ScanClient', scan_type: str ) -> Optional[str]: + aggregation_id = scan_parameters['aggregation_id'] if not scan_parameters.get('report'): return None if aggregation_id is None: diff --git a/cycode/cli/printers/tables/table_printer_base.py b/cycode/cli/printers/tables/table_printer_base.py index ba9bcfc9..af9bc82b 100644 --- a/cycode/cli/printers/tables/table_printer_base.py +++ b/cycode/cli/printers/tables/table_printer_base.py @@ -66,7 +66,7 @@ def _print_report_urls(local_scan_results: List['LocalScanResult'], aggregation_ if not report_urls and not aggregation_report_url: return if aggregation_report_url: - click.echo(f'Aggregation report URL: {aggregation_report_url}') + click.echo(f'Report URL: {aggregation_report_url}') return click.echo('Report URLs:') diff --git a/cycode/cyclient/scan_client.py b/cycode/cyclient/scan_client.py index 34910835..a6534447 100644 --- a/cycode/cyclient/scan_client.py +++ b/cycode/cyclient/scan_client.py @@ -31,12 +31,12 @@ def __init__( self._hide_response_log = hide_response_log def get_scan_controller_path( - self, scan_type: str, should_use_scan_service: bool = False, should_use_cli_path: bool = False + self, scan_type: str, should_use_scan_service: bool = False ) -> str: if scan_type == consts.INFRA_CONFIGURATION_SCAN_TYPE: # we don't use async flow for IaC scan yet return self._SCAN_SERVICE_CONTROLLER_PATH - if not should_use_scan_service and scan_type == consts.SECRET_SCAN_TYPE and not should_use_cli_path: + if not should_use_scan_service and scan_type == consts.SECRET_SCAN_TYPE: # if a secret scan goes to detector directly, we should not use CLI controller. # CLI controller belongs to the scan service only return self._SCAN_SERVICE_CONTROLLER_PATH @@ -61,11 +61,10 @@ def get_scan_service_url_path( self, scan_type: str, should_use_scan_service: bool = False, - should_use_sync_flow: bool = False, - should_use_cli_path: bool = False, + should_use_sync_flow: bool = False ) -> str: service_path = self.scan_config.get_service_name(scan_type, should_use_scan_service) - controller_path = self.get_scan_controller_path(scan_type, should_use_cli_path=should_use_cli_path) + controller_path = self.get_scan_controller_path(scan_type, should_use_scan_service) flow_type = self.get_scan_flow_type(should_use_sync_flow) return f'{service_path}/{controller_path}{flow_type}' @@ -169,7 +168,7 @@ def get_scan_report_url_path(self, scan_id: str, scan_type: str) -> str: def get_scan_aggregation_report_url_path(self, aggregation_id: str, scan_type: str) -> str: return ( - f'{self.get_scan_service_url_path(scan_type, should_use_scan_service=True, should_use_cli_path=True)}' + f'{self.get_scan_service_url_path(scan_type, should_use_scan_service=True)}' f'/reportUrlByAggregationId/{aggregation_id}' ) From e09b354828d43689955f602ad0adb3a9ea814c23 Mon Sep 17 00:00:00 2001 From: Sara Montefiore Date: Wed, 22 May 2024 15:04:47 +0300 Subject: [PATCH 4/9] CM-34882-fix --- cycode/cli/commands/scan/code_scanner.py | 2 +- cycode/cyclient/scan_client.py | 9 ++------- tests/test_code_scanner.py | 10 +++++----- 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/cycode/cli/commands/scan/code_scanner.py b/cycode/cli/commands/scan/code_scanner.py index 67d0c65e..68029e03 100644 --- a/cycode/cli/commands/scan/code_scanner.py +++ b/cycode/cli/commands/scan/code_scanner.py @@ -333,7 +333,7 @@ def scan_documents( def _try_get_aggregation_report_url_if_needed( scan_parameters: dict, cycode_client: 'ScanClient', scan_type: str ) -> Optional[str]: - aggregation_id = scan_parameters['aggregation_id'] + aggregation_id = scan_parameters.get('aggregation_id') if not scan_parameters.get('report'): return None if aggregation_id is None: diff --git a/cycode/cyclient/scan_client.py b/cycode/cyclient/scan_client.py index a6534447..9431c9a3 100644 --- a/cycode/cyclient/scan_client.py +++ b/cycode/cyclient/scan_client.py @@ -30,9 +30,7 @@ def __init__( self._hide_response_log = hide_response_log - def get_scan_controller_path( - self, scan_type: str, should_use_scan_service: bool = False - ) -> str: + def get_scan_controller_path(self, scan_type: str, should_use_scan_service: bool = False) -> str: if scan_type == consts.INFRA_CONFIGURATION_SCAN_TYPE: # we don't use async flow for IaC scan yet return self._SCAN_SERVICE_CONTROLLER_PATH @@ -58,10 +56,7 @@ def get_scan_flow_type(should_use_sync_flow: bool = False) -> str: return '' def get_scan_service_url_path( - self, - scan_type: str, - should_use_scan_service: bool = False, - should_use_sync_flow: bool = False + self, scan_type: str, should_use_scan_service: bool = False, should_use_sync_flow: bool = False ) -> str: service_path = self.scan_config.get_service_name(scan_type, should_use_scan_service) controller_path = self.get_scan_controller_path(scan_type, should_use_scan_service) diff --git a/tests/test_code_scanner.py b/tests/test_code_scanner.py index 7ae7a259..d789312d 100644 --- a/tests/test_code_scanner.py +++ b/tests/test_code_scanner.py @@ -52,8 +52,8 @@ def test_try_get_aggregation_report_url_if_no_report_command_needed_return_none( scan_type: str, scan_client: ScanClient ) -> None: aggregation_id = uuid4().hex - scan_parameter = {} - result = _try_get_aggregation_report_url_if_needed(scan_parameter, scan_client, scan_type, aggregation_id) + scan_parameter = {'aggregation_id': aggregation_id} + result = _try_get_aggregation_report_url_if_needed(scan_parameter, scan_client, scan_type) assert result is None @@ -62,7 +62,7 @@ def test_try_get_aggregation_report_url_if_no_aggregation_id_needed_return_none( scan_type: str, scan_client: ScanClient ) -> None: scan_parameter = {'report': True} - result = _try_get_aggregation_report_url_if_needed(scan_parameter, scan_client, scan_type, '') + result = _try_get_aggregation_report_url_if_needed(scan_parameter, scan_client, scan_type) assert result is None @@ -72,12 +72,12 @@ def test_try_get_aggregation_report_url_if_needed_return_result( scan_type: str, scan_client: ScanClient, api_token_response: responses.Response ) -> None: aggregation_id = uuid4() - scan_parameter = {'report': True} + scan_parameter = {'report': True, 'aggregation_id': aggregation_id} url = get_scan_aggregation_report_url(aggregation_id, scan_client, scan_type) responses.add(api_token_response) # mock token based client responses.add(get_scan_aggregation_report_url_response(url, aggregation_id)) scan_aggregation_report_url_response = scan_client.get_scan_aggregation_report_url(str(aggregation_id), scan_type) - result = _try_get_aggregation_report_url_if_needed(scan_parameter, scan_client, scan_type, str(aggregation_id)) + result = _try_get_aggregation_report_url_if_needed(scan_parameter, scan_client, scan_type) assert result == scan_aggregation_report_url_response.report_url From 1b92959b156a8b24264887b59e5860fcf94526ef Mon Sep 17 00:00:00 2001 From: Sara Montefiore Date: Wed, 22 May 2024 18:59:08 +0300 Subject: [PATCH 5/9] CM-34882-fix --- cycode/cli/commands/scan/code_scanner.py | 170 +++++++++--------- cycode/cli/printers/console_printer.py | 2 +- cycode/cli/printers/json_printer.py | 15 +- cycode/cli/printers/printer_base.py | 2 +- .../cli/printers/tables/table_printer_base.py | 2 +- cycode/cli/printers/text_printer.py | 59 +++--- 6 files changed, 133 insertions(+), 117 deletions(-) diff --git a/cycode/cli/commands/scan/code_scanner.py b/cycode/cli/commands/scan/code_scanner.py index 68029e03..2e626118 100644 --- a/cycode/cli/commands/scan/code_scanner.py +++ b/cycode/cli/commands/scan/code_scanner.py @@ -116,7 +116,7 @@ def _should_use_sync_flow(scan_type: str, sync_option: bool, scan_parameters: Op def _enrich_scan_result_with_data_from_detection_rules( - cycode_client: 'ScanClient', scan_result: ZippedFileScanResult + cycode_client: 'ScanClient', scan_result: ZippedFileScanResult ) -> None: detection_rule_ids = set() for detections_per_file in scan_result.detections_per_file: @@ -147,8 +147,8 @@ def _enrich_scan_result_with_data_from_detection_rules( def _get_scan_documents_thread_func( - context: click.Context, is_git_diff: bool, is_commit_range: bool, scan_parameters: dict -) -> List[Tuple[Callable[[List[Document]], Tuple[str, CliError, LocalScanResult]], str]]: + context: click.Context, is_git_diff: bool, is_commit_range: bool, scan_parameters: dict +) -> Tuple[Callable[[List[Document]], Tuple[str, CliError, LocalScanResult]], str]: cycode_client = context.obj['client'] scan_type = context.obj['scan_type'] severity_threshold = context.obj['severity_threshold'] @@ -228,7 +228,7 @@ def _scan_batch_thread_func(batch: List[Document]) -> Tuple[str, CliError, Local def scan_commit_range( - context: click.Context, path: str, commit_range: str, max_commits_count: Optional[int] = None + context: click.Context, path: str, commit_range: str, max_commits_count: Optional[int] = None ) -> None: scan_type = context.obj['scan_type'] @@ -291,11 +291,11 @@ def scan_commit_range( def scan_documents( - context: click.Context, - documents_to_scan: List[Document], - is_git_diff: bool = False, - is_commit_range: bool = False, - scan_parameters: Optional[dict] = None, + context: click.Context, + documents_to_scan: List[Document], + is_git_diff: bool = False, + is_commit_range: bool = False, + scan_parameters: Optional[dict] = None, ) -> None: if not scan_parameters: scan_parameters = get_default_scan_parameters(context) @@ -321,17 +321,21 @@ def scan_documents( aggregation_report_url = _try_get_aggregation_report_url_if_needed( scan_parameters, context.obj['client'], context.obj['scan_type'] ) - + set_aggregation_report_url(context, aggregation_report_url) progress_bar.set_section_length(ScanProgressBarSection.GENERATE_REPORT, 1) progress_bar.update(ScanProgressBarSection.GENERATE_REPORT) progress_bar.stop() set_issue_detected_by_scan_results(context, local_scan_results) - print_results(context, local_scan_results, errors, aggregation_report_url) + print_results(context, local_scan_results, errors) + + +def set_aggregation_report_url(context: click.Context, aggregation_report_url: str = '') -> None: + context.obj['aggregation_report_url'] = aggregation_report_url def _try_get_aggregation_report_url_if_needed( - scan_parameters: dict, cycode_client: 'ScanClient', scan_type: str + scan_parameters: dict, cycode_client: 'ScanClient', scan_type: str ) -> Optional[str]: aggregation_id = scan_parameters.get('aggregation_id') if not scan_parameters.get('report'): @@ -346,11 +350,11 @@ def _try_get_aggregation_report_url_if_needed( def scan_commit_range_documents( - context: click.Context, - from_documents_to_scan: List[Document], - to_documents_to_scan: List[Document], - scan_parameters: Optional[dict] = None, - timeout: Optional[int] = None, + context: click.Context, + from_documents_to_scan: List[Document], + to_documents_to_scan: List[Document], + scan_parameters: Optional[dict] = None, + timeout: Optional[int] = None, ) -> None: cycode_client = context.obj['client'] scan_type = context.obj['scan_type'] @@ -439,11 +443,11 @@ def should_scan_documents(from_documents_to_scan: List[Document], to_documents_t def create_local_scan_result( - scan_result: ZippedFileScanResult, - documents_to_scan: List[Document], - command_scan_type: str, - scan_type: str, - severity_threshold: str, + scan_result: ZippedFileScanResult, + documents_to_scan: List[Document], + command_scan_type: str, + scan_type: str, + severity_threshold: str, ) -> LocalScanResult: document_detections = get_document_detections(scan_result, documents_to_scan) relevant_document_detections_list = exclude_irrelevant_document_detections( @@ -466,15 +470,15 @@ def create_local_scan_result( def perform_scan( - cycode_client: 'ScanClient', - zipped_documents: 'InMemoryZip', - scan_type: str, - scan_id: str, - is_git_diff: bool, - is_commit_range: bool, - scan_parameters: dict, - should_use_scan_service: bool = False, - should_use_sync_flow: bool = False, + cycode_client: 'ScanClient', + zipped_documents: 'InMemoryZip', + scan_type: str, + scan_id: str, + is_git_diff: bool, + is_commit_range: bool, + scan_parameters: dict, + should_use_scan_service: bool = False, + should_use_sync_flow: bool = False, ) -> ZippedFileScanResult: if should_use_sync_flow: return perform_scan_sync(cycode_client, zipped_documents, scan_type, scan_parameters) @@ -489,10 +493,10 @@ def perform_scan( def perform_scan_async( - cycode_client: 'ScanClient', - zipped_documents: 'InMemoryZip', - scan_type: str, - scan_parameters: dict, + cycode_client: 'ScanClient', + zipped_documents: 'InMemoryZip', + scan_type: str, + scan_parameters: dict, ) -> ZippedFileScanResult: scan_async_result = cycode_client.zipped_file_scan_async(zipped_documents, scan_type, scan_parameters) logger.debug('scan request has been triggered successfully, scan id: %s', scan_async_result.scan_id) @@ -506,10 +510,10 @@ def perform_scan_async( def perform_scan_sync( - cycode_client: 'ScanClient', - zipped_documents: 'InMemoryZip', - scan_type: str, - scan_parameters: dict, + cycode_client: 'ScanClient', + zipped_documents: 'InMemoryZip', + scan_type: str, + scan_parameters: dict, ) -> ZippedFileScanResult: scan_results = cycode_client.zipped_file_scan_sync(zipped_documents, scan_type, scan_parameters) logger.debug('scan request has been triggered successfully, scan id: %s', scan_results.id) @@ -521,12 +525,12 @@ def perform_scan_sync( def perform_commit_range_scan_async( - cycode_client: 'ScanClient', - from_commit_zipped_documents: 'InMemoryZip', - to_commit_zipped_documents: 'InMemoryZip', - scan_type: str, - scan_parameters: dict, - timeout: Optional[int] = None, + cycode_client: 'ScanClient', + from_commit_zipped_documents: 'InMemoryZip', + to_commit_zipped_documents: 'InMemoryZip', + scan_type: str, + scan_parameters: dict, + timeout: Optional[int] = None, ) -> ZippedFileScanResult: scan_async_result = cycode_client.multiple_zipped_file_scan_async( from_commit_zipped_documents, to_commit_zipped_documents, scan_type, scan_parameters @@ -539,11 +543,11 @@ def perform_commit_range_scan_async( def poll_scan_results( - cycode_client: 'ScanClient', - scan_id: str, - scan_type: str, - should_get_report: bool = False, - polling_timeout: Optional[int] = None, + cycode_client: 'ScanClient', + scan_id: str, + scan_type: str, + should_get_report: bool = False, + polling_timeout: Optional[int] = None, ) -> ZippedFileScanResult: if polling_timeout is None: polling_timeout = configuration_manager.get_scan_polling_timeout_in_seconds() @@ -580,17 +584,16 @@ def print_debug_scan_details(scan_details_response: 'ScanDetailsResponse') -> No def print_results( - context: click.Context, - local_scan_results: List[LocalScanResult], - errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: str = '', -) -> None: + context: click.Context, + local_scan_results: List[LocalScanResult], + errors: Optional[Dict[str, 'CliError']] = None) -> None: printer = ConsolePrinter(context) + aggregation_report_url = context.obj.get('aggregation_report_url') printer.print_scan_results(local_scan_results, errors, aggregation_report_url=aggregation_report_url) def get_document_detections( - scan_result: ZippedFileScanResult, documents_to_scan: List[Document] + scan_result: ZippedFileScanResult, documents_to_scan: List[Document] ) -> List[DocumentDetections]: logger.debug('Get document detections') @@ -608,7 +611,8 @@ def get_document_detections( def exclude_irrelevant_document_detections( - document_detections_list: List[DocumentDetections], scan_type: str, command_scan_type: str, severity_threshold: str + document_detections_list: List[DocumentDetections], scan_type: str, command_scan_type: str, + severity_threshold: str ) -> List[DocumentDetections]: relevant_document_detections_list = [] for document_detections in document_detections_list: @@ -690,7 +694,7 @@ def try_get_git_remote_url(path: str) -> Optional[str]: def exclude_irrelevant_detections( - detections: List[Detection], scan_type: str, command_scan_type: str, severity_threshold: str + detections: List[Detection], scan_type: str, command_scan_type: str, severity_threshold: str ) -> List[Detection]: relevant_detections = _exclude_detections_by_exclusions_configuration(detections, scan_type) relevant_detections = _exclude_detections_by_scan_type(relevant_detections, scan_type, command_scan_type) @@ -698,7 +702,7 @@ def exclude_irrelevant_detections( def _exclude_detections_by_severity( - detections: List[Detection], scan_type: str, severity_threshold: str + detections: List[Detection], scan_type: str, severity_threshold: str ) -> List[Detection]: if scan_type != consts.SCA_SCAN_TYPE or severity_threshold is None: return detections @@ -713,16 +717,16 @@ def _exclude_detections_by_severity( def _exclude_detections_by_scan_type( - detections: List[Detection], scan_type: str, command_scan_type: str + detections: List[Detection], scan_type: str, command_scan_type: str ) -> List[Detection]: if command_scan_type == consts.PRE_COMMIT_COMMAND_SCAN_TYPE: return exclude_detections_in_deleted_lines(detections) exclude_in_deleted_lines = configuration_manager.get_should_exclude_detections_in_deleted_lines(command_scan_type) if ( - command_scan_type in consts.COMMIT_RANGE_BASED_COMMAND_SCAN_TYPES - and scan_type == consts.SECRET_SCAN_TYPE - and exclude_in_deleted_lines + command_scan_type in consts.COMMIT_RANGE_BASED_COMMAND_SCAN_TYPES + and scan_type == consts.SECRET_SCAN_TYPE + and exclude_in_deleted_lines ): return exclude_detections_in_deleted_lines(detections) @@ -794,7 +798,7 @@ def _get_package_name(detection: Detection) -> str: def _get_document_by_file_name( - documents: List[Document], file_name: str, unique_id: Optional[str] = None + documents: List[Document], file_name: str, unique_id: Optional[str] = None ) -> Optional[Document]: for document in documents: if _normalize_file_path(document.path) == _normalize_file_path(file_name) and document.unique_id == unique_id: @@ -804,17 +808,17 @@ def _get_document_by_file_name( def _report_scan_status( - cycode_client: 'ScanClient', - scan_type: str, - scan_id: str, - scan_completed: bool, - output_detections_count: int, - all_detections_count: int, - files_to_scan_count: int, - zip_size: int, - command_scan_type: str, - error_message: Optional[str], - should_use_scan_service: bool = False, + cycode_client: 'ScanClient', + scan_type: str, + scan_id: str, + scan_completed: bool, + output_detections_count: int, + all_detections_count: int, + files_to_scan_count: int, + zip_size: int, + command_scan_type: str, + error_message: Optional[str], + should_use_scan_service: bool = False, ) -> None: try: end_scan_time = time.time() @@ -849,11 +853,11 @@ def _does_severity_match_severity_threshold(severity: str, severity_threshold: s def _get_scan_result( - cycode_client: 'ScanClient', - scan_type: str, - scan_id: str, - scan_details: 'ScanDetailsResponse', - should_get_report: bool = False, + cycode_client: 'ScanClient', + scan_type: str, + scan_id: str, + scan_details: 'ScanDetailsResponse', + should_get_report: bool = False, ) -> ZippedFileScanResult: if not scan_details.detections_count: return init_default_scan_result(cycode_client, scan_id, scan_type, should_get_report) @@ -871,7 +875,7 @@ def _get_scan_result( def init_default_scan_result( - cycode_client: 'ScanClient', scan_id: str, scan_type: str, should_get_report: bool = False + cycode_client: 'ScanClient', scan_id: str, scan_type: str, should_get_report: bool = False ) -> ZippedFileScanResult: return ZippedFileScanResult( did_detect=False, @@ -882,7 +886,7 @@ def init_default_scan_result( def _try_get_report_url_if_needed( - cycode_client: 'ScanClient', should_get_report: bool, scan_id: str, scan_type: str + cycode_client: 'ScanClient', should_get_report: bool, scan_id: str, scan_type: str ) -> Optional[str]: if not should_get_report: return None @@ -895,7 +899,7 @@ def _try_get_report_url_if_needed( def wait_for_detections_creation( - cycode_client: 'ScanClient', scan_type: str, scan_id: str, expected_detections_count: int + cycode_client: 'ScanClient', scan_type: str, scan_id: str, expected_detections_count: int ) -> None: logger.debug('Waiting for detections to be created') diff --git a/cycode/cli/printers/console_printer.py b/cycode/cli/printers/console_printer.py index 9075b21a..b1c9e371 100644 --- a/cycode/cli/printers/console_printer.py +++ b/cycode/cli/printers/console_printer.py @@ -37,7 +37,7 @@ def print_scan_results( self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: str = '', + aggregation_report_url: Optional[str] = None ) -> None: printer = self._get_scan_printer() printer.print_scan_results(local_scan_results, errors, aggregation_report_url=aggregation_report_url) diff --git a/cycode/cli/printers/json_printer.py b/cycode/cli/printers/json_printer.py index 330e6f2f..0ea6026d 100644 --- a/cycode/cli/printers/json_printer.py +++ b/cycode/cli/printers/json_printer.py @@ -23,10 +23,10 @@ def print_error(self, error: CliError) -> None: click.echo(self.get_data_json(result)) def print_scan_results( - self, - local_scan_results: List['LocalScanResult'], - errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: str = '', + self, + local_scan_results: List['LocalScanResult'], + errors: Optional[Dict[str, 'CliError']] = None, + aggregation_report_url: Optional[str] = None ) -> None: scan_ids = [] report_urls = [] @@ -35,9 +35,10 @@ def print_scan_results( for local_scan_result in local_scan_results: scan_ids.append(local_scan_result.scan_id) - if local_scan_result.report_url: + if aggregation_report_url: + report_urls.append(aggregation_report_url) + elif local_scan_result.report_url: report_urls.append(local_scan_result.report_url) - for document_detections in local_scan_result.document_detections: detections.extend(document_detections.detections) @@ -51,7 +52,7 @@ def print_scan_results( click.echo(self._get_json_scan_result(scan_ids, detections_dict, report_urls, inlined_errors)) def _get_json_scan_result( - self, scan_ids: List[str], detections: dict, report_urls: List[str], errors: List[dict] + self, scan_ids: List[str], detections: dict, report_urls: List[str], errors: List[dict] ) -> str: result = { 'scan_id': 'DEPRECATED', # backward compatibility diff --git a/cycode/cli/printers/printer_base.py b/cycode/cli/printers/printer_base.py index f23c1bc3..f893c25d 100644 --- a/cycode/cli/printers/printer_base.py +++ b/cycode/cli/printers/printer_base.py @@ -24,7 +24,7 @@ def print_scan_results( self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: str = '', + aggregation_report_url: Optional[str] = None ) -> None: pass diff --git a/cycode/cli/printers/tables/table_printer_base.py b/cycode/cli/printers/tables/table_printer_base.py index af9bc82b..2eb051fc 100644 --- a/cycode/cli/printers/tables/table_printer_base.py +++ b/cycode/cli/printers/tables/table_printer_base.py @@ -28,7 +28,7 @@ def print_scan_results( self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: str = '', + aggregation_report_url: Optional[str] = None ) -> None: if not errors and all(result.issue_detected == 0 for result in local_scan_results): click.secho('Good job! No issues were found!!! 👏👏👏', fg=self.GREEN_COLOR_NAME) diff --git a/cycode/cli/printers/text_printer.py b/cycode/cli/printers/text_printer.py index 8633237a..29cb3c3a 100644 --- a/cycode/cli/printers/text_printer.py +++ b/cycode/cli/printers/text_printer.py @@ -31,10 +31,10 @@ def print_error(self, error: CliError) -> None: click.secho(error.message, fg=self.RED_COLOR_NAME) def print_scan_results( - self, - local_scan_results: List['LocalScanResult'], - errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: str = '', + self, + local_scan_results: List['LocalScanResult'], + errors: Optional[Dict[str, 'CliError']] = None, + aggregation_report_url: Optional[str] = None ) -> None: if not errors and all(result.issue_detected == 0 for result in local_scan_results): click.secho('Good job! No issues were found!!! 👏👏👏', fg=self.GREEN_COLOR_NAME) @@ -42,10 +42,10 @@ def print_scan_results( for local_scan_result in local_scan_results: for document_detections in local_scan_result.document_detections: - self._print_document_detections( - document_detections, local_scan_result.scan_id, local_scan_result.report_url - ) + self._print_document_detections(document_detections, local_scan_result.scan_id) + report_urls = [scan_result.report_url for scan_result in local_scan_results if scan_result.report_url] + self._print_report_urls(report_urls, aggregation_report_url) if not errors: return @@ -58,17 +58,18 @@ def print_scan_results( click.echo(f'- {scan_id}: ', nl=False) self.print_error(error) + def _print_document_detections( - self, document_detections: DocumentDetections, scan_id: str, report_url: Optional[str] + self, document_detections: DocumentDetections, scan_id: str ) -> None: document = document_detections.document lines_to_display = self._get_lines_to_display_count() for detection in document_detections.detections: - self._print_detection_summary(detection, document.path, scan_id, report_url) + self._print_detection_summary(detection, document.path, scan_id) self._print_detection_code_segment(detection, document, lines_to_display) def _print_detection_summary( - self, detection: Detection, document_path: str, scan_id: str, report_url: Optional[str] + self, detection: Detection, document_path: str, scan_id: str ) -> None: detection_name = detection.type if self.scan_type == SECRET_SCAN_TYPE else detection.message detection_name_styled = click.style(detection_name, fg='bright_red', bold=True) @@ -77,8 +78,6 @@ def _print_detection_summary( detection_sha_message = f'\nSecret SHA: {detection_sha}' if detection_sha else '' scan_id_message = f'\nScan ID: {scan_id}' - report_url_message = f'\nReport URL: {report_url}' if report_url else '' - detection_commit_id = detection.detection_details.get('commit_id') detection_commit_id_message = f'\nCommit SHA: {detection_commit_id}' if detection_commit_id else '' @@ -91,7 +90,6 @@ def _print_detection_summary( f'(rule ID: {detection.detection_rule_id}) in file: {click.format_filename(document_path)} ' f'{detection_sha_message}' f'{scan_id_message}' - f'{report_url_message}' f'{detection_commit_id_message}' f'{company_guidelines_message}' f' ⛔' @@ -104,19 +102,31 @@ def _print_detection_code_segment(self, detection: Detection, document: Document self._print_detection_from_file(detection, document, code_segment_size) + @staticmethod + def _print_report_urls(report_urls: List[str], aggregation_report_url: str = '') -> None: + if not report_urls and not aggregation_report_url: + return + if aggregation_report_url: + click.echo(f'Report URL: {aggregation_report_url}') + return + + click.echo('Report URLs:') + for report_url in report_urls: + click.echo(f'- {report_url}') + @staticmethod def _get_code_segment_start_line(detection_line: int, code_segment_size: int) -> int: start_line = detection_line - math.ceil(code_segment_size / 2) return 0 if start_line < 0 else start_line def _print_line_of_code_segment( - self, - document: Document, - line: str, - line_number: int, - detection_position_in_line: int, - violation_length: int, - is_detection_line: bool, + self, + document: Document, + line: str, + line_number: int, + detection_position_in_line: int, + violation_length: int, + is_detection_line: bool, ) -> None: if is_detection_line: self._print_detection_line(document, line, line_number, detection_position_in_line, violation_length) @@ -124,7 +134,8 @@ def _print_line_of_code_segment( self._print_line(document, line, line_number) def _print_detection_line( - self, document: Document, line: str, line_number: int, detection_position_in_line: int, violation_length: int + self, document: Document, line: str, line_number: int, detection_position_in_line: int, + violation_length: int ) -> None: detection_line = self._get_detection_line_style( line, document.is_git_diff_format, detection_position_in_line, violation_length @@ -143,12 +154,12 @@ def _get_detection_line_style(self, line: str, is_git_diff: bool, start_position if self.scan_type != SECRET_SCAN_TYPE or start_position < 0 or length < 0: return self._get_line_style(line, is_git_diff, line_color) - violation = line[start_position : start_position + length] + violation = line[start_position: start_position + length] if not self.show_secret: violation = obfuscate_text(violation) line_to_violation = line[0:start_position] - line_from_violation = line[start_position + length :] + line_from_violation = line[start_position + length:] return ( f'{self._get_line_style(line_to_violation, is_git_diff, line_color)}' @@ -157,7 +168,7 @@ def _get_detection_line_style(self, line: str, is_git_diff: bool, start_position ) def _get_line_style( - self, line: str, is_git_diff: bool, color: Optional[str] = None, underline: bool = False + self, line: str, is_git_diff: bool, color: Optional[str] = None, underline: bool = False ) -> str: if color is None: color = self._get_line_color(line, is_git_diff) From 91fb3345563016e92dce561a47d29215006fc203 Mon Sep 17 00:00:00 2001 From: Sara Montefiore Date: Wed, 22 May 2024 19:00:28 +0300 Subject: [PATCH 6/9] CM-34882-fix --- cycode/cli/commands/scan/code_scanner.py | 156 +++++++++--------- cycode/cli/printers/console_printer.py | 2 +- cycode/cli/printers/json_printer.py | 10 +- cycode/cli/printers/printer_base.py | 2 +- .../cli/printers/tables/table_printer_base.py | 2 +- cycode/cli/printers/text_printer.py | 40 ++--- 6 files changed, 102 insertions(+), 110 deletions(-) diff --git a/cycode/cli/commands/scan/code_scanner.py b/cycode/cli/commands/scan/code_scanner.py index 2e626118..17a67160 100644 --- a/cycode/cli/commands/scan/code_scanner.py +++ b/cycode/cli/commands/scan/code_scanner.py @@ -116,7 +116,7 @@ def _should_use_sync_flow(scan_type: str, sync_option: bool, scan_parameters: Op def _enrich_scan_result_with_data_from_detection_rules( - cycode_client: 'ScanClient', scan_result: ZippedFileScanResult + cycode_client: 'ScanClient', scan_result: ZippedFileScanResult ) -> None: detection_rule_ids = set() for detections_per_file in scan_result.detections_per_file: @@ -147,7 +147,7 @@ def _enrich_scan_result_with_data_from_detection_rules( def _get_scan_documents_thread_func( - context: click.Context, is_git_diff: bool, is_commit_range: bool, scan_parameters: dict + context: click.Context, is_git_diff: bool, is_commit_range: bool, scan_parameters: dict ) -> Tuple[Callable[[List[Document]], Tuple[str, CliError, LocalScanResult]], str]: cycode_client = context.obj['client'] scan_type = context.obj['scan_type'] @@ -228,7 +228,7 @@ def _scan_batch_thread_func(batch: List[Document]) -> Tuple[str, CliError, Local def scan_commit_range( - context: click.Context, path: str, commit_range: str, max_commits_count: Optional[int] = None + context: click.Context, path: str, commit_range: str, max_commits_count: Optional[int] = None ) -> None: scan_type = context.obj['scan_type'] @@ -291,11 +291,11 @@ def scan_commit_range( def scan_documents( - context: click.Context, - documents_to_scan: List[Document], - is_git_diff: bool = False, - is_commit_range: bool = False, - scan_parameters: Optional[dict] = None, + context: click.Context, + documents_to_scan: List[Document], + is_git_diff: bool = False, + is_commit_range: bool = False, + scan_parameters: Optional[dict] = None, ) -> None: if not scan_parameters: scan_parameters = get_default_scan_parameters(context) @@ -335,7 +335,7 @@ def set_aggregation_report_url(context: click.Context, aggregation_report_url: s def _try_get_aggregation_report_url_if_needed( - scan_parameters: dict, cycode_client: 'ScanClient', scan_type: str + scan_parameters: dict, cycode_client: 'ScanClient', scan_type: str ) -> Optional[str]: aggregation_id = scan_parameters.get('aggregation_id') if not scan_parameters.get('report'): @@ -350,11 +350,11 @@ def _try_get_aggregation_report_url_if_needed( def scan_commit_range_documents( - context: click.Context, - from_documents_to_scan: List[Document], - to_documents_to_scan: List[Document], - scan_parameters: Optional[dict] = None, - timeout: Optional[int] = None, + context: click.Context, + from_documents_to_scan: List[Document], + to_documents_to_scan: List[Document], + scan_parameters: Optional[dict] = None, + timeout: Optional[int] = None, ) -> None: cycode_client = context.obj['client'] scan_type = context.obj['scan_type'] @@ -443,11 +443,11 @@ def should_scan_documents(from_documents_to_scan: List[Document], to_documents_t def create_local_scan_result( - scan_result: ZippedFileScanResult, - documents_to_scan: List[Document], - command_scan_type: str, - scan_type: str, - severity_threshold: str, + scan_result: ZippedFileScanResult, + documents_to_scan: List[Document], + command_scan_type: str, + scan_type: str, + severity_threshold: str, ) -> LocalScanResult: document_detections = get_document_detections(scan_result, documents_to_scan) relevant_document_detections_list = exclude_irrelevant_document_detections( @@ -470,15 +470,15 @@ def create_local_scan_result( def perform_scan( - cycode_client: 'ScanClient', - zipped_documents: 'InMemoryZip', - scan_type: str, - scan_id: str, - is_git_diff: bool, - is_commit_range: bool, - scan_parameters: dict, - should_use_scan_service: bool = False, - should_use_sync_flow: bool = False, + cycode_client: 'ScanClient', + zipped_documents: 'InMemoryZip', + scan_type: str, + scan_id: str, + is_git_diff: bool, + is_commit_range: bool, + scan_parameters: dict, + should_use_scan_service: bool = False, + should_use_sync_flow: bool = False, ) -> ZippedFileScanResult: if should_use_sync_flow: return perform_scan_sync(cycode_client, zipped_documents, scan_type, scan_parameters) @@ -493,10 +493,10 @@ def perform_scan( def perform_scan_async( - cycode_client: 'ScanClient', - zipped_documents: 'InMemoryZip', - scan_type: str, - scan_parameters: dict, + cycode_client: 'ScanClient', + zipped_documents: 'InMemoryZip', + scan_type: str, + scan_parameters: dict, ) -> ZippedFileScanResult: scan_async_result = cycode_client.zipped_file_scan_async(zipped_documents, scan_type, scan_parameters) logger.debug('scan request has been triggered successfully, scan id: %s', scan_async_result.scan_id) @@ -510,10 +510,10 @@ def perform_scan_async( def perform_scan_sync( - cycode_client: 'ScanClient', - zipped_documents: 'InMemoryZip', - scan_type: str, - scan_parameters: dict, + cycode_client: 'ScanClient', + zipped_documents: 'InMemoryZip', + scan_type: str, + scan_parameters: dict, ) -> ZippedFileScanResult: scan_results = cycode_client.zipped_file_scan_sync(zipped_documents, scan_type, scan_parameters) logger.debug('scan request has been triggered successfully, scan id: %s', scan_results.id) @@ -525,12 +525,12 @@ def perform_scan_sync( def perform_commit_range_scan_async( - cycode_client: 'ScanClient', - from_commit_zipped_documents: 'InMemoryZip', - to_commit_zipped_documents: 'InMemoryZip', - scan_type: str, - scan_parameters: dict, - timeout: Optional[int] = None, + cycode_client: 'ScanClient', + from_commit_zipped_documents: 'InMemoryZip', + to_commit_zipped_documents: 'InMemoryZip', + scan_type: str, + scan_parameters: dict, + timeout: Optional[int] = None, ) -> ZippedFileScanResult: scan_async_result = cycode_client.multiple_zipped_file_scan_async( from_commit_zipped_documents, to_commit_zipped_documents, scan_type, scan_parameters @@ -543,11 +543,11 @@ def perform_commit_range_scan_async( def poll_scan_results( - cycode_client: 'ScanClient', - scan_id: str, - scan_type: str, - should_get_report: bool = False, - polling_timeout: Optional[int] = None, + cycode_client: 'ScanClient', + scan_id: str, + scan_type: str, + should_get_report: bool = False, + polling_timeout: Optional[int] = None, ) -> ZippedFileScanResult: if polling_timeout is None: polling_timeout = configuration_manager.get_scan_polling_timeout_in_seconds() @@ -584,16 +584,15 @@ def print_debug_scan_details(scan_details_response: 'ScanDetailsResponse') -> No def print_results( - context: click.Context, - local_scan_results: List[LocalScanResult], - errors: Optional[Dict[str, 'CliError']] = None) -> None: + context: click.Context, local_scan_results: List[LocalScanResult], errors: Optional[Dict[str, 'CliError']] = None +) -> None: printer = ConsolePrinter(context) aggregation_report_url = context.obj.get('aggregation_report_url') printer.print_scan_results(local_scan_results, errors, aggregation_report_url=aggregation_report_url) def get_document_detections( - scan_result: ZippedFileScanResult, documents_to_scan: List[Document] + scan_result: ZippedFileScanResult, documents_to_scan: List[Document] ) -> List[DocumentDetections]: logger.debug('Get document detections') @@ -611,8 +610,7 @@ def get_document_detections( def exclude_irrelevant_document_detections( - document_detections_list: List[DocumentDetections], scan_type: str, command_scan_type: str, - severity_threshold: str + document_detections_list: List[DocumentDetections], scan_type: str, command_scan_type: str, severity_threshold: str ) -> List[DocumentDetections]: relevant_document_detections_list = [] for document_detections in document_detections_list: @@ -694,7 +692,7 @@ def try_get_git_remote_url(path: str) -> Optional[str]: def exclude_irrelevant_detections( - detections: List[Detection], scan_type: str, command_scan_type: str, severity_threshold: str + detections: List[Detection], scan_type: str, command_scan_type: str, severity_threshold: str ) -> List[Detection]: relevant_detections = _exclude_detections_by_exclusions_configuration(detections, scan_type) relevant_detections = _exclude_detections_by_scan_type(relevant_detections, scan_type, command_scan_type) @@ -702,7 +700,7 @@ def exclude_irrelevant_detections( def _exclude_detections_by_severity( - detections: List[Detection], scan_type: str, severity_threshold: str + detections: List[Detection], scan_type: str, severity_threshold: str ) -> List[Detection]: if scan_type != consts.SCA_SCAN_TYPE or severity_threshold is None: return detections @@ -717,16 +715,16 @@ def _exclude_detections_by_severity( def _exclude_detections_by_scan_type( - detections: List[Detection], scan_type: str, command_scan_type: str + detections: List[Detection], scan_type: str, command_scan_type: str ) -> List[Detection]: if command_scan_type == consts.PRE_COMMIT_COMMAND_SCAN_TYPE: return exclude_detections_in_deleted_lines(detections) exclude_in_deleted_lines = configuration_manager.get_should_exclude_detections_in_deleted_lines(command_scan_type) if ( - command_scan_type in consts.COMMIT_RANGE_BASED_COMMAND_SCAN_TYPES - and scan_type == consts.SECRET_SCAN_TYPE - and exclude_in_deleted_lines + command_scan_type in consts.COMMIT_RANGE_BASED_COMMAND_SCAN_TYPES + and scan_type == consts.SECRET_SCAN_TYPE + and exclude_in_deleted_lines ): return exclude_detections_in_deleted_lines(detections) @@ -798,7 +796,7 @@ def _get_package_name(detection: Detection) -> str: def _get_document_by_file_name( - documents: List[Document], file_name: str, unique_id: Optional[str] = None + documents: List[Document], file_name: str, unique_id: Optional[str] = None ) -> Optional[Document]: for document in documents: if _normalize_file_path(document.path) == _normalize_file_path(file_name) and document.unique_id == unique_id: @@ -808,17 +806,17 @@ def _get_document_by_file_name( def _report_scan_status( - cycode_client: 'ScanClient', - scan_type: str, - scan_id: str, - scan_completed: bool, - output_detections_count: int, - all_detections_count: int, - files_to_scan_count: int, - zip_size: int, - command_scan_type: str, - error_message: Optional[str], - should_use_scan_service: bool = False, + cycode_client: 'ScanClient', + scan_type: str, + scan_id: str, + scan_completed: bool, + output_detections_count: int, + all_detections_count: int, + files_to_scan_count: int, + zip_size: int, + command_scan_type: str, + error_message: Optional[str], + should_use_scan_service: bool = False, ) -> None: try: end_scan_time = time.time() @@ -853,11 +851,11 @@ def _does_severity_match_severity_threshold(severity: str, severity_threshold: s def _get_scan_result( - cycode_client: 'ScanClient', - scan_type: str, - scan_id: str, - scan_details: 'ScanDetailsResponse', - should_get_report: bool = False, + cycode_client: 'ScanClient', + scan_type: str, + scan_id: str, + scan_details: 'ScanDetailsResponse', + should_get_report: bool = False, ) -> ZippedFileScanResult: if not scan_details.detections_count: return init_default_scan_result(cycode_client, scan_id, scan_type, should_get_report) @@ -875,7 +873,7 @@ def _get_scan_result( def init_default_scan_result( - cycode_client: 'ScanClient', scan_id: str, scan_type: str, should_get_report: bool = False + cycode_client: 'ScanClient', scan_id: str, scan_type: str, should_get_report: bool = False ) -> ZippedFileScanResult: return ZippedFileScanResult( did_detect=False, @@ -886,7 +884,7 @@ def init_default_scan_result( def _try_get_report_url_if_needed( - cycode_client: 'ScanClient', should_get_report: bool, scan_id: str, scan_type: str + cycode_client: 'ScanClient', should_get_report: bool, scan_id: str, scan_type: str ) -> Optional[str]: if not should_get_report: return None @@ -899,7 +897,7 @@ def _try_get_report_url_if_needed( def wait_for_detections_creation( - cycode_client: 'ScanClient', scan_type: str, scan_id: str, expected_detections_count: int + cycode_client: 'ScanClient', scan_type: str, scan_id: str, expected_detections_count: int ) -> None: logger.debug('Waiting for detections to be created') diff --git a/cycode/cli/printers/console_printer.py b/cycode/cli/printers/console_printer.py index b1c9e371..a8f764d0 100644 --- a/cycode/cli/printers/console_printer.py +++ b/cycode/cli/printers/console_printer.py @@ -37,7 +37,7 @@ def print_scan_results( self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: Optional[str] = None + aggregation_report_url: Optional[str] = None, ) -> None: printer = self._get_scan_printer() printer.print_scan_results(local_scan_results, errors, aggregation_report_url=aggregation_report_url) diff --git a/cycode/cli/printers/json_printer.py b/cycode/cli/printers/json_printer.py index 0ea6026d..2645caa2 100644 --- a/cycode/cli/printers/json_printer.py +++ b/cycode/cli/printers/json_printer.py @@ -23,10 +23,10 @@ def print_error(self, error: CliError) -> None: click.echo(self.get_data_json(result)) def print_scan_results( - self, - local_scan_results: List['LocalScanResult'], - errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: Optional[str] = None + self, + local_scan_results: List['LocalScanResult'], + errors: Optional[Dict[str, 'CliError']] = None, + aggregation_report_url: Optional[str] = None, ) -> None: scan_ids = [] report_urls = [] @@ -52,7 +52,7 @@ def print_scan_results( click.echo(self._get_json_scan_result(scan_ids, detections_dict, report_urls, inlined_errors)) def _get_json_scan_result( - self, scan_ids: List[str], detections: dict, report_urls: List[str], errors: List[dict] + self, scan_ids: List[str], detections: dict, report_urls: List[str], errors: List[dict] ) -> str: result = { 'scan_id': 'DEPRECATED', # backward compatibility diff --git a/cycode/cli/printers/printer_base.py b/cycode/cli/printers/printer_base.py index f893c25d..fe41aeb3 100644 --- a/cycode/cli/printers/printer_base.py +++ b/cycode/cli/printers/printer_base.py @@ -24,7 +24,7 @@ def print_scan_results( self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: Optional[str] = None + aggregation_report_url: Optional[str] = None, ) -> None: pass diff --git a/cycode/cli/printers/tables/table_printer_base.py b/cycode/cli/printers/tables/table_printer_base.py index 2eb051fc..d0d2394f 100644 --- a/cycode/cli/printers/tables/table_printer_base.py +++ b/cycode/cli/printers/tables/table_printer_base.py @@ -28,7 +28,7 @@ def print_scan_results( self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: Optional[str] = None + aggregation_report_url: Optional[str] = None, ) -> None: if not errors and all(result.issue_detected == 0 for result in local_scan_results): click.secho('Good job! No issues were found!!! 👏👏👏', fg=self.GREEN_COLOR_NAME) diff --git a/cycode/cli/printers/text_printer.py b/cycode/cli/printers/text_printer.py index 29cb3c3a..2220807c 100644 --- a/cycode/cli/printers/text_printer.py +++ b/cycode/cli/printers/text_printer.py @@ -31,10 +31,10 @@ def print_error(self, error: CliError) -> None: click.secho(error.message, fg=self.RED_COLOR_NAME) def print_scan_results( - self, - local_scan_results: List['LocalScanResult'], - errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: Optional[str] = None + self, + local_scan_results: List['LocalScanResult'], + errors: Optional[Dict[str, 'CliError']] = None, + aggregation_report_url: Optional[str] = None, ) -> None: if not errors and all(result.issue_detected == 0 for result in local_scan_results): click.secho('Good job! No issues were found!!! 👏👏👏', fg=self.GREEN_COLOR_NAME) @@ -58,19 +58,14 @@ def print_scan_results( click.echo(f'- {scan_id}: ', nl=False) self.print_error(error) - - def _print_document_detections( - self, document_detections: DocumentDetections, scan_id: str - ) -> None: + def _print_document_detections(self, document_detections: DocumentDetections, scan_id: str) -> None: document = document_detections.document lines_to_display = self._get_lines_to_display_count() for detection in document_detections.detections: self._print_detection_summary(detection, document.path, scan_id) self._print_detection_code_segment(detection, document, lines_to_display) - def _print_detection_summary( - self, detection: Detection, document_path: str, scan_id: str - ) -> None: + def _print_detection_summary(self, detection: Detection, document_path: str, scan_id: str) -> None: detection_name = detection.type if self.scan_type == SECRET_SCAN_TYPE else detection.message detection_name_styled = click.style(detection_name, fg='bright_red', bold=True) @@ -120,13 +115,13 @@ def _get_code_segment_start_line(detection_line: int, code_segment_size: int) -> return 0 if start_line < 0 else start_line def _print_line_of_code_segment( - self, - document: Document, - line: str, - line_number: int, - detection_position_in_line: int, - violation_length: int, - is_detection_line: bool, + self, + document: Document, + line: str, + line_number: int, + detection_position_in_line: int, + violation_length: int, + is_detection_line: bool, ) -> None: if is_detection_line: self._print_detection_line(document, line, line_number, detection_position_in_line, violation_length) @@ -134,8 +129,7 @@ def _print_line_of_code_segment( self._print_line(document, line, line_number) def _print_detection_line( - self, document: Document, line: str, line_number: int, detection_position_in_line: int, - violation_length: int + self, document: Document, line: str, line_number: int, detection_position_in_line: int, violation_length: int ) -> None: detection_line = self._get_detection_line_style( line, document.is_git_diff_format, detection_position_in_line, violation_length @@ -154,12 +148,12 @@ def _get_detection_line_style(self, line: str, is_git_diff: bool, start_position if self.scan_type != SECRET_SCAN_TYPE or start_position < 0 or length < 0: return self._get_line_style(line, is_git_diff, line_color) - violation = line[start_position: start_position + length] + violation = line[start_position : start_position + length] if not self.show_secret: violation = obfuscate_text(violation) line_to_violation = line[0:start_position] - line_from_violation = line[start_position + length:] + line_from_violation = line[start_position + length :] return ( f'{self._get_line_style(line_to_violation, is_git_diff, line_color)}' @@ -168,7 +162,7 @@ def _get_detection_line_style(self, line: str, is_git_diff: bool, start_position ) def _get_line_style( - self, line: str, is_git_diff: bool, color: Optional[str] = None, underline: bool = False + self, line: str, is_git_diff: bool, color: Optional[str] = None, underline: bool = False ) -> str: if color is None: color = self._get_line_color(line, is_git_diff) From f80402180f4803dcf4bcb111d5131a98fab3d606 Mon Sep 17 00:00:00 2001 From: Sara Montefiore Date: Thu, 23 May 2024 10:23:17 +0300 Subject: [PATCH 7/9] CM-34882-fix --- cycode/cli/printers/tables/sca_table_printer.py | 6 ++++-- cycode/cli/printers/tables/table_printer.py | 8 ++++++-- cycode/cli/printers/tables/table_printer_base.py | 9 +++++++-- cycode/cli/printers/text_printer.py | 2 +- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/cycode/cli/printers/tables/sca_table_printer.py b/cycode/cli/printers/tables/sca_table_printer.py index 77324b96..5ed3bade 100644 --- a/cycode/cli/printers/tables/sca_table_printer.py +++ b/cycode/cli/printers/tables/sca_table_printer.py @@ -1,5 +1,5 @@ from collections import defaultdict -from typing import TYPE_CHECKING, Dict, List +from typing import TYPE_CHECKING, Dict, List, Optional import click @@ -39,7 +39,9 @@ class ScaTablePrinter(TablePrinterBase): - def _print_results(self, local_scan_results: List['LocalScanResult'], aggregation_report_url: str = '') -> None: + def _print_results( + self, local_scan_results: List['LocalScanResult'], aggregation_report_url: Optional[str] = None + ) -> None: detections_per_policy_id = self._extract_detections_per_policy_id(local_scan_results) for policy_id, detections in detections_per_policy_id.items(): table = self._get_table(policy_id) diff --git a/cycode/cli/printers/tables/table_printer.py b/cycode/cli/printers/tables/table_printer.py index daa76024..78618e40 100644 --- a/cycode/cli/printers/tables/table_printer.py +++ b/cycode/cli/printers/tables/table_printer.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, List +from typing import TYPE_CHECKING, List, Optional import click @@ -51,7 +51,11 @@ class TablePrinter(TablePrinterBase): - def _print_results(self, local_scan_results: List['LocalScanResult'], aggregation_report_url: str = '') -> None: + def _print_results( + self, + local_scan_results: List['LocalScanResult'], + aggregation_report_url: Optional[str] = None, + ) -> None: table = self._get_table() if self.scan_type in COLUMN_WIDTHS_CONFIG: table.set_cols_width(COLUMN_WIDTHS_CONFIG[self.scan_type]) diff --git a/cycode/cli/printers/tables/table_printer_base.py b/cycode/cli/printers/tables/table_printer_base.py index d0d2394f..133511ef 100644 --- a/cycode/cli/printers/tables/table_printer_base.py +++ b/cycode/cli/printers/tables/table_printer_base.py @@ -52,7 +52,9 @@ def _is_git_repository(self) -> bool: return self.context.obj.get('remote_url') is not None @abc.abstractmethod - def _print_results(self, local_scan_results: List['LocalScanResult'], aggregation_report_url: str = '') -> None: + def _print_results( + self, local_scan_results: List['LocalScanResult'], aggregation_report_url: Optional[str] = None + ) -> None: raise NotImplementedError @staticmethod @@ -61,7 +63,10 @@ def _print_table(table: 'Table') -> None: click.echo(table.get_table().draw()) @staticmethod - def _print_report_urls(local_scan_results: List['LocalScanResult'], aggregation_report_url: str = '') -> None: + def _print_report_urls( + local_scan_results: List['LocalScanResult'], + aggregation_report_url: Optional[str] = None, + ) -> None: report_urls = [scan_result.report_url for scan_result in local_scan_results if scan_result.report_url] if not report_urls and not aggregation_report_url: return diff --git a/cycode/cli/printers/text_printer.py b/cycode/cli/printers/text_printer.py index 2220807c..91c6b029 100644 --- a/cycode/cli/printers/text_printer.py +++ b/cycode/cli/printers/text_printer.py @@ -98,7 +98,7 @@ def _print_detection_code_segment(self, detection: Detection, document: Document self._print_detection_from_file(detection, document, code_segment_size) @staticmethod - def _print_report_urls(report_urls: List[str], aggregation_report_url: str = '') -> None: + def _print_report_urls(report_urls: List[str], aggregation_report_url: Optional[str] = None) -> None: if not report_urls and not aggregation_report_url: return if aggregation_report_url: From 44971103cb2704dc0684ad50511ec6413baae920 Mon Sep 17 00:00:00 2001 From: Sara Montefiore Date: Thu, 23 May 2024 11:42:26 +0300 Subject: [PATCH 8/9] CM-34882-fix --- cycode/cli/commands/scan/code_scanner.py | 3 +-- cycode/cli/printers/console_printer.py | 5 ++--- cycode/cli/printers/json_printer.py | 12 +++++------- cycode/cli/printers/printer_base.py | 5 +---- cycode/cli/printers/tables/sca_table_printer.py | 7 +++---- cycode/cli/printers/tables/table_printer_base.py | 11 +++-------- cycode/cli/printers/text_printer.py | 8 +++----- 7 files changed, 18 insertions(+), 33 deletions(-) diff --git a/cycode/cli/commands/scan/code_scanner.py b/cycode/cli/commands/scan/code_scanner.py index 17a67160..1cb18395 100644 --- a/cycode/cli/commands/scan/code_scanner.py +++ b/cycode/cli/commands/scan/code_scanner.py @@ -587,8 +587,7 @@ def print_results( context: click.Context, local_scan_results: List[LocalScanResult], errors: Optional[Dict[str, 'CliError']] = None ) -> None: printer = ConsolePrinter(context) - aggregation_report_url = context.obj.get('aggregation_report_url') - printer.print_scan_results(local_scan_results, errors, aggregation_report_url=aggregation_report_url) + printer.print_scan_results(local_scan_results, errors) def get_document_detections( diff --git a/cycode/cli/printers/console_printer.py b/cycode/cli/printers/console_printer.py index a8f764d0..ad473560 100644 --- a/cycode/cli/printers/console_printer.py +++ b/cycode/cli/printers/console_printer.py @@ -28,7 +28,7 @@ def __init__(self, context: click.Context) -> None: self.context = context self.scan_type = self.context.obj.get('scan_type') self.output_type = self.context.obj.get('output') - + self.aggregation_report_url = self.context.obj.get('aggregation_report_url') self._printer_class = self._AVAILABLE_PRINTERS.get(self.output_type) if self._printer_class is None: raise CycodeError(f'"{self.output_type}" output type is not supported.') @@ -37,10 +37,9 @@ def print_scan_results( self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: Optional[str] = None, ) -> None: printer = self._get_scan_printer() - printer.print_scan_results(local_scan_results, errors, aggregation_report_url=aggregation_report_url) + printer.print_scan_results(local_scan_results, errors) def _get_scan_printer(self) -> 'PrinterBase': printer_class = self._printer_class diff --git a/cycode/cli/printers/json_printer.py b/cycode/cli/printers/json_printer.py index 2645caa2..187a1bf8 100644 --- a/cycode/cli/printers/json_printer.py +++ b/cycode/cli/printers/json_printer.py @@ -23,21 +23,19 @@ def print_error(self, error: CliError) -> None: click.echo(self.get_data_json(result)) def print_scan_results( - self, - local_scan_results: List['LocalScanResult'], - errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: Optional[str] = None, + self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None ) -> None: scan_ids = [] report_urls = [] detections = [] + aggregation_report_url = self.context.obj.get('aggregation_report_url') + if aggregation_report_url: + report_urls.append(aggregation_report_url) for local_scan_result in local_scan_results: scan_ids.append(local_scan_result.scan_id) - if aggregation_report_url: - report_urls.append(aggregation_report_url) - elif local_scan_result.report_url: + if not aggregation_report_url and local_scan_result.report_url: report_urls.append(local_scan_result.report_url) for document_detections in local_scan_result.document_detections: detections.extend(document_detections.detections) diff --git a/cycode/cli/printers/printer_base.py b/cycode/cli/printers/printer_base.py index fe41aeb3..cc354082 100644 --- a/cycode/cli/printers/printer_base.py +++ b/cycode/cli/printers/printer_base.py @@ -21,10 +21,7 @@ def __init__(self, context: click.Context) -> None: @abstractmethod def print_scan_results( - self, - local_scan_results: List['LocalScanResult'], - errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: Optional[str] = None, + self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None ) -> None: pass diff --git a/cycode/cli/printers/tables/sca_table_printer.py b/cycode/cli/printers/tables/sca_table_printer.py index 5ed3bade..d51359a3 100644 --- a/cycode/cli/printers/tables/sca_table_printer.py +++ b/cycode/cli/printers/tables/sca_table_printer.py @@ -1,5 +1,5 @@ from collections import defaultdict -from typing import TYPE_CHECKING, Dict, List, Optional +from typing import TYPE_CHECKING, Dict, List import click @@ -39,9 +39,8 @@ class ScaTablePrinter(TablePrinterBase): - def _print_results( - self, local_scan_results: List['LocalScanResult'], aggregation_report_url: Optional[str] = None - ) -> None: + def _print_results(self, local_scan_results: List['LocalScanResult']) -> None: + aggregation_report_url = self.context.obj.get('aggregation_report_url') detections_per_policy_id = self._extract_detections_per_policy_id(local_scan_results) for policy_id, detections in detections_per_policy_id.items(): table = self._get_table(policy_id) diff --git a/cycode/cli/printers/tables/table_printer_base.py b/cycode/cli/printers/tables/table_printer_base.py index 133511ef..be41454f 100644 --- a/cycode/cli/printers/tables/table_printer_base.py +++ b/cycode/cli/printers/tables/table_printer_base.py @@ -25,16 +25,13 @@ def print_error(self, error: CliError) -> None: TextPrinter(self.context).print_error(error) def print_scan_results( - self, - local_scan_results: List['LocalScanResult'], - errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: Optional[str] = None, + self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None ) -> None: if not errors and all(result.issue_detected == 0 for result in local_scan_results): click.secho('Good job! No issues were found!!! 👏👏👏', fg=self.GREEN_COLOR_NAME) return - self._print_results(local_scan_results, aggregation_report_url=aggregation_report_url) + self._print_results(local_scan_results) if not errors: return @@ -52,9 +49,7 @@ def _is_git_repository(self) -> bool: return self.context.obj.get('remote_url') is not None @abc.abstractmethod - def _print_results( - self, local_scan_results: List['LocalScanResult'], aggregation_report_url: Optional[str] = None - ) -> None: + def _print_results(self, local_scan_results: List['LocalScanResult']) -> None: raise NotImplementedError @staticmethod diff --git a/cycode/cli/printers/text_printer.py b/cycode/cli/printers/text_printer.py index 91c6b029..1e2babd2 100644 --- a/cycode/cli/printers/text_printer.py +++ b/cycode/cli/printers/text_printer.py @@ -31,10 +31,7 @@ def print_error(self, error: CliError) -> None: click.secho(error.message, fg=self.RED_COLOR_NAME) def print_scan_results( - self, - local_scan_results: List['LocalScanResult'], - errors: Optional[Dict[str, 'CliError']] = None, - aggregation_report_url: Optional[str] = None, + self, local_scan_results: List['LocalScanResult'], errors: Optional[Dict[str, 'CliError']] = None ) -> None: if not errors and all(result.issue_detected == 0 for result in local_scan_results): click.secho('Good job! No issues were found!!! 👏👏👏', fg=self.GREEN_COLOR_NAME) @@ -45,7 +42,8 @@ def print_scan_results( self._print_document_detections(document_detections, local_scan_result.scan_id) report_urls = [scan_result.report_url for scan_result in local_scan_results if scan_result.report_url] - self._print_report_urls(report_urls, aggregation_report_url) + + self._print_report_urls(report_urls, self.context.obj.get('aggregation_report_url')) if not errors: return From c26c1bfa9be61adefe0d7dc6751ffcd8af006922 Mon Sep 17 00:00:00 2001 From: Sara Montefiore Date: Thu, 23 May 2024 12:22:37 +0300 Subject: [PATCH 9/9] CM-34882-fix --- cycode/cli/commands/scan/code_scanner.py | 2 +- cycode/cli/printers/tables/table_printer.py | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/cycode/cli/commands/scan/code_scanner.py b/cycode/cli/commands/scan/code_scanner.py index 1cb18395..3c882997 100644 --- a/cycode/cli/commands/scan/code_scanner.py +++ b/cycode/cli/commands/scan/code_scanner.py @@ -330,7 +330,7 @@ def scan_documents( print_results(context, local_scan_results, errors) -def set_aggregation_report_url(context: click.Context, aggregation_report_url: str = '') -> None: +def set_aggregation_report_url(context: click.Context, aggregation_report_url: Optional[str] = None) -> None: context.obj['aggregation_report_url'] = aggregation_report_url diff --git a/cycode/cli/printers/tables/table_printer.py b/cycode/cli/printers/tables/table_printer.py index 78618e40..f2153e56 100644 --- a/cycode/cli/printers/tables/table_printer.py +++ b/cycode/cli/printers/tables/table_printer.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, List import click @@ -51,11 +51,7 @@ class TablePrinter(TablePrinterBase): - def _print_results( - self, - local_scan_results: List['LocalScanResult'], - aggregation_report_url: Optional[str] = None, - ) -> None: + def _print_results(self, local_scan_results: List['LocalScanResult']) -> None: table = self._get_table() if self.scan_type in COLUMN_WIDTHS_CONFIG: table.set_cols_width(COLUMN_WIDTHS_CONFIG[self.scan_type]) @@ -67,7 +63,7 @@ def _print_results( self._enrich_table_with_values(table, detection, document_detections.document) self._print_table(table) - self._print_report_urls(local_scan_results, aggregation_report_url) + self._print_report_urls(local_scan_results, self.context.obj.get('aggregation_report_url')) def _get_table(self) -> Table: table = Table()