diff --git a/Tests/kaas/k8s-version-policy/k8s_version_policy.py b/Tests/kaas/k8s-version-policy/k8s_version_policy.py index a2acd3cd5..4554ab1ea 100755 --- a/Tests/kaas/k8s-version-policy/k8s_version_policy.py +++ b/Tests/kaas/k8s-version-policy/k8s_version_policy.py @@ -24,9 +24,9 @@ (c) Hannes Baum , 6/2023 (c) Martin Morgenstern , 2/2024 (c) Matthias Büchse , 3/2024 +(c) Piotr Bigos SPDX-License-Identifier: CC-BY-SA-4.0 """ - from collections import Counter from dataclasses import dataclass from datetime import datetime, timedelta @@ -35,11 +35,13 @@ import asyncio import contextlib import getopt +import json import kubernetes_asyncio import logging import logging.config import re import requests +import subprocess import sys import yaml @@ -93,6 +95,10 @@ class HelpException(BaseException): """Exception raised if the help functionality is called""" +class CriticalException(BaseException): + """Raise an exception if a critical CVE is found""" + + class Config: kubeconfig = None context = None @@ -381,6 +387,82 @@ async def collect_cve_versions(session: aiohttp.ClientSession) -> set: return cfvs +async def run_trivy_scan(image: str) -> dict: + """Run Trivy scan on the specified image and return the results as a dictionary. + + Args: + image (str): The Docker image to scan. + + Returns: + dict: Parsed JSON results from Trivy. + """ + try: + # Run the Trivy scan command + result = await asyncio.create_subprocess_exec( + 'trivy', + 'image', + '--format', 'json', + '--no-progress', + image, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + stdout, stderr = await result.communicate() + + if result.returncode != 0: + logger.error("Trivy scan failed: %s", stderr.decode().strip()) + return {} + + # Parse the JSON output from Trivy + return json.loads(stdout.decode()) + + except Exception as e: + logger.error("Error running Trivy scan: %s", e) + return {} + + +async def get_k8s_pod_images(kubeconfig, context=None) -> list[str]: + """Get the list of container images used by all the pods in the Kubernetes cluster.""" + + async with kubernetes_asyncio.client.ApiClient() as api: + v1 = kubernetes_asyncio.client.CoreV1Api(api) + pods = await v1.list_pod_for_all_namespaces(watch=False) + + images = set() + for pod in pods.items: + for container in pod.spec.containers: + images.add(container.image) + + if pod.spec.init_containers: + for container in pod.spec.init_containers: + images.add(container.image) + + return list(images) + + +async def scan_k8s_images(images_to_scan) -> None: + """Scan the images used in the Kubernetes cluster for vulnerabilities.""" + + for image in images_to_scan: + logger.info(f"Scanning image: {image}") + scan_results = await run_trivy_scan(image) + + if scan_results: + for result in scan_results.get('Results', []): + for vulnerability in result.get('Vulnerabilities', []): + logger.warning( + f"""Vulnerability found in image {image}: + {vulnerability['VulnerabilityID']} " + (Severity: {vulnerability['Severity']})""" + ) + + +async def get_images_and_scan(kubeconfig, context=None) -> None: + images_to_scan = await get_k8s_pod_images(kubeconfig, context) + await scan_k8s_images(images_to_scan) + + async def get_k8s_cluster_info(kubeconfig, context=None) -> ClusterInfo: """Get the k8s version of the cluster under test.""" cluster_config = await kubernetes_asyncio.config.load_kube_config(kubeconfig, context) @@ -479,6 +561,19 @@ async def main(argv): cve_affected_ranges = await collect_cve_versions(session) releases_data = fetch_k8s_releases_data() + try: + logger.info( + f"""Initiating scan on the Kubernetes cluster specified by kubeconfig at {config.kubeconfig} + with context {config.context if config.context else ''}. + Fetching cluster information and verifying access.""") + await get_k8s_cluster_info(config.kubeconfig, config.context) + await get_images_and_scan(config.kubeconfig, config.context) + + except CriticalException as e: + logger.critical(e) + logger.debug("Exception info", exc_info=True) + return 1 + try: context_desc = f"context '{config.context}'" if config.context else "default context" logger.info("Checking cluster specified by %s in %s.", context_desc, config.kubeconfig)