Skip to content

Commit

Permalink
CM-30564-formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
saramontif committed Jan 24, 2024
1 parent 0d7f330 commit c9bd0d4
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 76 deletions.
148 changes: 73 additions & 75 deletions cycode/cli/commands/scan/code_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def _should_use_scan_service(scan_type: str, scan_parameters: Optional[dict] = N


def _enrich_scan_result_with_data_from_detection_rules(
cycode_client: 'ScanClient', scan_type: str, scan_result: ZippedFileScanResult
cycode_client: 'ScanClient', scan_type: str, scan_result: ZippedFileScanResult
) -> None:
# TODO(MarshalX): remove scan_type arg after migration to new backend filter
if scan_type != consts.SECRET_SCAN_TYPE:
Expand Down Expand Up @@ -136,7 +136,7 @@ def _enrich_scan_result_with_data_from_detection_rules(


def _get_scan_documents_thread_func(
context: click.Context, is_git_diff: bool, is_commit_range: bool, scan_parameters: dict
context: click.Context, is_git_diff: bool, is_commit_range: bool, scan_parameters: dict
) -> Callable[[List[Document]], Tuple[str, CliError, LocalScanResult]]:
cycode_client = context.obj['client']
scan_type = context.obj['scan_type']
Expand Down Expand Up @@ -213,7 +213,7 @@ def _scan_batch_thread_func(batch: List[Document]) -> Tuple[str, CliError, Local


def scan_commit_range(
context: click.Context, path: str, commit_range: str, max_commits_count: Optional[int] = None
context: click.Context, path: str, commit_range: str, max_commits_count: Optional[int] = None
) -> None:
scan_type = context.obj['scan_type']

Expand Down Expand Up @@ -276,11 +276,11 @@ def scan_commit_range(


def scan_documents(
context: click.Context,
documents_to_scan: List[Document],
is_git_diff: bool = False,
is_commit_range: bool = False,
scan_parameters: Optional[dict] = None,
context: click.Context,
documents_to_scan: List[Document],
is_git_diff: bool = False,
is_commit_range: bool = False,
scan_parameters: Optional[dict] = None,
) -> None:
if not scan_parameters:
scan_parameters = get_default_scan_parameters(context)
Expand Down Expand Up @@ -311,11 +311,11 @@ def scan_documents(


def scan_commit_range_documents(
context: click.Context,
from_documents_to_scan: List[Document],
to_documents_to_scan: List[Document],
scan_parameters: Optional[dict] = None,
timeout: Optional[int] = None,
context: click.Context,
from_documents_to_scan: List[Document],
to_documents_to_scan: List[Document],
scan_parameters: Optional[dict] = None,
timeout: Optional[int] = None,
) -> None:
cycode_client = context.obj['client']
scan_type = context.obj['scan_type']
Expand Down Expand Up @@ -406,11 +406,11 @@ def should_scan_documents(from_documents_to_scan: List[Document], to_documents_t


def create_local_scan_result(
scan_result: ZippedFileScanResult,
documents_to_scan: List[Document],
command_scan_type: str,
scan_type: str,
severity_threshold: str,
scan_result: ZippedFileScanResult,
documents_to_scan: List[Document],
command_scan_type: str,
scan_type: str,
severity_threshold: str,
) -> LocalScanResult:
document_detections = get_document_detections(scan_result, documents_to_scan)
relevant_document_detections_list = exclude_irrelevant_document_detections(
Expand All @@ -433,14 +433,14 @@ def create_local_scan_result(


def perform_scan(
cycode_client: 'ScanClient',
zipped_documents: 'InMemoryZip',
scan_type: str,
scan_id: str,
is_git_diff: bool,
is_commit_range: bool,
scan_parameters: dict,
should_use_scan_service: bool = False,
cycode_client: 'ScanClient',
zipped_documents: 'InMemoryZip',
scan_type: str,
scan_id: str,
is_git_diff: bool,
is_commit_range: bool,
scan_parameters: dict,
should_use_scan_service: bool = False,
) -> ZippedFileScanResult:
if scan_type in (consts.SCA_SCAN_TYPE, consts.SAST_SCAN_TYPE) or should_use_scan_service:
return perform_scan_async(cycode_client, zipped_documents, scan_type, scan_parameters, should_use_scan_service)
Expand All @@ -452,11 +452,11 @@ def perform_scan(


def perform_scan_async(
cycode_client: 'ScanClient',
zipped_documents: 'InMemoryZip',
scan_type: str,
scan_parameters: dict,
should_use_scan_service: bool = False,
cycode_client: 'ScanClient',
zipped_documents: 'InMemoryZip',
scan_type: str,
scan_parameters: dict,
should_use_scan_service: bool = False,
) -> ZippedFileScanResult:
scan_async_result = cycode_client.zipped_file_scan_async(
zipped_documents, scan_type, scan_parameters, should_use_scan_service=should_use_scan_service
Expand All @@ -473,13 +473,13 @@ def perform_scan_async(


def perform_commit_range_scan_async(
cycode_client: 'ScanClient',
from_commit_zipped_documents: 'InMemoryZip',
to_commit_zipped_documents: 'InMemoryZip',
scan_type: str,
scan_parameters: dict,
timeout: Optional[int] = None,
should_use_scan_service: bool = False,
cycode_client: 'ScanClient',
from_commit_zipped_documents: 'InMemoryZip',
to_commit_zipped_documents: 'InMemoryZip',
scan_type: str,
scan_parameters: dict,
timeout: Optional[int] = None,
should_use_scan_service: bool = False,
) -> ZippedFileScanResult:
scan_async_result = cycode_client.multiple_zipped_file_scan_async(
from_commit_zipped_documents, to_commit_zipped_documents, scan_type, scan_parameters
Expand All @@ -497,12 +497,12 @@ def perform_commit_range_scan_async(


def poll_scan_results(
cycode_client: 'ScanClient',
scan_id: str,
scan_type: str,
should_get_report: bool = False,
polling_timeout: Optional[int] = None,
should_use_scan_service: bool = False,
cycode_client: 'ScanClient',
scan_id: str,
scan_type: str,
should_get_report: bool = False,
polling_timeout: Optional[int] = None,
should_use_scan_service: bool = False,
) -> ZippedFileScanResult:
if polling_timeout is None:
polling_timeout = configuration_manager.get_scan_polling_timeout_in_seconds()
Expand Down Expand Up @@ -539,15 +539,14 @@ def print_debug_scan_details(scan_details_response: 'ScanDetailsResponse') -> No


def print_results(
context: click.Context, local_scan_results: List[LocalScanResult],
errors: Optional[Dict[str, 'CliError']] = None
context: click.Context, local_scan_results: List[LocalScanResult], errors: Optional[Dict[str, 'CliError']] = None
) -> None:
printer = ConsolePrinter(context)
printer.print_scan_results(local_scan_results, errors)


def get_document_detections(
scan_result: ZippedFileScanResult, documents_to_scan: List[Document]
scan_result: ZippedFileScanResult, documents_to_scan: List[Document]
) -> List[DocumentDetections]:
logger.debug('Get document detections')

Expand All @@ -565,8 +564,7 @@ def get_document_detections(


def exclude_irrelevant_document_detections(
document_detections_list: List[DocumentDetections], scan_type: str, command_scan_type: str,
severity_threshold: str
document_detections_list: List[DocumentDetections], scan_type: str, command_scan_type: str, severity_threshold: str
) -> List[DocumentDetections]:
relevant_document_detections_list = []
for document_detections in document_detections_list:
Expand Down Expand Up @@ -647,15 +645,15 @@ def try_get_git_remote_url(path: str) -> Optional[str]:


def exclude_irrelevant_detections(
detections: List[Detection], scan_type: str, command_scan_type: str, severity_threshold: str
detections: List[Detection], scan_type: str, command_scan_type: str, severity_threshold: str
) -> List[Detection]:
relevant_detections = _exclude_detections_by_exclusions_configuration(detections, scan_type)
relevant_detections = _exclude_detections_by_scan_type(relevant_detections, scan_type, command_scan_type)
return _exclude_detections_by_severity(relevant_detections, scan_type, severity_threshold)


def _exclude_detections_by_severity(
detections: List[Detection], scan_type: str, severity_threshold: str
detections: List[Detection], scan_type: str, severity_threshold: str
) -> List[Detection]:
if scan_type != consts.SCA_SCAN_TYPE or severity_threshold is None:
return detections
Expand All @@ -670,16 +668,16 @@ def _exclude_detections_by_severity(


def _exclude_detections_by_scan_type(
detections: List[Detection], scan_type: str, command_scan_type: str
detections: List[Detection], scan_type: str, command_scan_type: str
) -> List[Detection]:
if command_scan_type == consts.PRE_COMMIT_COMMAND_SCAN_TYPE:
return exclude_detections_in_deleted_lines(detections)

exclude_in_deleted_lines = configuration_manager.get_should_exclude_detections_in_deleted_lines(command_scan_type)
if (
command_scan_type in consts.COMMIT_RANGE_BASED_COMMAND_SCAN_TYPES
and scan_type == consts.SECRET_SCAN_TYPE
and exclude_in_deleted_lines
command_scan_type in consts.COMMIT_RANGE_BASED_COMMAND_SCAN_TYPES
and scan_type == consts.SECRET_SCAN_TYPE
and exclude_in_deleted_lines
):
return exclude_detections_in_deleted_lines(detections)

Expand Down Expand Up @@ -751,7 +749,7 @@ def _get_package_name(detection: Detection) -> str:


def _get_document_by_file_name(
documents: List[Document], file_name: str, unique_id: Optional[str] = None
documents: List[Document], file_name: str, unique_id: Optional[str] = None
) -> Optional[Document]:
for document in documents:
if _normalize_file_path(document.path) == _normalize_file_path(file_name) and document.unique_id == unique_id:
Expand All @@ -761,17 +759,17 @@ def _get_document_by_file_name(


def _report_scan_status(
cycode_client: 'ScanClient',
scan_type: str,
scan_id: str,
scan_completed: bool,
output_detections_count: int,
all_detections_count: int,
files_to_scan_count: int,
zip_size: int,
command_scan_type: str,
error_message: Optional[str],
should_use_scan_service: bool = False,
cycode_client: 'ScanClient',
scan_type: str,
scan_id: str,
scan_completed: bool,
output_detections_count: int,
all_detections_count: int,
files_to_scan_count: int,
zip_size: int,
command_scan_type: str,
error_message: Optional[str],
should_use_scan_service: bool = False,
) -> None:
try:
end_scan_time = time.time()
Expand Down Expand Up @@ -806,11 +804,11 @@ def _does_severity_match_severity_threshold(severity: str, severity_threshold: s


def _get_scan_result(
cycode_client: 'ScanClient',
scan_type: str,
scan_id: str,
scan_details: 'ScanDetailsResponse',
should_get_report: bool = False,
cycode_client: 'ScanClient',
scan_type: str,
scan_id: str,
scan_details: 'ScanDetailsResponse',
should_get_report: bool = False,
) -> ZippedFileScanResult:
if not scan_details.detections_count:
return init_default_scan_result(cycode_client, scan_id, scan_type, should_get_report)
Expand All @@ -828,7 +826,7 @@ def _get_scan_result(


def init_default_scan_result(
cycode_client: 'ScanClient', scan_id: str, scan_type: str, should_get_report: bool = False
cycode_client: 'ScanClient', scan_id: str, scan_type: str, should_get_report: bool = False
) -> ZippedFileScanResult:
return ZippedFileScanResult(
did_detect=False,
Expand All @@ -839,7 +837,7 @@ def init_default_scan_result(


def _try_get_report_url_if_needed(
cycode_client: 'ScanClient', should_get_report: bool, scan_id: str, scan_type: str
cycode_client: 'ScanClient', should_get_report: bool, scan_id: str, scan_type: str
) -> Optional[str]:
if not should_get_report:
return None
Expand All @@ -852,7 +850,7 @@ def _try_get_report_url_if_needed(


def wait_for_detections_creation(
cycode_client: 'ScanClient', scan_type: str, scan_id: str, expected_detections_count: int
cycode_client: 'ScanClient', scan_type: str, scan_id: str, expected_detections_count: int
) -> None:
logger.debug('Waiting for detections to be created')

Expand Down
2 changes: 1 addition & 1 deletion cycode/cyclient/scan_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def get_scan_report_url(self, scan_id: str, scan_type: str) -> models.ScanReport
def get_zipped_file_scan_async_url_path(self, scan_type: str, should_use_scan_service: bool = False) -> str:
async_scan_type = self.scan_config.get_async_scan_type(scan_type)
async_entity_type = self.scan_config.get_async_entity_type(scan_type)
scan_service_url_path=self.get_scan_service_url_path(scan_type, should_use_scan_service)
scan_service_url_path = self.get_scan_service_url_path(scan_type, should_use_scan_service)
return f'{scan_service_url_path}/{async_scan_type}/{async_entity_type}'

def zipped_file_scan_async(
Expand Down

0 comments on commit c9bd0d4

Please sign in to comment.