Skip to content

Commit

Permalink
CM-30564-formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
saramontif committed Jan 24, 2024
1 parent b665c5b commit b8ea49f
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 76 deletions.
152 changes: 77 additions & 75 deletions cycode/cli/commands/scan/code_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ def set_issue_detected_by_scan_results(context: click.Context, scan_results: Lis
set_issue_detected(context, any(scan_result.issue_detected for scan_result in scan_results))


def _should_use_scan_service(scan_type: str, scan_parameters: Optional[dict] = None):
def _should_use_scan_service(scan_type: str, scan_parameters: Optional[dict] = None) -> bool:
return scan_type == consts.SECRET_SCAN_TYPE and scan_parameters['report'] is True


def _enrich_scan_result_with_data_from_detection_rules(
cycode_client: 'ScanClient', scan_type: str, scan_result: ZippedFileScanResult
cycode_client: 'ScanClient', scan_type: str, scan_result: ZippedFileScanResult
) -> None:
# TODO(MarshalX): remove scan_type arg after migration to new backend filter
if scan_type != consts.SECRET_SCAN_TYPE:
Expand Down Expand Up @@ -136,7 +136,7 @@ def _enrich_scan_result_with_data_from_detection_rules(


def _get_scan_documents_thread_func(
context: click.Context, is_git_diff: bool, is_commit_range: bool, scan_parameters: dict
context: click.Context, is_git_diff: bool, is_commit_range: bool, scan_parameters: dict
) -> Callable[[List[Document]], Tuple[str, CliError, LocalScanResult]]:
cycode_client = context.obj['client']
scan_type = context.obj['scan_type']
Expand Down Expand Up @@ -211,7 +211,7 @@ def _scan_batch_thread_func(batch: List[Document]) -> Tuple[str, CliError, Local


def scan_commit_range(
context: click.Context, path: str, commit_range: str, max_commits_count: Optional[int] = None
context: click.Context, path: str, commit_range: str, max_commits_count: Optional[int] = None
) -> None:
scan_type = context.obj['scan_type']

Expand Down Expand Up @@ -274,11 +274,11 @@ def scan_commit_range(


def scan_documents(
context: click.Context,
documents_to_scan: List[Document],
is_git_diff: bool = False,
is_commit_range: bool = False,
scan_parameters: Optional[dict] = None,
context: click.Context,
documents_to_scan: List[Document],
is_git_diff: bool = False,
is_commit_range: bool = False,
scan_parameters: Optional[dict] = None,
) -> None:
progress_bar = context.obj['progress_bar']

Expand Down Expand Up @@ -306,11 +306,11 @@ def scan_documents(


def scan_commit_range_documents(
context: click.Context,
from_documents_to_scan: List[Document],
to_documents_to_scan: List[Document],
scan_parameters: Optional[dict] = None,
timeout: Optional[int] = None,
context: click.Context,
from_documents_to_scan: List[Document],
to_documents_to_scan: List[Document],
scan_parameters: Optional[dict] = None,
timeout: Optional[int] = None,
) -> None:
cycode_client = context.obj['client']
scan_type = context.obj['scan_type']
Expand Down Expand Up @@ -402,11 +402,11 @@ def should_scan_documents(from_documents_to_scan: List[Document], to_documents_t


def create_local_scan_result(
scan_result: ZippedFileScanResult,
documents_to_scan: List[Document],
command_scan_type: str,
scan_type: str,
severity_threshold: str,
scan_result: ZippedFileScanResult,
documents_to_scan: List[Document],
command_scan_type: str,
scan_type: str,
severity_threshold: str,
) -> LocalScanResult:
document_detections = get_document_detections(scan_result, documents_to_scan)
relevant_document_detections_list = exclude_irrelevant_document_detections(
Expand All @@ -429,14 +429,14 @@ def create_local_scan_result(


def perform_scan(
cycode_client: 'ScanClient',
zipped_documents: 'InMemoryZip',
scan_type: str,
scan_id: str,
is_git_diff: bool,
is_commit_range: bool,
scan_parameters: dict,
should_use_scan_service: bool = False,
cycode_client: 'ScanClient',
zipped_documents: 'InMemoryZip',
scan_type: str,
scan_id: str,
is_git_diff: bool,
is_commit_range: bool,
scan_parameters: dict,
should_use_scan_service: bool = False,
) -> ZippedFileScanResult:
if scan_type in (consts.SCA_SCAN_TYPE, consts.SAST_SCAN_TYPE) or should_use_scan_service:
return perform_scan_async(cycode_client, zipped_documents, scan_type, scan_parameters, should_use_scan_service)
Expand All @@ -448,11 +448,11 @@ def perform_scan(


def perform_scan_async(
cycode_client: 'ScanClient',
zipped_documents: 'InMemoryZip',
scan_type: str,
scan_parameters: dict,
should_use_scan_service: bool = False,
cycode_client: 'ScanClient',
zipped_documents: 'InMemoryZip',
scan_type: str,
scan_parameters: dict,
should_use_scan_service: bool = False,
) -> ZippedFileScanResult:
scan_async_result = cycode_client.zipped_file_scan_async(
zipped_documents, scan_type, scan_parameters, should_use_scan_service=should_use_scan_service
Expand All @@ -469,13 +469,13 @@ def perform_scan_async(


def perform_commit_range_scan_async(
cycode_client: 'ScanClient',
from_commit_zipped_documents: 'InMemoryZip',
to_commit_zipped_documents: 'InMemoryZip',
scan_type: str,
scan_parameters: dict,
timeout: Optional[int] = None,
should_use_scan_service: bool = False,
cycode_client: 'ScanClient',
from_commit_zipped_documents: 'InMemoryZip',
to_commit_zipped_documents: 'InMemoryZip',
scan_type: str,
scan_parameters: dict,
timeout: Optional[int] = None,
should_use_scan_service: bool = False,
) -> ZippedFileScanResult:
scan_async_result = cycode_client.multiple_zipped_file_scan_async(
from_commit_zipped_documents, to_commit_zipped_documents, scan_type, scan_parameters
Expand All @@ -493,12 +493,12 @@ def perform_commit_range_scan_async(


def poll_scan_results(
cycode_client: 'ScanClient',
scan_id: str,
scan_type: str,
should_get_report: bool = False,
polling_timeout: Optional[int] = None,
should_use_scan_service: bool = False,
cycode_client: 'ScanClient',
scan_id: str,
scan_type: str,
should_get_report: bool = False,
polling_timeout: Optional[int] = None,
should_use_scan_service: bool = False,
) -> ZippedFileScanResult:
if polling_timeout is None:
polling_timeout = configuration_manager.get_scan_polling_timeout_in_seconds()
Expand Down Expand Up @@ -535,14 +535,15 @@ def print_debug_scan_details(scan_details_response: 'ScanDetailsResponse') -> No


def print_results(
context: click.Context, local_scan_results: List[LocalScanResult], errors: Optional[Dict[str, 'CliError']] = None
context: click.Context, local_scan_results: List[LocalScanResult],
errors: Optional[Dict[str, 'CliError']] = None
) -> None:
printer = ConsolePrinter(context)
printer.print_scan_results(local_scan_results, errors)


def get_document_detections(
scan_result: ZippedFileScanResult, documents_to_scan: List[Document]
scan_result: ZippedFileScanResult, documents_to_scan: List[Document]
) -> List[DocumentDetections]:
logger.debug('Get document detections')

Expand All @@ -560,7 +561,8 @@ def get_document_detections(


def exclude_irrelevant_document_detections(
document_detections_list: List[DocumentDetections], scan_type: str, command_scan_type: str, severity_threshold: str
document_detections_list: List[DocumentDetections], scan_type: str, command_scan_type: str,
severity_threshold: str
) -> List[DocumentDetections]:
relevant_document_detections_list = []
for document_detections in document_detections_list:
Expand Down Expand Up @@ -640,15 +642,15 @@ def try_get_git_remote_url(path: str) -> Optional[str]:


def exclude_irrelevant_detections(
detections: List[Detection], scan_type: str, command_scan_type: str, severity_threshold: str
detections: List[Detection], scan_type: str, command_scan_type: str, severity_threshold: str
) -> List[Detection]:
relevant_detections = _exclude_detections_by_exclusions_configuration(detections, scan_type)
relevant_detections = _exclude_detections_by_scan_type(relevant_detections, scan_type, command_scan_type)
return _exclude_detections_by_severity(relevant_detections, scan_type, severity_threshold)


def _exclude_detections_by_severity(
detections: List[Detection], scan_type: str, severity_threshold: str
detections: List[Detection], scan_type: str, severity_threshold: str
) -> List[Detection]:
if scan_type != consts.SCA_SCAN_TYPE or severity_threshold is None:
return detections
Expand All @@ -663,16 +665,16 @@ def _exclude_detections_by_severity(


def _exclude_detections_by_scan_type(
detections: List[Detection], scan_type: str, command_scan_type: str
detections: List[Detection], scan_type: str, command_scan_type: str
) -> List[Detection]:
if command_scan_type == consts.PRE_COMMIT_COMMAND_SCAN_TYPE:
return exclude_detections_in_deleted_lines(detections)

exclude_in_deleted_lines = configuration_manager.get_should_exclude_detections_in_deleted_lines(command_scan_type)
if (
command_scan_type in consts.COMMIT_RANGE_BASED_COMMAND_SCAN_TYPES
and scan_type == consts.SECRET_SCAN_TYPE
and exclude_in_deleted_lines
command_scan_type in consts.COMMIT_RANGE_BASED_COMMAND_SCAN_TYPES
and scan_type == consts.SECRET_SCAN_TYPE
and exclude_in_deleted_lines
):
return exclude_detections_in_deleted_lines(detections)

Expand Down Expand Up @@ -744,7 +746,7 @@ def _get_package_name(detection: Detection) -> str:


def _get_document_by_file_name(
documents: List[Document], file_name: str, unique_id: Optional[str] = None
documents: List[Document], file_name: str, unique_id: Optional[str] = None
) -> Optional[Document]:
for document in documents:
if _normalize_file_path(document.path) == _normalize_file_path(file_name) and document.unique_id == unique_id:
Expand All @@ -754,17 +756,17 @@ def _get_document_by_file_name(


def _report_scan_status(
cycode_client: 'ScanClient',
scan_type: str,
scan_id: str,
scan_completed: bool,
output_detections_count: int,
all_detections_count: int,
files_to_scan_count: int,
zip_size: int,
command_scan_type: str,
error_message: Optional[str],
should_use_scan_service: bool = False,
cycode_client: 'ScanClient',
scan_type: str,
scan_id: str,
scan_completed: bool,
output_detections_count: int,
all_detections_count: int,
files_to_scan_count: int,
zip_size: int,
command_scan_type: str,
error_message: Optional[str],
should_use_scan_service: bool = False,
) -> None:
try:
end_scan_time = time.time()
Expand Down Expand Up @@ -799,11 +801,11 @@ def _does_severity_match_severity_threshold(severity: str, severity_threshold: s


def _get_scan_result(
cycode_client: 'ScanClient',
scan_type: str,
scan_id: str,
scan_details: 'ScanDetailsResponse',
should_get_report: bool = False,
cycode_client: 'ScanClient',
scan_type: str,
scan_id: str,
scan_details: 'ScanDetailsResponse',
should_get_report: bool = False,
) -> ZippedFileScanResult:
if not scan_details.detections_count:
return init_default_scan_result(cycode_client, scan_id, scan_type, should_get_report)
Expand All @@ -821,7 +823,7 @@ def _get_scan_result(


def init_default_scan_result(
cycode_client: 'ScanClient', scan_id: str, scan_type: str, should_get_report: bool = False
cycode_client: 'ScanClient', scan_id: str, scan_type: str, should_get_report: bool = False
) -> ZippedFileScanResult:
return ZippedFileScanResult(
did_detect=False,
Expand All @@ -832,7 +834,7 @@ def init_default_scan_result(


def _try_get_report_url_if_needed(
cycode_client: 'ScanClient', should_get_report: bool, scan_id: str, scan_type: str
cycode_client: 'ScanClient', should_get_report: bool, scan_id: str, scan_type: str
) -> Optional[str]:
if not should_get_report:
return None
Expand All @@ -845,7 +847,7 @@ def _try_get_report_url_if_needed(


def wait_for_detections_creation(
cycode_client: 'ScanClient', scan_type: str, scan_id: str, expected_detections_count: int
cycode_client: 'ScanClient', scan_type: str, scan_id: str, expected_detections_count: int
) -> None:
logger.debug('Waiting for detections to be created')

Expand Down Expand Up @@ -904,7 +906,7 @@ def _get_file_name_from_detection(detection: dict) -> str:
return detection['detection_details']['file_name']


def _get_secret_file_name_from_detection(detection) -> str:
def _get_secret_file_name_from_detection(detection: dict) -> str:
file_path: str = detection['detection_details']['file_path']
file_name: str = detection['detection_details']['file_name']
return os.path.join(file_path, file_name)
Expand Down
3 changes: 2 additions & 1 deletion cycode/cyclient/scan_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ def get_scan_report_url(self, scan_id: str, scan_type: str) -> models.ScanReport
def get_zipped_file_scan_async_url_path(self, scan_type: str, should_use_scan_service: bool = False) -> str:
async_scan_type = self.scan_config.get_async_scan_type(scan_type)
async_entity_type = self.scan_config.get_async_entity_type(scan_type)
return f'{self.get_scan_service_url_path(scan_type, should_use_scan_service)}/{async_scan_type}/{async_entity_type}'
scan_service_url_path=self.get_scan_service_url_path(scan_type, should_use_scan_service)
return f'{scan_service_url_path}/{async_scan_type}/{async_entity_type}'

def zipped_file_scan_async(
self,
Expand Down

0 comments on commit b8ea49f

Please sign in to comment.