diff --git a/doozer/doozerlib/backend/konflux_client.py b/doozer/doozerlib/backend/konflux_client.py new file mode 100644 index 000000000..ce3ae642e --- /dev/null +++ b/doozer/doozerlib/backend/konflux_client.py @@ -0,0 +1,502 @@ +import asyncio +import logging +from typing import Dict, List, Optional, Sequence, Union, cast + +import jinja2 +from artcommonlib import exectools +from artcommonlib import util as art_util +from async_lru import alru_cache +from importlib_resources import files +from kubernetes import config, watch +from kubernetes.client import ApiClient, Configuration +from kubernetes.dynamic import DynamicClient, exceptions, resource +from ruamel.yaml import YAML + +from doozerlib import constants + +yaml = YAML(typ="safe") +LOGGER = logging.getLogger(__name__) + + +class KonfluxClient: + """ + KonfluxClient is a client for interacting with the Konflux API. + """ + # https://gitlab.cee.redhat.com/konflux/docs/users/-/blob/main/topics/getting-started/multi-platform-builds.md + SUPPORTED_ARCHES = { + "x86_64": "linux/x86_64", + "s390x": "linux/s390x", + "ppc64le": "linux/ppc64le", + "aarch64": "linux/arm64", + } + + def __init__(self, default_namespace: str, config: Configuration, dry_run: bool = False, logger: logging.Logger = LOGGER) -> None: + self.api_client = ApiClient(configuration=config) + self.dyn_client = DynamicClient(self.api_client) + self.default_namespace = default_namespace + self.dry_run = dry_run + self._logger = logger + + @staticmethod + def from_kubeconfig(default_namespace: str, config_file: Optional[str], context: Optional[str], dry_run: bool = False, logger: logging.Logger = LOGGER) -> "KonfluxClient": + """ Create a KonfluxClient from a kubeconfig file. + + :param config_file: The path to the kubeconfig file. + :param context: The context to use. + :param default_namespace: The default namespace. + :param dry_run: Whether to run in dry-run mode. + :param logger: The logger. + :return: The KonfluxClient. + """ + cfg = Configuration() + config.load_kube_config(config_file=config_file, context=context, persist_config=False, client_configuration=cfg) + return KonfluxClient(default_namespace=default_namespace, config=cfg, dry_run=dry_run, logger=logger) + + @alru_cache + async def _get_api(self, api_version: str, kind: str): + """ Get the API object for the given API version and kind. + + :param api_version: The API version. + :param kind: The kind. + :return: The API object. + """ + api = await exectools.to_thread( + self.dyn_client.resources.get, + api_version=api_version, + kind=kind, + ) + return api + + def _extract_manifest_metadata(self, manifest: dict): + """ Extract the metadata from a manifest. + + :param manifest: The manifest. + :return: The API version, kind, name, and namespace. + """ + api_version = manifest["apiVersion"] + kind = manifest["kind"] + name = manifest["metadata"].get("name") + namespace = manifest["metadata"].get("namespace", self.default_namespace) + return api_version, kind, name, namespace + + async def _get(self, api_version: str, kind: str, name: str, namespace: str, strict: bool = True): + """ Get a resource by name and namespace. + + :param api_version: The API version. + :param kind: The kind. + :param name: The name of the resource. + :param namespace: The namespace of the resource. + :param strict: Whether to raise an exception if the resource is not found. + :return: The resource. 
+ """ + api = await self._get_api(api_version, kind) + resource = None + try: + resource = await exectools.to_thread(api.get, name=name, namespace=namespace) + except exceptions.NotFoundError: + if strict: + raise + return resource + + @alru_cache + async def _get__caching(self, api_version: str, kind: str, name: str, namespace: str, strict: bool = True): + """ Get a resource by name and namespace, with caching. + + :param api_version: The API version. + :param kind: The kind. + :param name: The name of the resource. + :param namespace: The namespace of the resource. + :param strict: Whether to raise an exception if the resource is not found. + :return: The resource. + """ + return await self._get(api_version, kind, name, namespace, strict) + + async def _create(self, manifest: dict, **kwargs): + """ Create a resource. + + :param manifest: The manifest. + :param kwargs: Additional keyword arguments to pass to the API. + :return: The resource. + """ + api_version, kind, name, namespace = self._extract_manifest_metadata(manifest) + api = await self._get_api(api_version, kind) + if self.dry_run: + self._logger.warning(f"[DRY RUN] Would have created {api_version}/{kind} {namespace}/{name}") + return resource.ResourceInstance(self.dyn_client, manifest) + self._logger.info(f"Creating {api_version}/{kind} {namespace}/{name or ''}...") + new = await exectools.to_thread(api.create, namespace=namespace, body=manifest, **kwargs) + new = cast(resource.ResourceInstance, new) + api_version, kind, name, namespace = self._extract_manifest_metadata(new.to_dict()) + self._logger.info(f"Created {api_version}/{kind} {namespace}/{name}") + return new + + async def _patch(self, manifest: dict): + """ Patch a resource. + + :param manifest: The manifest. + :return: The resource. + """ + api_version, kind, name, namespace = self._extract_manifest_metadata(manifest) + api = await self._get_api(api_version, kind) + if self.dry_run: + self._logger.warning(f"[DRY RUN] Would have patched {api_version}/{kind} {namespace}/{name}") + return resource.ResourceInstance(self.dyn_client, manifest) + self._logger.info(f"Patching {api_version}/{kind} {namespace}/{name}") + new = await exectools.to_thread( + api.patch, + body=manifest, + namespace=namespace, + content_type="application/merge-patch+json", + ) + new = cast(resource.ResourceInstance, new) + api_version, kind, name, namespace = self._extract_manifest_metadata(new.to_dict()) + self._logger.info(f"Patched {api_version}/{kind} {namespace}/{name}") + return new + + async def _replace(self, manifest: dict): + """ Replace a resource. + + :param manifest: The manifest. + :return: The resource. + """ + api_version, kind, name, namespace = self._extract_manifest_metadata(manifest) + api = await self._get_api(api_version, kind) + if self.dry_run: + self._logger.warning(f"[DRY RUN] Would have replaced {api_version}/{kind} {namespace}/{name}") + return resource.ResourceInstance(self.dyn_client, manifest) + self._logger.info(f"Replacing {api_version}/{kind} {namespace}/{name}") + new = await exectools.to_thread(api.replace, body=manifest, namespace=namespace) + new = cast(resource.ResourceInstance, new) + api_version, kind, name, namespace = self._extract_manifest_metadata(new.to_dict()) + self._logger.info(f"Replaced {api_version}/{kind} {namespace}/{name}") + return new + + async def _delete(self, api_version: str, kind: str, name: str, namespace: str): + """ Delete a resource. + + :param api_version: The API version. + :param kind: The kind. 
+ :param name: The name of the resource. + :param namespace: The namespace of the resource. + """ + api = await self._get_api(api_version, kind) + if self.dry_run: + self._logger.warning(f"[DRY RUN] Would have deleted {api_version}/{kind} {namespace}/{name}") + return + self._logger.info(f"Deleting {api_version}/{kind} {namespace}/{name}") + await exectools.to_thread(api.delete, name=name, namespace=namespace) + self._logger.info(f"Deleted {api_version}/{kind} {namespace}/{name}") + + async def _create_or_patch(self, manifest: dict): + """ Create or patch a resource. + + :param manifest: The manifest. + :return: The resource. + """ + api_version, kind, name, namespace = self._extract_manifest_metadata(manifest) + existing = await self._get(api_version, kind, name, namespace, strict=False) # named `existing` to avoid shadowing the `resource` module + if not existing: + return await self._create(manifest) + return await self._patch(manifest) + + async def _create_or_replace(self, manifest: dict): + """ Create or replace a resource. + + :param manifest: The manifest. + :return: The resource. + """ + api_version, kind, name, namespace = self._extract_manifest_metadata(manifest) + existing = await self._get(api_version, kind, name, namespace, strict=False) + if not existing: + return await self._create(manifest) + if not manifest.get("metadata", {}).get("resourceVersion"): + # resourceVersion is required for replace + manifest.setdefault("metadata", {})["resourceVersion"] = existing.metadata.resourceVersion + while True: + try: + return await self._replace(manifest) + except exceptions.ConflictError: + # The resource has changed since we fetched it; refresh resourceVersion and retry + existing = await self._get(api_version, kind, name, namespace, strict=False) + if not existing: + # The resource was deleted in the meantime; create it instead + return await self._create(manifest) + manifest["metadata"]["resourceVersion"] = existing.metadata.resourceVersion + continue + + @staticmethod + def _new_application(name: str, display_name: str) -> dict: + obj = { + "apiVersion": "appstudio.redhat.com/v1alpha1", + "kind": "Application", + "metadata": { + "name": name + }, + "spec": { + "displayName": display_name, + "appModelRepository": {"url": ""}, + "gitOpsRepository": {"url": ""}, + } + } + return obj + + async def ensure_application(self, name: str, display_name: str) -> resource.ResourceInstance: + application = self._new_application(name, display_name) + return await self._create_or_patch(application) + + @staticmethod + def _new_component(name: str, application: str, component_name: str, + image_repo: Optional[str], source_url: Optional[str], revision: Optional[str]) -> dict: + obj = { + "apiVersion": "appstudio.redhat.com/v1alpha1", + "kind": "Component", + "metadata": { + "name": name, + "annotations": { + "build.appstudio.openshift.io/pipeline": '{"name":"docker-build-multi-platform-oci-ta","bundle":"latest"}', + "build.appstudio.openshift.io/status": '{"pac":{"state":"disabled"}}', # PaC must stay disabled; otherwise Konflux will raise PRs against the upstream repos (openshift-priv) + # "build.appstudio.openshift.io/request": "configure-pac", + "mintmaker.appstudio.redhat.com/disabled": "true", # https://gitlab.cee.redhat.com/konflux/docs/users/-/blob/main/topics/mintmaker/user.md#offboarding-a-repository + } + }, + "spec": { + "application": application, + "componentName": component_name, + # "source" is populated below, only when url/revision are provided + } + } + if image_repo: + obj["spec"]["containerImage"] = image_repo + if source_url: + obj["spec"].setdefault("source", {}).setdefault("git", {})["url"] = source_url + if revision: + obj["spec"].setdefault("source", {}).setdefault("git", {})["revision"]
= revision + return obj + + async def ensure_component(self, name: str, application: str, component_name: str, + image_repo: Optional[str], source_url: Optional[str], revision: Optional[str]) -> resource.ResourceInstance: + component = self._new_component(name, application, component_name, image_repo, source_url, revision) + return await self._create_or_replace(component) + + @staticmethod + def _new_pipelinerun_for_image_build(generate_name: str, namespace: Optional[str], application_name: str, component_name: str, + git_url: str, commit_sha: str, target_branch: str, output_image: str, + build_platforms: Sequence[str], git_auth_secret: str = "pipelines-as-code-secret", + additional_tags: Optional[Sequence[str]] = None, skip_checks: bool = False) -> dict: + if additional_tags is None: + additional_tags = [] + https_url = art_util.convert_remote_git_to_https(git_url) + # TODO: In the future the PipelineRun template should be loaded from a remote git repo. + template_content = files("doozerlib").joinpath("backend").joinpath("konflux_image_build_pipelinerun.yaml").read_text() + template = jinja2.Template(template_content, autoescape=True) + rendered = template.render({ + "source_url": https_url, + "revision": commit_sha, + "target_branch": target_branch, + "git_auth_secret": git_auth_secret, + }) + obj = yaml.load(rendered) + # These fields in the template are image-specific, so they are removed here. + del obj["metadata"]["name"] + del obj["metadata"]["annotations"]["pipelinesascode.tekton.dev/on-cel-expression"] + # Override the generated name with the provided one + if generate_name: + obj["metadata"]["generateName"] = generate_name + if namespace: + obj["metadata"]["namespace"] = namespace + else: + del obj["metadata"]["namespace"] + # Set the application and component names + obj["metadata"]["annotations"]["build.appstudio.openshift.io/repo"] = f"{https_url}?rev={commit_sha}" + obj["metadata"]["labels"]["appstudio.openshift.io/application"] = application_name + obj["metadata"]["labels"]["appstudio.openshift.io/component"] = component_name + + def _modify_param(params: List[dict], name: str, value: Union[str, bool, List[str]]): + """ Modify a parameter in the params list. If the parameter does not exist, it is added. + + :param params: The list of parameters. + :param name: The name of the parameter. + :param value: The value of the parameter.
+ """ + if isinstance(value, bool): + value = "true" if value else "false" # boolean value should be converted to string + for param in params: + if param["name"] == name: + param["value"] = value + return + params.append({"name": name, "value": value}) + + # PipelineRun parameters to override in the template + params = obj["spec"]["params"] + _modify_param(params, "output-image", output_image) + _modify_param(params, "skip-checks", skip_checks) + _modify_param(params, "image-expires-after", "6w") + _modify_param(params, "build-platforms", list(build_platforms)) + + # See https://konflux-ci.dev/docs/how-tos/configuring/customizing-the-build/#configuring-timeouts + obj["spec"]["timeouts"] = {"pipeline": "12h"} + + # Task specific parameters to override in the template + for task in obj["spec"]["pipelineSpec"]["tasks"]: + match task["name"]: + case "build-images": + task["timeout"] = "12h" + case "apply-tags": + _modify_param(task["params"], "ADDITIONAL_TAGS", list(additional_tags)) + + # https://konflux.pages.redhat.com/docs/users/how-tos/configuring/overriding-compute-resources.html + # ose-installer-artifacts fails with OOM with default values, hence bumping memory limit + obj["spec"]["taskRunSpecs"] = [{ + "pipelineTaskName": "build-images", + "stepSpecs": [{ + "name": "sbom-syft-generate", + "computeResources": { + "requests": { + "memory": "5Gi" + }, + "limits": { + "memory": "10Gi" + } + } + }] + }] + + return obj + + async def start_pipeline_run_for_image_build( + self, + generate_name: str, + namespace: Optional[str], + application_name: str, + component_name: str, + git_url: str, + commit_sha: str, + target_branch: str, + output_image: str, + building_arches: Sequence[str], + git_auth_secret: str = "pipelines-as-code-secret", + additional_tags: Sequence[str] = [], + skip_checks: bool = False, + ): + """ + Start a PipelineRun for building an image. + + :param generate_name: The generateName for the PipelineRun. + :param namespace: The namespace for the PipelineRun. + :param application_name: The application name. + :param component_name: The component name. + :param git_url: The git URL. + :param commit_sha: The commit SHA. + :param target_branch: The target branch. + :param output_image: The output image. + :param building_arches: The architectures to build. + :param git_auth_secret: The git auth secret. + :param additional_tags: Additional tags to apply to the image. + :param skip_checks: Whether to skip checks. + :return: The PipelineRun resource. 
+ """ + unsupported_arches = set(building_arches) - set(self.SUPPORTED_ARCHES) + if unsupported_arches: + raise ValueError(f"Unsupported architectures: {unsupported_arches}") + + build_platforms = [self.SUPPORTED_ARCHES[arch] for arch in building_arches] + pipelinerun_manifest = self._new_pipelinerun_for_image_build( + generate_name, + namespace, + application_name, + component_name, + git_url, + commit_sha, + target_branch, + output_image, + build_platforms, + git_auth_secret=git_auth_secret, + skip_checks=skip_checks, + additional_tags=additional_tags, + ) + if self.dry_run: + fake_pipelinerun = resource.ResourceInstance(self.dyn_client, pipelinerun_manifest) + fake_pipelinerun.metadata.name = f"{component_name}-dry-run" + LOGGER.warning(f"[DRY RUN] Would have created PipelineRun: {fake_pipelinerun.metadata.name}") + return fake_pipelinerun + + pipelinerun = await self._create(pipelinerun_manifest, async_req=True) + LOGGER.debug(f"Created PipelineRun: {self.build_pipeline_url(pipelinerun)}") + return pipelinerun + + @staticmethod + def build_pipeline_url(pipelinerun): + """ Returns the URL to the Konflux UI for the given PipelineRun. + + :param pipelinerun: The PipelineRun. + :return: The URL. + """ + pipelinerun_name = pipelinerun['metadata']['name'] + application = pipelinerun['metadata']['labels']['appstudio.openshift.io/application'] + return (f"{constants.KONFLUX_UI_HOST}/application-pipeline/" + f"workspaces/{constants.KONFLUX_UI_DEFAULT_WORKSPACE}/" + f"applications/{application}/" + f"pipelineruns/{pipelinerun_name}") + + async def wait_for_pipelinerun(self, pipelinerun_name: str, namespace: Optional[str] = None): + """ + Wait for a PipelineRun to complete. + + :param pipelinerun_name: The name of the PipelineRun. + :param namespace: The namespace of the PipelineRun. + :return: The PipelineRun. + """ + namespace = namespace or self.default_namespace + if self.dry_run: + await asyncio.sleep(3) + pipelinerun = { + "metadata": {"name": pipelinerun_name, "namespace": namespace}, + "apiVersion": "tekton.dev/v1", + "kind": "PipelineRun", + "status": {"conditions": [{"status": "True"}]} + } + self._logger.warning(f"[DRY RUN] Would have waited for PipelineRun {pipelinerun_name} to complete") + return resource.ResourceInstance(self.dyn_client, pipelinerun) + + api = await self._get_api("tekton.dev/v1", "PipelineRun") + + def _inner(): + watcher = watch.Watch() + status = "Not Found" + while True: + try: + obj = api.get(name=pipelinerun_name, namespace=namespace) + resource_version = obj.metadata.resourceVersion + for event in watcher.stream( + api.get, + resource_version=resource_version, + namespace=namespace, + serialize=False, + field_selector=f"metadata.name={pipelinerun_name}" + ): + assert isinstance(event, Dict) + obj = resource.ResourceInstance(api, event["object"]) + # status takes some time to appear + try: + status = obj.status.conditions[0].status + except AttributeError: + pass + self._logger.info("PipelineRun %s status: %s.", pipelinerun_name, status) + if status not in ["Unknown", "Not Found"]: + watcher.stop() + return obj + except TimeoutError: + self._logger.error("Timeout waiting for PipelineRun %s to complete", pipelinerun_name) + continue + except exceptions.ApiException as e: + if e.status == 410: + # If the last result is too old, an `ApiException` exception will be thrown with + # `code` 410. In that case we have to recover by retrying without resource_version. + self._logger.debug("%s: Resource version is too old. 
Recovering...", pipelinerun_name) + continue + raise + + return await exectools.to_thread(_inner) diff --git a/doozer/doozerlib/backend/konflux_image_builder.py b/doozer/doozerlib/backend/konflux_image_builder.py index 3f214c3b7..efda97d5c 100644 --- a/doozer/doozerlib/backend/konflux_image_builder.py +++ b/doozer/doozerlib/backend/konflux_image_builder.py @@ -1,33 +1,28 @@ import asyncio +import json import logging import os -import json -from datetime import datetime, timezone from dataclasses import dataclass +from datetime import datetime, timezone from pathlib import Path -from typing import Dict, Optional, Sequence, cast +from typing import Optional, cast -import jinja2 from artcommonlib import exectools -from artcommonlib import util as art_util +from artcommonlib.arch_util import go_arch_for_brew_arch +from artcommonlib.exectools import limit_concurrency +from artcommonlib.konflux.konflux_build_record import (ArtifactType, Engine, + KonfluxBuildOutcome, + KonfluxBuildRecord) +from artcommonlib.release_util import isolate_el_version_in_release from dockerfile_parse import DockerfileParser -from importlib_resources import files -from kubernetes import client, config, watch -from kubernetes.client import Configuration -from kubernetes.dynamic import DynamicClient, exceptions, resource -from ruamel.yaml import YAML +from kubernetes.dynamic import resource -from artcommonlib.exectools import limit_concurrency from doozerlib import constants from doozerlib.backend.build_repo import BuildRepo +from doozerlib.backend.konflux_client import KonfluxClient from doozerlib.image import ImageMetadata from doozerlib.source_resolver import SourceResolution -from artcommonlib.arch_util import go_arch_for_brew_arch -from artcommonlib.release_util import isolate_assembly_in_release, isolate_el_version_in_release -from artcommonlib.konflux.konflux_build_record import KonfluxBuildRecord, ArtifactType, Engine, KonfluxBuildOutcome - -yaml = YAML(typ="safe") LOGGER = logging.getLogger(__name__) @@ -43,9 +38,9 @@ class KonfluxImageBuilderConfig: """ Options for the KonfluxImageBuilder class. """ base_dir: Path group_name: str + namespace: str kubeconfig: Optional[str] = None context: Optional[str] = None - namespace: Optional[str] = None image_repo: str = constants.KONFLUX_DEFAULT_IMAGE_REPO skip_checks: bool = False dry_run: bool = False @@ -66,21 +61,23 @@ class KonfluxImageBuilder: def __init__(self, config: KonfluxImageBuilderConfig, logger: Optional[logging.Logger] = None) -> None: self._config = config self._logger = logger or LOGGER + self._konflux_client = KonfluxClient.from_kubeconfig(config.namespace, config.kubeconfig, config.context, config.dry_run) @limit_concurrency(limit=constants.MAX_KONFLUX_BUILD_QUEUE_SIZE) async def build(self, metadata: ImageMetadata): """ Build a container image with Konflux. 
""" + logger = self._logger.getChild(f"[{metadata.distgit_key}]") metadata.build_status = False try: dest_dir = self._config.base_dir.joinpath(metadata.qualified_key) if dest_dir.exists(): # Load exiting build source repository - build_repo = await BuildRepo.from_local_dir(dest_dir, self._logger) + build_repo = await BuildRepo.from_local_dir(dest_dir, logger) else: # Clone the build source repository source = None if metadata.has_source(): - self._logger.info(f"Resolving source for {metadata.qualified_key}") + logger.info(f"Resolving source for {metadata.qualified_key}") source = cast(SourceResolution, await exectools.to_thread(metadata.runtime.source_resolver.resolve_source, metadata)) else: raise IOError(f"Image {metadata.qualified_key} doesn't have upstream source. This is no longer supported.") @@ -89,14 +86,9 @@ async def build(self, metadata: ImageMetadata): "assembly_name": metadata.runtime.assembly, "distgit_key": metadata.distgit_key }) - build_repo = BuildRepo(url=source.url, branch=dest_branch, local_dir=dest_dir, logger=self._logger) + build_repo = BuildRepo(url=source.url, branch=dest_branch, local_dir=dest_dir, logger=logger) await build_repo.ensure_source() - # Load the Kubernetes configuration - cfg = Configuration() - config.load_kube_config(config_file=self._config.kubeconfig, context=self._config.context, - persist_config=False, client_configuration=cfg) - # Wait for parent members to be built parent_members = await self._wait_for_parent_members(metadata) failed_parents = [parent_member.distgit_key for parent_member in parent_members if parent_member is not None and not parent_member.build_status] @@ -110,39 +102,35 @@ async def build(self, metadata: ImageMetadata): if unsupported_arches: # TODO: Update once we are on the new cluster # raise ValueError(f"[{metadata.distgit_key}] Unsupported arches: {', '.join(unsupported_arches)}") - self._logger.warning(f"[{metadata.distgit_key}] Skipping arches: {', '.join(unsupported_arches)}") + logger.warning(f"Skipping arches: {', '.join(unsupported_arches)}") # Start the build - self._logger.info("Starting Konflux image build for %s...", metadata.distgit_key) + logger.info("Starting Konflux image build for %s...", metadata.distgit_key) retries = 3 error = None for attempt in range(retries): - self._logger.info("[%s] Build attempt %s/%s", metadata.distgit_key, attempt + 1, retries) - with client.ApiClient(configuration=cfg) as api_client: - dyn_client = DynamicClient(api_client) - - pipelinerun = await self._start_build(metadata, build_repo, dyn_client, building_arches) - await self.update_konflux_db(metadata, build_repo, pipelinerun, KonfluxBuildOutcome.PENDING, building_arches) - - pipelinerun_name = pipelinerun['metadata']['name'] - self._logger.info("[%s] Waiting for PipelineRun %s to complete...", metadata.distgit_key, pipelinerun_name) - pipelinerun = await self._wait_for_pipelinerun(dyn_client, pipelinerun_name) - self._logger.info("[%s] PipelineRun %s completed", metadata.distgit_key, pipelinerun_name) - - status = pipelinerun.status.conditions[0].status - outcome = KonfluxBuildOutcome.SUCCESS if status == "True" else KonfluxBuildOutcome.FAILURE - if self._config.dry_run: - self._logger.info("[%s] Dry run: Would have inserted build record in Konflux DB", - metadata.distgit_key) - else: - await self.update_konflux_db(metadata, build_repo, pipelinerun, outcome, building_arches) - - if status != "True": - error = KonfluxImageBuildError(f"Konflux image build for {metadata.distgit_key} failed", - pipelinerun_name, pipelinerun) - 
else: - metadata.build_status = True - break + logger.info("Build attempt %s/%s", attempt + 1, retries) + pipelinerun = await self._start_build(metadata, build_repo, building_arches) + await self.update_konflux_db(metadata, build_repo, pipelinerun, KonfluxBuildOutcome.PENDING, building_arches) + + pipelinerun_name = pipelinerun['metadata']['name'] + logger.info("Waiting for PipelineRun %s to complete...", pipelinerun_name) + pipelinerun = await self._konflux_client.wait_for_pipelinerun(pipelinerun_name, namespace=self._config.namespace) + logger.info("PipelineRun %s completed", pipelinerun_name) + + status = pipelinerun.status.conditions[0].status + outcome = KonfluxBuildOutcome.SUCCESS if status == "True" else KonfluxBuildOutcome.FAILURE + if self._config.dry_run: + logger.info("Dry run: Would have inserted build record in Konflux DB") + else: + await self.update_konflux_db(metadata, build_repo, pipelinerun, outcome, building_arches) + + if status != "True": + error = KonfluxImageBuildError(f"Konflux image build for {metadata.distgit_key} failed", + pipelinerun_name, pipelinerun) + else: + metadata.build_status = True + break if not metadata.build_status and error: raise error finally: @@ -151,22 +139,25 @@ async def build(self, metadata: ImageMetadata): async def _wait_for_parent_members(self, metadata: ImageMetadata): # If this image is FROM another group member, we need to wait on that group member to be built + logger = self._logger.getChild(f"[{metadata.distgit_key}]") parent_members = list(metadata.get_parent_members().values()) for parent_member in parent_members: if parent_member is None: continue # Parent member is not included in the group; no need to wait - self._logger.info("[%s] Parent image %s is building; waiting...", metadata.distgit_key, - parent_member.distgit_key) + logger.info("Parent image %s is building; waiting...", parent_member.distgit_key) # wait for parent member to be built while not parent_member.build_event.is_set(): # asyncio.sleep instead of Event.wait since it's less CPU intensive await asyncio.sleep(20) # check every 20 seconds return parent_members - async def _start_build(self, metadata: ImageMetadata, build_repo: BuildRepo, dyn_client: DynamicClient, building_arches: list): + async def _start_build(self, metadata: ImageMetadata, build_repo: BuildRepo, building_arches: list): + logger = self._logger.getChild(f"[{metadata.distgit_key}]") + if not build_repo.commit_hash: + raise IOError(f"The build branch {build_repo.branch} doesn't have any commits in the build repository {build_repo.https_url}") + + git_branch = build_repo.branch or build_repo.commit_hash git_url = build_repo.https_url - git_branch = build_repo.branch - assert build_repo.commit_hash is not None, f"[{metadata.distgit_key}] git_commit is required for Konflux image build" git_commit = build_repo.commit_hash df_path = build_repo.local_dir.joinpath("Dockerfile") @@ -176,9 +167,8 @@ async def _start_build(self, metadata: ImageMetadata, build_repo: BuildRepo, dyn # Ensure the Application resource exists app_name = self._config.group_name.replace(".", "-") - app_manifest = self._new_application(app_name, app_name) - app = await self._create_or_patch(dyn_client, app_manifest) - self._logger.info(f"[%s] Using application: {app['metadata']['name']}", metadata.distgit_key) + logger.info(f"Using application: {app_name}") + await self._konflux_client.ensure_application(name=app_name, display_name=app_name) # Ensure the component resource exists # Openshift doesn't allow dots or underscores in any of 
its fields, so we replace them with dashes @@ -198,60 +188,36 @@ async def _start_build(self, metadata: ImageMetadata, build_repo: BuildRepo, dyn f"{metadata.image_name_short}-{version}-{release}" ] default_revision = f"art-{self._config.group_name}-assembly-test-dgk-{metadata.distgit_key}" - - component_manifest = self._new_component( - component_name, - app_name, - component_name, - dest_image_repo, - git_url, - default_revision, + logger.info(f"Using component: {component_name}") + await self._konflux_client.ensure_component( + name=component_name, + application=app_name, + component_name=component_name, + image_repo=dest_image_repo, + source_url=git_url, + revision=default_revision, ) - component = await self._create_or_patch(dyn_client, component_manifest) - self._logger.info(f"[%s] Using component: {component['metadata']['name']}", metadata.distgit_key) - - # Create a PipelineRun - build_platforms = [self.SUPPORTED_ARCHES[arch] for arch in building_arches] - pipelineruns_api = await self._get_pipelinerun_api(dyn_client) - - pipelinerun_manifest = self._new_pipelinerun( - f"{component_name}-", # generate name needs a trailing dash - app_name, - component_name, - git_url, - git_commit, - git_branch, - f"{dest_image_repo}:{dest_image_tag}", - build_platforms, - skip_checks=self._config.skip_checks, - additional_tags=additional_tags, - ) - - if self._config.dry_run: - pipelinerun_manifest = resource.ResourceInstance(dyn_client, pipelinerun_manifest) - pipelinerun_manifest.metadata.name = f"{component_name}-dry-run" - self._logger.warning(f"[DRY RUN] [%s] Would have created PipelineRun: {pipelinerun_manifest.metadata.name}", metadata.distgit_key) - return pipelinerun_manifest - pipelinerun = await exectools.to_thread( - pipelineruns_api.create, + # Start a PipelineRun + pipelinerun = await self._konflux_client.start_pipeline_run_for_image_build( + generate_name=f"{component_name}-", namespace=self._config.namespace, - body=pipelinerun_manifest, - async_req=True, + application_name=app_name, + component_name=component_name, + git_url=git_url, + commit_sha=git_commit, + target_branch=git_branch, + output_image=f"{dest_image_repo}:{dest_image_tag}", + building_arches=building_arches, + additional_tags=additional_tags, + skip_checks=self._config.skip_checks, ) - pipelinerun = cast(resource.ResourceInstance, pipelinerun) - self._logger.info(f"[%s] Created PipelineRun: {self.build_pipeline_url(pipelinerun)}", metadata.distgit_key) + logger.info(f"Created PipelineRun: {self.build_pipeline_url(pipelinerun)}") return pipelinerun - @staticmethod - def build_pipeline_url(pipelinerun): - pipelinerun_name = pipelinerun['metadata']['name'] - application = pipelinerun['metadata']['labels']['appstudio.openshift.io/application'] - return (f"{constants.KONFLUX_UI_HOST}/application-pipeline/" - f"workspaces/{constants.KONFLUX_UI_DEFAULT_WORKSPACE}/" - f"applications/{application}/" - f"pipelineruns/{pipelinerun_name}") + def build_pipeline_url(self, pipelinerun): + return self._konflux_client.build_pipeline_url(pipelinerun) @staticmethod async def get_installed_packages(image_pullspec, arches) -> list: @@ -290,9 +256,9 @@ async def _get_for_arch(arch): return sorted(installed_packages) async def update_konflux_db(self, metadata, build_repo, pipelinerun, outcome, building_arches): + logger = self._logger.getChild(f"[{metadata.distgit_key}]") if not metadata.runtime.konflux_db: - self._logger.warning('Konflux DB connection is not initialized, not writing build record to the Konflux ' - 'DB.') + 
logger.warning('Konflux DB connection is not initialized, not writing build record to the Konflux DB.') return try: @@ -375,234 +341,7 @@ async def update_konflux_db(self, metadata, build_repo, pipelinerun, outcome, bu build_record = KonfluxBuildRecord(**build_record_params) metadata.runtime.konflux_db.add_build(build_record) - self._logger.info(f'[{metadata.distgit_key}] Konflux build info stored successfully with status {outcome}') + logger.info(f'Konflux build info stored successfully with status {outcome}') except Exception as err: - self._logger.error('Failed writing record to the konflux DB: %s', err) - - @staticmethod - def _new_application(name: str, display_name: str) -> dict: - obj = { - "apiVersion": "appstudio.redhat.com/v1alpha1", - "kind": "Application", - "metadata": { - "name": name - }, - "spec": { - "displayName": display_name, - "appModelRepository": {"url": ""}, - "gitOpsRepository": {"url": ""}, - } - } - return obj - - @staticmethod - async def _get_pipelinerun_api(dyn_client: DynamicClient): - return await exectools.to_thread( - dyn_client.resources.get, - api_version="tekton.dev/v1", - kind="PipelineRun", - ) - - @staticmethod - def _new_component(name: str, application: str, component_name: str, - image_repo: Optional[str], source_url: Optional[str], revision: Optional[str]) -> dict: - obj = { - "apiVersion": "appstudio.redhat.com/v1alpha1", - "kind": "Component", - "metadata": { - "name": name, - "annotations": { - "build.appstudio.openshift.io/pipeline": '{"name":"docker-build-multi-platform-oci-ta","bundle":"latest"}', - "build.appstudio.openshift.io/status": '{"pac":{"state":"disabled"}}', # will raise PRs to upstream repos (openshift-priv) if this is not set to false - # "build.appstudio.openshift.io/request": "configure-pac", - "mintmaker.appstudio.redhat.com/disabled": "true", # https://gitlab.cee.redhat.com/konflux/docs/users/-/blob/main/topics/mintmaker/user.md#offboarding-a-repository - } - }, - "spec": { - "application": application, - "componentName": component_name, - "source": { - "git": { - "url": source_url, - "revision": revision, - } - } - } - } - if image_repo: - obj["spec"]["containerImage"] = image_repo - if source_url: - obj["spec"].setdefault("source", {}).setdefault("git", {})["url"] = source_url - if revision: - obj["spec"].setdefault("source", {}).setdefault("git", {})["revision"] = revision - return obj - - @staticmethod - def _new_pipelinerun(generate_name: str, application_name: str, component_name: str, - git_url: str, commit_sha: str, target_branch: str, output_image: str, - build_platforms: Sequence[str], git_auth_secret: str = "pipelines-as-code-secret", - additional_tags: Sequence[str] = [], skip_checks: bool = False) -> dict: - https_url = art_util.convert_remote_git_to_https(git_url) - # TODO: In the future the PipelineRun template should be loaded from a remote git repo. - template_content = files("doozerlib").joinpath("backend").joinpath("konflux_image_build_pipelinerun.yaml").read_text() - template = jinja2.Template(template_content, autoescape=True) - rendered = template.render({ - "source_url": https_url, - "revision": commit_sha, - "target_branch": target_branch, - "git_auth_secret": git_auth_secret, - - }) - obj = yaml.load(rendered) - # Those fields in the template are specific to an image. They need to be removed. 
- del obj["metadata"]["name"] - del obj["metadata"]["namespace"] - del obj["metadata"]["annotations"]["pipelinesascode.tekton.dev/on-cel-expression"] - # Override the generated name with the provided one - if generate_name: - obj["metadata"]["generateName"] = generate_name - # Set the application and component names - obj["metadata"]["annotations"]["build.appstudio.openshift.io/repo"] = f"{https_url}?rev={commit_sha}" - obj["metadata"]["labels"]["appstudio.openshift.io/application"] = application_name - obj["metadata"]["labels"]["appstudio.openshift.io/component"] = component_name - - skip_checks_flag = False # Flag to insert the skip-checks param if it's missing - skip_checks_value = "true" if skip_checks else "false" # value type in konflux is string - image_expires_after_flag = False # Flag to insert the image-expires-after param if it's missing - image_expires_after_value = "6w" # Keep images for 6 weeks - for param in obj["spec"]["params"]: - if param["name"] == "output-image": - param["value"] = output_image - if param["name"] == "skip-checks": - param["value"] = skip_checks_value - skip_checks_flag = True - if param["name"] == "image-expires-after": - param["value"] = image_expires_after_value - skip_checks_flag = True - - if not skip_checks_flag: - obj["spec"]["params"].append({"name": "skip-checks", "value": skip_checks_value}) - - if not image_expires_after_flag: - obj["spec"]["params"].append({"name": "image-expires-after", "value": image_expires_after_value}) - - # See https://konflux-ci.dev/docs/how-tos/configuring/customizing-the-build/#configuring-timeouts - obj["spec"]["timeouts"] = {"pipeline": "12h"} - - for task in obj["spec"]["pipelineSpec"]["tasks"]: - if task["name"] == "build-images": - task["timeout"] = "12h" - elif task["name"] == "apply-tags": - task["params"].append({"name": "ADDITIONAL_TAGS", "value": list(additional_tags)}) - - obj["spec"]["params"].append({"name": "build-platforms", "value": list(build_platforms)}) - - # https://konflux.pages.redhat.com/docs/users/how-tos/configuring/overriding-compute-resources.html - # ose-installer-artifacts fails with OOM with default values, hence bumping memory limit - obj["spec"]["taskRunSpecs"] = [{ - "pipelineTaskName": "build-images", - "stepSpecs": [{ - "name": "sbom-syft-generate", - "computeResources": { - "requests": { - "memory": "5Gi" - }, - "limits": { - "memory": "10Gi" - } - } - }] - }] - - return obj - - async def _create_or_patch(self, dyn_client: DynamicClient, manifest: dict): - name = manifest["metadata"]["name"] - namespace = manifest["metadata"].get("namespace", self._config.namespace) - api_version = manifest["apiVersion"] - kind = manifest["kind"] - api = await exectools.to_thread( - dyn_client.resources.get, - api_version=api_version, - kind=kind, - ) - found = True - try: - await exectools.to_thread(api.get, name=name, namespace=namespace) - except exceptions.NotFoundError: - found = False - if self._config.dry_run: - if found: - self._logger.warning(f"[DRY RUN] Would have patched {api_version}/{kind} {namespace}/{name}") - else: - self._logger.warning(f"[DRY RUN] Would have created {api_version}/{kind} {namespace}/{name}") - return resource.ResourceInstance(dyn_client, manifest) - if found: - self._logger.info(f"Patching {api_version}/{kind} {namespace}/{name}") - new = await exectools.to_thread( - api.patch, - body=manifest, - namespace=namespace, - content_type="application/merge-patch+json", - ) - else: - self._logger.info(f"Creating {api_version}/{kind} {namespace}/{name}") - new = await 
exectools.to_thread( - api.create, - namespace=namespace, - body=manifest, - ) - new = cast(resource.ResourceInstance, new) - return new - - async def _wait_for_pipelinerun(self, dyn_client: DynamicClient, pipelinerun_name: str): - if self._config.dry_run: - await asyncio.sleep(3) - pipelinerun = { - "metadata": {"name": pipelinerun_name, "namespace": self._config.namespace}, - "apiVersion": "tekton.dev/v1", - "kind": "PipelineRun", - "status": {"conditions": [{"status": "True"}]} - } - self._logger.warning(f"[DRY RUN] Would have waited for PipelineRun {pipelinerun_name} to complete") - return resource.ResourceInstance(dyn_client, pipelinerun) - - api = await self._get_pipelinerun_api(dyn_client) - - def _inner(): - watcher = watch.Watch() - status = "Not Found" - while True: - try: - obj = api.get(name=pipelinerun_name, namespace=self._config.namespace) - resource_version = obj.metadata.resourceVersion - for event in watcher.stream( - api.get, - resource_version=resource_version, - namespace=self._config.namespace, - serialize=False, - field_selector=f"metadata.name={pipelinerun_name}" - ): - assert isinstance(event, Dict) - obj = resource.ResourceInstance(api, event["object"]) - # status takes some time to appear - try: - status = obj.status.conditions[0].status - except AttributeError: - pass - self._logger.info("PipelineRun %s status: %s.", pipelinerun_name, status) - if status not in ["Unknown", "Not Found"]: - return obj - except TimeoutError: - self._logger.error("Timeout waiting for PipelineRun %s to complete", pipelinerun_name) - continue - except exceptions.ApiException as e: - if e.status == 410: - # If the last result is too old, an `ApiException` exception will be thrown with - # `code` 410. In that case we have to recover by retrying without resource_version. - self._logger.debug("%s: Resource version %s is too old. Recovering...", pipelinerun_name, resource_version) - continue - raise - - return await exectools.to_thread(_inner) + logger.error('Failed writing record to the konflux DB: %s', err) diff --git a/doozer/doozerlib/cli/images_konflux.py b/doozer/doozerlib/cli/images_konflux.py index d263f92eb..62aa3c02b 100644 --- a/doozer/doozerlib/cli/images_konflux.py +++ b/doozer/doozerlib/cli/images_konflux.py @@ -124,7 +124,7 @@ def __init__( runtime: Runtime, konflux_kubeconfig: Optional[str], konflux_context: Optional[str], - konflux_namespace: Optional[str], + konflux_namespace: str, image_repo: str, skip_checks: bool, dry_run: bool, @@ -182,7 +182,7 @@ async def run(self): @click_coroutine async def images_konflux_build( runtime: Runtime, konflux_kubeconfig: Optional[str], konflux_context: Optional[str], - konflux_namespace: Optional[str], image_repo: str, skip_checks: bool, dry_run: bool): + konflux_namespace: str, image_repo: str, skip_checks: bool, dry_run: bool): cli = KonfluxBuildCli( runtime=runtime, konflux_kubeconfig=konflux_kubeconfig, konflux_context=konflux_context, konflux_namespace=konflux_namespace,
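
For reviewers, a minimal usage sketch of the new `KonfluxClient` surface introduced above. The kubeconfig path, namespace, application/component names, repos, and commit SHA are illustrative assumptions, not values from this change; `dry_run=True` only skips resource creation (API discovery in `_get_api` still needs a reachable cluster).

```python
import asyncio

from doozerlib.backend.konflux_client import KonfluxClient


async def main():
    # All identifiers below are hypothetical stand-ins.
    client = KonfluxClient.from_kubeconfig(
        default_namespace="example-tenant",
        config_file="~/.kube/konflux.kubeconfig",
        context=None,
        dry_run=True,  # log what would happen instead of creating resources
    )
    await client.ensure_application(name="openshift-4-18", display_name="openshift-4-18")
    await client.ensure_component(
        name="ose-example",
        application="openshift-4-18",
        component_name="ose-example",
        image_repo="quay.io/example/ose-example",
        source_url="https://github.com/example/ose-example.git",
        revision="main",
    )
    pipelinerun = await client.start_pipeline_run_for_image_build(
        generate_name="ose-example-",  # generateName needs a trailing dash
        namespace=None,
        application_name="openshift-4-18",
        component_name="ose-example",
        git_url="https://github.com/example/ose-example.git",
        commit_sha="0123456789abcdef0123456789abcdef01234567",
        target_branch="main",
        output_image="quay.io/example/ose-example:v4.18-example",
        building_arches=["x86_64", "aarch64"],
    )
    await client.wait_for_pipelinerun(pipelinerun["metadata"]["name"])


asyncio.run(main())
```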
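The `_modify_param` helper gives the template overrides their override-or-append semantics on Tekton param lists. A self-contained restatement (a hypothetical `modify_param`, not the PR's private helper) that can be run to confirm the behavior:

```python
from typing import List, Union


def modify_param(params: List[dict], name: str, value: Union[str, bool, List[str]]) -> None:
    """Override a named Tekton param in place, or append it when missing."""
    if isinstance(value, bool):
        value = "true" if value else "false"  # Tekton string params carry booleans as "true"/"false"
    for param in params:
        if param["name"] == name:
            param["value"] = value
            return
    params.append({"name": name, "value": value})


params = [{"name": "skip-checks", "value": "false"}]
modify_param(params, "skip-checks", True)                  # overrides the existing entry
modify_param(params, "build-platforms", ["linux/x86_64"])  # appends a missing entry
assert params == [
    {"name": "skip-checks", "value": "true"},
    {"name": "build-platforms", "value": ["linux/x86_64"]},
]
```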
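Finally, a sketch of wiring the refactored builder, reflecting that `namespace` is now a required field on `KonfluxImageBuilderConfig` (it was previously `Optional`). Paths and names are hypothetical, and the constructor builds a `KonfluxClient` immediately, so a loadable kubeconfig is needed even in dry-run mode.

```python
from pathlib import Path

from doozerlib.backend.konflux_image_builder import (KonfluxImageBuilder,
                                                     KonfluxImageBuilderConfig)

config = KonfluxImageBuilderConfig(
    base_dir=Path("/tmp/konflux-build-sources"),  # hypothetical working directory
    group_name="openshift-4.18",
    namespace="example-tenant",                   # now required, not Optional
    kubeconfig="~/.kube/konflux.kubeconfig",      # hypothetical path
    dry_run=True,
)
builder = KonfluxImageBuilder(config)
# Inside an async context, with `metadata` being an ImageMetadata from the doozer runtime:
# await builder.build(metadata)
```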