From a16982e3dea46c5c9511d25073962385c75d8ded Mon Sep 17 00:00:00 2001 From: sarahgibs Date: Fri, 17 Jan 2025 14:45:15 -0500 Subject: [PATCH] fix more pylint errors --- zap/src/scan.py | 1 - zap/src/trigger.py | 1 - zap/src/zap.py | 51 ++++++++++++++++++++++------------------ zap/src/zap_scan_type.py | 1 + 4 files changed, 29 insertions(+), 25 deletions(-) diff --git a/zap/src/scan.py b/zap/src/scan.py index 8252e5dcb..274c032e2 100755 --- a/zap/src/scan.py +++ b/zap/src/scan.py @@ -104,7 +104,6 @@ def defectdojo_upload(product_id: int, zap_filename: str, defect_dojo_key: str, absolute_path = os.path.abspath(zap_filename) date = datetime.today().strftime("%Y%m%d%H:%M") lead_id = fetch_dojo_lead_id(dojo, defect_dojo_user) - engagement=dojo.create_engagement( name=date, product_id=product_id, lead_id=lead_id, target_start=datetime.today().strftime("%Y-%m-%d"), diff --git a/zap/src/trigger.py b/zap/src/trigger.py index e2ed006c8..25bb95485 100755 --- a/zap/src/trigger.py +++ b/zap/src/trigger.py @@ -155,7 +155,6 @@ def main(): """ client = google.cloud.logging.Client() client.setup_logging() - defectdojo_url = getenv("DEFECT_DOJO_URL") defect_dojo_key = getenv("DEFECT_DOJO_KEY") zap_topic = getenv("ZAP_TOPIC_NAME") diff --git a/zap/src/zap.py b/zap/src/zap.py index ff336b08e..e690a9242 100755 --- a/zap/src/zap.py +++ b/zap/src/zap.py @@ -24,14 +24,14 @@ TIMEOUT_MINS = 5 zap_port = int(os.getenv("ZAP_PORT", "")) -proxy = f"http://localhost:{zap_port}" +PROXY = f"http://localhost:{zap_port}" def zap_connect(): """ Connect to the Zap instance """ - zap = ZAPv2(proxies={"http": proxy, "https": proxy}, apikey=os.getenv("ZAP_API_KEY", "")) + zap = ZAPv2(proxies={"http": PROXY, "https": PROXY}, apikey=os.getenv("ZAP_API_KEY", "")) wait_for_zap_start(zap, timeout_in_secs=TIMEOUT_MINS * 60) return zap @@ -50,8 +50,13 @@ def zap_init(target_url: str): matchstring = ".*" requestspersecond = 5 groupby = "host" - zap.network.add_rate_limit_rule(description, enabled, matchregex, matchstring, requestspersecond, groupby) - + zap.network.add_rate_limit_rule(description, + enabled, + matchregex, + matchstring, + requestspersecond, + groupby) + zap_access_target(zap, target_url) return zap @@ -118,18 +123,17 @@ def leo_auth(host, path, token): Set up cookie auth for leo apps. """ proxies = { - 'http': proxy, - 'https': proxy, + 'http': PROXY, + 'https': PROXY, } zap = zap_connect() zap.httpsessions.add_default_session_token("LeoToken") logging.info("Authenticating to Leo...") # Leo apps if already launched have a separate domain from Leo. - # And there's a workspace id after the host. + # And there's a workspace id after the host. # https://custom.host/workspaceId/apiEndPoints - - # Make a request to host/first part of path//setCookie + # Make a request to host/first part of path//setCookie # with bearer token header path_parts = path.split('/') if len(path_parts) > 1: @@ -173,8 +177,8 @@ def zap_setup_cookie(zap, domain, context_id, cookie_name=None): # It should always choose the newest one. sessions = zap.httpsessions.sessions(site=domain+":443") session_name = sessions[-1]["session"][0] - zap.users.set_authentication_credentials(context_id, - userid, + zap.users.set_authentication_credentials(context_id, + userid, "sessionName=" + session_name) zap.users.set_user_enabled(context_id, userid, True) @@ -220,7 +224,7 @@ def zap_set_iap_token(client_id): """ logging.info("Fetching id token from google.") zap = zap_connect() - open_id_connect_token = id_token.fetch_id_token( GoogleAuthRequest(), + open_id_connect_token = id_token.fetch_id_token( GoogleAuthRequest(), client_id) bearer = f"Bearer {open_id_connect_token}" logging.info("zap replacer auth id token (IAP)") @@ -271,9 +275,8 @@ def zap_report(zap: ZAPv2, project: str, scan_type: ScanType, sites: str): logging.info(return_message) # If successful we now have a report sitting in the transfer directory of the zap container report_data = zap.core.file_download(filename) - report_file = open(filename, "w") - report_file.write(report_data) - report_file.close() + with open(filename, "w") as report_file: + report_file.write(report_data) return filename @@ -298,7 +301,10 @@ def zap_save_session(zap: ZAPv2, return session_filename + ".zip" -def get_hail_token(credentials_path): +def get_hail_token(): + """ + Fetches an openid token from google using a Service Account file. + """ credentials = base64.b64decode(os.getenv("HAIL_KEY")).decode("utf-8") default_scopes = [ 'openid', @@ -307,7 +313,6 @@ def get_hail_token(credentials_path): 'https://www.googleapis.com/auth/appengine.admin', 'https://www.googleapis.com/auth/compute', ] - creds = json.loads(credentials) now = int(time.time()) scope = ' '.join(default_scopes) @@ -330,6 +335,9 @@ def get_hail_token(credentials_path): return resp.json()['access_token'] def zap_set_hail_token( zap, target_url): + """ + Sets a google access token in ZAP for a dedicated service account for Hail. + """ token = get_hail_token() if token: bearer = f"Bearer {token}" @@ -344,8 +352,7 @@ def zap_set_hail_token( zap, target_url): res = zap.urlopen(target_url) if int(res.split(':')[0]) == 200: return token - else: - raise RuntimeError("Failed to set token for hail.") + raise RuntimeError("Failed to set token for hail.") def zap_compliance_scan( @@ -389,11 +396,10 @@ def zap_compliance_scan( cookie_name = "__host-beehive_session" zap_setup_cookie(zap, host, context_id, cookie_name) elif scan_type == ScanType.HAILAUTH or scan_type == ScanType.HAILAPI: - hail_creds = os.getenv('HAIL_CREDS') - token = zap_set_hail_token(hail_creds, zap, target_url) + token = zap_set_hail_token(zap, target_url) else: token = zap_sa_auth(zap, env) - + if scan_type == ScanType.LEOAPP: success = leo_auth(host, path, token) if success: @@ -402,7 +408,6 @@ def zap_compliance_scan( zap_setup_cookie(zap, host, context_id) else: logging.error("Leo authentication was unsuccessful") - if scan_type == ScanType.API: zap_api_import(zap, target_url) diff --git a/zap/src/zap_scan_type.py b/zap/src/zap_scan_type.py index 55621d709..4be450b7e 100644 --- a/zap/src/zap_scan_type.py +++ b/zap/src/zap_scan_type.py @@ -18,6 +18,7 @@ class ScanType(str, Enum): BEEHIVE = "beehive" IAPAUTH ="IAPAUTH" HAILAUTH = "HAILAUTH" + HAILAPI = "HAILAPI" def __str__(self): return str(self.name).lower()