diff --git a/Dockerfile b/Dockerfile index e661e216..04f2ee97 100644 --- a/Dockerfile +++ b/Dockerfile @@ -54,8 +54,9 @@ RUN cd / && pip install --no-cache /app ENV PYTHONPATH /app RUN chown -R app:app /app -# Create repos directory for transplanting in landing-worker +# Create repos directory for landing-worker and revision worker. RUN mkdir /repos +RUN chown -R app:app /repos # Run as a non-privileged user USER app diff --git a/Dockerfile-dev b/Dockerfile-dev index 5d8a05d6..1a227328 100644 --- a/Dockerfile-dev +++ b/Dockerfile-dev @@ -20,6 +20,7 @@ ENV PYTHONUNBUFFERED=1 ENV FLASK_RUN_PORT=9000 ENV FLASK_RUN_HOST=0.0.0.0 ENV FLASK_DEBUG=1 +ENV HTTP_ALLOWED=1 ENTRYPOINT ["lando-cli"] CMD ["run"] @@ -48,6 +49,7 @@ RUN cd / && pip install --no-cache /app ENV PYTHONPATH /app RUN chown -R app:app /app +# Create repos directory for landing worker and revision worker. RUN mkdir /repos RUN chown -R app:app /repos diff --git a/docker-compose.yml b/docker-compose.yml index a12f6b4d..9a40d0d3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -111,7 +111,7 @@ services: - smtp lando-api.landing-worker: image: lando-api - command: ["landing-worker"] + command: ["start-landing-worker"] environment: - ENV=localdev - DATABASE_URL=postgresql://postgres:password@lando-api.db/lando_api_dev diff --git a/landoapi/api/landing_jobs.py b/landoapi/api/landing_jobs.py index f2896990..dd671f89 100644 --- a/landoapi/api/landing_jobs.py +++ b/landoapi/api/landing_jobs.py @@ -63,6 +63,9 @@ def put(landing_job_id: str, data: dict): if landing_job.status in (LandingJobStatus.SUBMITTED, LandingJobStatus.DEFERRED): landing_job.transition_status(LandingJobAction.CANCEL) + for revision in landing_job.revisions: + # Unlock patches so they can be modified in the future. + revision.patch_locked = False db.session.commit() return {"id": landing_job.id}, 200 else: diff --git a/landoapi/api/transplants.py b/landoapi/api/transplants.py index aa40bdf6..983084b0 100644 --- a/landoapi/api/transplants.py +++ b/landoapi/api/transplants.py @@ -364,7 +364,7 @@ def post(phab: PhabricatorClient, data: dict): } raw_diff = phab.call_conduit("differential.getrawdiff", diffID=diff["id"]) - lando_revision.set_patch(raw_diff, patch_data) + lando_revision.set_patch(raw_diff, patch_data, final=True) db.session.commit() lando_revisions.append(lando_revision) @@ -445,11 +445,9 @@ def get_list(phab: PhabricatorClient, stack_revision_id: str): limit=len(revision_phids), ) - # Return both transplants and landing jobs, since for repos that were switched - # both or either of these could be populated. - rev_ids = [phab.expect(r, "id") for r in phab.expect(revs, "data")] + # Find landing jobs based on related revisions or legacy revision_to_diff_id field. landing_jobs = LandingJob.revisions_query(rev_ids).all() return [job.serialize() for job in landing_jobs], 200 diff --git a/landoapi/cli.py b/landoapi/cli.py index 5065b3a5..51afb545 100644 --- a/landoapi/cli.py +++ b/landoapi/cli.py @@ -65,20 +65,64 @@ def worker(celery_arguments): celery.worker_main((sys.argv[0],) + celery_arguments) -@cli.command(name="landing-worker") -def landing_worker(): +@cli.command(name="start-landing-worker") +def start_landing_worker(): from landoapi.app import auth0_subsystem, lando_ui_subsystem + from landoapi.workers.landing_worker import LandingWorker exclusions = [auth0_subsystem, lando_ui_subsystem] for system in get_subsystems(exclude=exclusions): system.ensure_ready() + worker = LandingWorker() + ConfigurationVariable.set(worker.STOP_KEY, VariableType.BOOL, "0") + worker.start() + + +@cli.command(name="stop-landing-worker") +def stop_landing_worker(): + from landoapi.storage import db_subsystem from landoapi.workers.landing_worker import LandingWorker + db_subsystem.ensure_ready() worker = LandingWorker() + ConfigurationVariable.set(worker.STOP_KEY, VariableType.BOOL, "1") + + +@cli.command(name="start-revision-worker") +@click.argument("role") +def start_revision_worker(role): + from landoapi.app import auth0_subsystem, lando_ui_subsystem + from landoapi.workers.revision_worker import Processor, Supervisor + + roles = { + "processor": Processor, + "supervisor": Supervisor, + } + + if role not in roles: + raise ValueError(f"Unknown worker role specified ({role}).") + + exclusions = [auth0_subsystem, lando_ui_subsystem] + for system in get_subsystems(exclude=exclusions): + system.ensure_ready() + + worker = roles[role]() + ConfigurationVariable.set(worker.STOP_KEY, VariableType.BOOL, "0") worker.start() +@cli.command(name="stop-revision-worker") +def stop_revision_worker(): + """Stops all revision workers (supervisor and processors).""" + from landoapi.storage import db_subsystem + from landoapi.workers.revision_worker import RevisionWorker + + db_subsystem.ensure_ready() + worker = RevisionWorker() + ConfigurationVariable.set(worker.STOP_KEY, VariableType.BOOL, "1") + + @cli.command(name="run-pre-deploy-sequence") def run_pre_deploy_sequence(): """Runs the sequence of commands required before a deployment.""" diff --git a/landoapi/hg.py b/landoapi/hg.py index c55c530f..16da1c65 100644 --- a/landoapi/hg.py +++ b/landoapi/hg.py @@ -649,3 +649,13 @@ def read_checkout_file(self, path: str) -> str: with checkout_file_path.open() as f: return f.read() + + def has_incoming(self, source: str) -> bool: + """Check if there are any incoming changes from the remote repo.""" + try: + self.run_hg(["incoming", source, "--limit", "1"]) + except hglib.error.CommandError as e: + if b"no changes found" not in e.out: + logger.error(e) + return False + return True diff --git a/landoapi/models/configuration.py b/landoapi/models/configuration.py index a9c5de14..babd8f84 100644 --- a/landoapi/models/configuration.py +++ b/landoapi/models/configuration.py @@ -23,6 +23,9 @@ class ConfigurationKey(enum.Enum): LANDING_WORKER_PAUSED = "LANDING_WORKER_PAUSED" LANDING_WORKER_STOPPED = "LANDING_WORKER_STOPPED" + REVISION_WORKER_PAUSED = "REVISION_WORKER_PAUSED" + REVISION_WORKER_STOPPED = "REVISION_WORKER_STOPPED" + REVISION_WORKER_CAPACITY = "REVISION_WORKER_CAPACITY" API_IN_MAINTENANCE = "API_IN_MAINTENANCE" WORKER_THROTTLE_SECONDS = "WORKER_THROTTLE_SECONDS" diff --git a/landoapi/models/landing_job.py b/landoapi/models/landing_job.py index 3472251d..92df3b89 100644 --- a/landoapi/models/landing_job.py +++ b/landoapi/models/landing_job.py @@ -17,7 +17,7 @@ from sqlalchemy.dialects.postgresql.json import JSONB from landoapi.models.base import Base -from landoapi.models.revisions import Revision, revision_landing_job +from landoapi.models.revisions import Revision, RevisionStatus, revision_landing_job from landoapi.storage import db logger = logging.getLogger(__name__) @@ -36,7 +36,7 @@ class LandingJobStatus(enum.Enum): column of `LandingJob`. """ - # Initial creation state. + # Ready to be picked up state. SUBMITTED = "SUBMITTED" # Actively being processed. @@ -272,6 +272,14 @@ def set_landed_revision_diffs(self): .values(diff_id=revision.diff_id) ) + def has_non_ready_revisions(self) -> bool: + """Return whether any of the revisions are in a non-ready state or not.""" + return bool( + {r.status for r in self.revisions}.intersection( + RevisionStatus.NON_READY_STATES + ) + ) + def transition_status( self, action: LandingJobAction, @@ -321,21 +329,42 @@ def transition_status( self.status = actions[action]["status"] + if action == LandingJobAction.CANCEL: + self.ready_revisions() + if action in (LandingJobAction.FAIL, LandingJobAction.DEFER): self.error = kwargs["message"] + self.fail_revisions() if action == LandingJobAction.LAND: self.landed_commit_id = kwargs["commit_id"] + self.land_revisions() if commit: db.session.commit() + def fail_revisions(self): + """Mark all revisions in landing jobs as failed.""" + for revision in self.revisions: + revision.fail() + + def land_revisions(self): + """Mark all revisions in landing jobs as landed.""" + for revision in self.revisions: + revision.land() + + def ready_revisions(self): + """Mark all revisions in landing jobs as ready.""" + for revision in self.revisions: + revision.ready() + def serialize(self) -> dict[str, Any]: """Return a JSON compatible dictionary.""" return { "id": self.id, "status": self.status.value, "landing_path": self.serialized_landing_path, + "duration_seconds": self.duration_seconds, "error_breakdown": self.error_breakdown, "details": ( self.error or self.landed_commit_id diff --git a/landoapi/models/revisions.py b/landoapi/models/revisions.py index 9e73d85c..7da93673 100644 --- a/landoapi/models/revisions.py +++ b/landoapi/models/revisions.py @@ -3,10 +3,7 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. """ -This module provides the definitions for custom revision/diff warnings. - -The `DiffWarning` model provides a warning that is associated with a particular -Phabricator diff that is associated with a particular revision. +This module provides the definitions for revisions and custom revision/diff warnings. """ from __future__ import annotations @@ -15,10 +12,12 @@ import logging from typing import Any +import networkx as nx from sqlalchemy.dialects.postgresql.json import JSONB from landoapi.hgexports import build_patch_for_revision from landoapi.models.base import Base +from landoapi.phabricator import call_conduit from landoapi.storage import db logger = logging.getLogger(__name__) @@ -47,6 +46,49 @@ class DiffWarningGroup(enum.Enum): ) +@enum.unique +class RevisionStatus(enum.Enum): + # New means this revision was just created. + NEW = "NEW" + + # Stale means something changed upstream and we need to re-check this revision. + STALE = "STALE" + + # Waiting means it can be picked up by the revision worker. + WAITING = "WAITING" + + # Picked up means a revision worker has picked this up. This signals to other + # workers to not pick up this particular revision. This is really just an + # "in between" state. + PICKED_UP = "PICKED_UP" + + # Checking means it is currently running through various checks. + CHECKING = "CHECKING" + + # Problem means something went wrong in some of the checks. + PROBLEM = "PROBLEM" + + # Ready means revision worker is finished and this revision can be queued to land. + READY = "READY" + + # Below four statuses describe the landing state. + QUEUED = "QUEUED" # LandingJob has been submitted + LANDING = "LANDING" # LandingWorker is processing job + LANDED = "LANDED" # LandingWorker is finished processing job + FAILED = "FAILED" # LandingWorker could not land job + + @classmethod + @property + def LANDING_STATES(cls): + """States where the revision is in process of landing.""" + return (cls.QUEUED, cls.LANDING, cls.LANDED) + + @classmethod + @property + def NON_READY_STATES(cls): + return (cls.NEW, cls.STALE, cls.WAITING, cls.CHECKING) + + class Revision(Base): """ A representation of a revision in the database referencing a Phabricator revision. @@ -61,6 +103,7 @@ class Revision(Base): # The actual patch. patch_bytes = db.Column(db.LargeBinary, nullable=False, default=b"") + patch_locked = db.Column(db.Boolean, nullable=False, default=False) # Patch metadata, such as author, timestamp, etc... patch_data = db.Column(JSONB, nullable=False, default=dict) @@ -69,32 +112,176 @@ class Revision(Base): "LandingJob", secondary=revision_landing_job, back_populates="revisions" ) + status = db.Column( + db.Enum(RevisionStatus), nullable=False, default=RevisionStatus.NEW + ) + + # short name and callsign + repo_name = db.Column(db.String(254), nullable=False, default="") + repo_callsign = db.Column(db.String(254), nullable=False, default="") + + data = db.Column(JSONB, nullable=False, default=dict) + + stack_graph = db.Column(JSONB, nullable=False, default=dict) + def __repr__(self): """Return a human-readable representation of the instance.""" return ( f"<{self.__class__.__name__}: {self.id} " - f"[D{self.revision_id}-{self.diff_id}]>" + f"[D{self.revision_id}-{self.diff_id}] " + f"[{self.status.value if self.status else ''}]>" ) @classmethod - def get_from_revision_id(cls, revision_id: int) -> "Revision" | None: + def get_from_revision_id(cls, revision_id: int) -> "Revision": """Return a Revision object from a given ID.""" - return cls.query.filter(Revision.revision_id == revision_id).one_or_none() + return cls.query.filter(Revision.revision_id == revision_id).one() - def set_patch(self, raw_diff: bytes, patch_data: dict[str, str]): + def set_patch(self, raw_diff: bytes, patch_data: dict[str, str], final=False): """Given a raw_diff and patch data, build the patch and store it.""" + if self.patch_locked: + raise ValueError("Patch can not be modified.") + self.patch_data = patch_data patch = build_patch_for_revision(raw_diff, **self.patch_data) self.patch_bytes = patch.encode("utf-8") + if final: + self.patch_locked = True + db.session.commit() + + def set_temporary_patch(self) -> str: + """ + Fetch the most up to date patch to be pre-processed. + + Fill in placeholder patch data if it is not available. + """ + raw_diff = call_conduit("differential.getrawdiff", diffID=self.diff_id) + patch_data = { + "author_name": "", + "author_email": "", + "commit_message": "This is an automated commit message.", + "timestamp": 0, + } + self.set_patch(raw_diff, patch_data, final=False) + + @property + def stack(self): + stack_graph = { + Revision.get_from_revision_id(source): [ + Revision.get_from_revision_id(dest) for dest in dests + ] + for source, dests in self.stack_graph.items() + } + + return nx.DiGraph(stack_graph).reverse() + + @property + def successor(self): + """Return a successor if there is only one, otherwise return None.""" + successors = self.stack.successors(self) + if len(successors) == 1: + return list(successors)[0] + + @property + def predecessor(self): + """Return a predecessor if there is only one, otherwise return None.""" + predecessors = list(self.stack.predecessors(self)) + if len(predecessors) == 1: + return predecessors[0] + + @property + def successors(self): + """Return the current revision and all successors.""" + successors = nx.nodes(nx.dfs_tree(self.stack, self)) + return list(successors.keys()) + + @property + def predecessors(self): + """Return all predecessors without current revision.""" + predecessors = list(nx.nodes(nx.dfs_tree(self.stack.reverse(), self)).keys()) + predecessors.reverse() + return [ + predecessor + for predecessor in predecessors + if not predecessor.status == RevisionStatus.LANDED and predecessor != self + ] + + @property + def linear_stack(self): + """Return a list of all successors and predecessors if linear. + + Stop at the first predecessor with multiple predecessors. + Stop at the first successor with multiple successors. + """ + stack = [] + + predecessors = list(self.stack.predecessors(self)) + while predecessors: + if len(predecessors) > 1: + break + predecessor = predecessors[0] + stack.insert(0, predecessor) + predecessors = list(self.stack.predecessors(predecessor)) + + stack.append(self) + + successors = list(self.stack.successors(self)) + while successors: + if len(successors) > 1: + break + successor = successors[0] + stack.append(successor) + successors = list(self.stack.successors(successor)) + + return stack + + def change_triggered(self, changes): + """Check if any of the changes should trigger a status change.""" + keys = ("repo_name", "repo_callsign", "diff_id", "stack_graph") + for key in keys: + old = getattr(self, key, None) + new = changes.get(key, None) + if type(old) == type(new) and type(old) == dict: + if old != new: + logger.info(f"Change detected in {self} ({key}) {old} vs {new}") + return True + elif str(old) != str(new): + logger.info(f"Change detected in {self} ({key}) {old} vs {new}") + return True + return False + + def fail(self): + """Clear relevant fields on revision when a landing job fails.""" + self.status = RevisionStatus.FAILED + db.session.commit() + + def land(self): + """Clear relevant fields on revision when a landing job fails.""" + self.status = RevisionStatus.LANDED + db.session.commit() + + def ready(self): + """Clear relevant fields on revision when a landing job fails.""" + self.status = RevisionStatus.READY + db.session.commit() + + def update_data(self, **params): + logger.info(f"Updating revision {self} data with {params}") + if self.data: + data = self.data.copy() + else: + data = {} + data.update(params) + self.data = data def serialize(self) -> dict[str, Any]: return { "id": self.id, "revision_id": self.revision_id, "diff_id": self.diff_id, - "landing_jobs": [job.id for job in self.landing_jobs], - "created_at": self.created_at, - "updated_at": self.updated_at, + "repo_name": self.repo_name, + "status": self.status.value, + "data": self.data, } diff --git a/landoapi/phabricator.py b/landoapi/phabricator.py index 37fb8826..5e9af13b 100644 --- a/landoapi/phabricator.py +++ b/landoapi/phabricator.py @@ -23,6 +23,7 @@ ) import requests +from flask import current_app from landoapi.systems import Subsystem @@ -391,4 +392,40 @@ def healthy(self) -> bool | str: return True +def get_phab_client() -> PhabricatorClient: + """Initialize PhabricatorClient with credentials and return it.""" + phab = PhabricatorClient( + current_app.config["PHABRICATOR_URL"], + current_app.config["PHABRICATOR_UNPRIVILEGED_API_KEY"], + ) + return phab + + +def call_conduit(method: str, **kwargs) -> dict: + """Helper method to fetch client and use it to send data to conduit API.""" + phab = get_phab_client() + try: + result = phab.call_conduit(method, **kwargs) + except PhabricatorAPIException as e: + logger.error(e) + # TODO: raise or return error here. + return + return result + + +def get_conduit_data(method: str, **kwargs) -> dict: + """Helper method to fetch multiple pages of data.""" + data = [] + result = call_conduit(method, **kwargs) + if not result: + return data + + data += result["data"] + while result and result["cursor"] and result["cursor"]["after"]: + result = call_conduit(method, after=result["cursor"]["after"], **kwargs) + if result and "data" in result: + data += result["data"] + return data + + phabricator_subsystem = PhabricatorSubsystem() diff --git a/landoapi/repos.py b/landoapi/repos.py index 1a8ed96f..c0f87ad9 100644 --- a/landoapi/repos.py +++ b/landoapi/repos.py @@ -51,6 +51,8 @@ class Repo: from a remote Mercurial repository. Defaults to `url`. short_name (str): The Phabricator short name field for this repo, if different from the `tree`. Defaults to `tree`. + use_revision_worker (bool): When set to `True`, enables Revision Worker + functionality for this repo. Defaults to `False`. approval_required (bool): Whether approval is required or not for given repo. Note that this is not fully implemented but is included for compatibility. Defaults to `False`. @@ -69,6 +71,7 @@ class Repo: push_path: str = "" pull_path: str = "" short_name: str = "" + use_revision_worker: bool = False approval_required: bool = False milestone_tracking_flag_template: str = "" autoformat_enabled: bool = False @@ -164,6 +167,7 @@ def phab_identifier(self) -> str: access_group=SCM_LEVEL_1, product_details_url="http://product-details.test/1.0/firefox_versions.json", ), + # A generic repo, similar in behaviour to mozilla-central. "first-repo": Repo( tree="first-repo", url="http://hg.test/first-repo", @@ -171,10 +175,13 @@ def phab_identifier(self) -> str: access_group=SCM_LEVEL_1, commit_flags=[DONTBUILD], ), + # Similar to first-repo, but uses revision worker. "second-repo": Repo( tree="second-repo", url="http://hg.test/second-repo", + push_path="ssh://autoland.hg//repos/second-repo", access_group=SCM_LEVEL_1, + use_revision_worker=True, ), "third-repo": Repo( tree="third-repo", @@ -199,6 +206,7 @@ def phab_identifier(self) -> str: tree="test-repo", url="https://hg.mozilla.org/conduit-testing/test-repo", access_group=SCM_CONDUIT, + use_revision_worker=True, ), "m-c": Repo( tree="m-c", @@ -209,6 +217,7 @@ def phab_identifier(self) -> str: milestone_tracking_flag_template="cf_status_firefox{milestone}", product_details_url="https://raw.githubusercontent.com/mozilla-conduit" "/suite/main/docker/product-details/1.0/firefox_versions.json", + use_revision_worker=True, ), "vct": Repo( tree="vct", diff --git a/landoapi/transplants.py b/landoapi/transplants.py index a23285c7..6b62bc1f 100644 --- a/landoapi/transplants.py +++ b/landoapi/transplants.py @@ -13,7 +13,12 @@ from flask import current_app from landoapi.models.landing_job import LandingJob, LandingJobStatus -from landoapi.models.revisions import DiffWarning, DiffWarningStatus +from landoapi.models.revisions import ( + DiffWarning, + DiffWarningStatus, + Revision, + RevisionStatus, +) from landoapi.phabricator import ( PhabricatorClient, PhabricatorRevisionStatus, @@ -376,6 +381,38 @@ def warning_unresolved_comments(*, phab, revision, **kwargs): return "Revision has unresolved comments." +@RevisionWarningCheck(10, "Revision has revision worker warnings.", True) +def warning_revision_worker(*, phab, revision, repo, **kwargs): + """Check for any warnings based on revision worker.""" + revision_id = revision["id"] + diff_id = revision["fields"]["diffID"] + + supported_repos = get_repos_for_env(current_app.config.get("ENVIRONMENT")) + repo = supported_repos[repo["fields"]["shortName"]] + if not repo.use_revision_worker: + return + + lando_revision = Revision.query.filter( + Revision.revision_id == revision_id + ).one_or_none() + message = "" + if not lando_revision: + message = "No lando revision found." + elif diff_id and lando_revision.diff_id and int(diff_id) != lando_revision.diff_id: + message = "Lando has not checked the latest diff yet." + elif lando_revision.status == RevisionStatus.QUEUED: + message = "Revision is queued for landing, please wait." + elif lando_revision.status == RevisionStatus.LANDED: + message = "Revision has already landed. Please wait until it is closed." + elif lando_revision.status == RevisionStatus.LANDING: + message = "Revision is landing." + elif lando_revision.status == RevisionStatus.PROBLEM: + message = lando_revision.data.get("error", "An unknown error has occurred.") + + if message: + return [{"message": message}] + + def user_block_no_auth0_email(*, auth0_user, **kwargs): """Check the user has a proper auth0 email.""" return ( @@ -423,6 +460,7 @@ def check_landing_warnings( warning_wip_commit_message, warning_code_freeze, warning_unresolved_comments, + warning_revision_worker, ], ): assessment = TransplantAssessment() diff --git a/landoapi/workers/base.py b/landoapi/workers/base.py index ce9b75b9..00638c0c 100644 --- a/landoapi/workers/base.py +++ b/landoapi/workers/base.py @@ -10,7 +10,11 @@ import subprocess from time import sleep -from landoapi.models.configuration import ConfigurationKey, ConfigurationVariable +from landoapi.models.configuration import ( + ConfigurationKey, + ConfigurationVariable, + VariableType, +) from landoapi.repos import repo_clone_subsystem from landoapi.treestatus import treestatus_subsystem @@ -121,9 +125,11 @@ def _setup(self): self._setup_ssh(self.ssh_private_key) def _start(self, max_loops: int | None = None, *args, **kwargs): - """Run the main event loop.""" - # NOTE: The worker will exit when max_loops is reached, or when the stop - # variable is changed to True. + """Start the main event loop, and a loop counter. + + If maximum number of loops is reached, or if the worker stop flag is toggled, + the worker will exit. + """ loops = 0 while self._running: if max_loops is not None and loops >= max_loops: @@ -163,5 +169,53 @@ def start(self, max_loops: int | None = None): self._start(max_loops=max_loops) def loop(self, *args, **kwargs): - """The main event loop.""" + """Main event loop to be defined by each worker.""" raise NotImplementedError() + + +class RevisionWorker(Worker): + """A worker that pre-processes revisions. + + This worker continuously synchronises revisions with the remote Phabricator API + and runs all applicable checks and processes on each revision, if needed. + """ + + @property + def STOP_KEY(self) -> ConfigurationKey: + """Return the configuration key that prevents the worker from starting.""" + return ConfigurationKey.REVISION_WORKER_STOPPED + + @property + def PAUSE_KEY(self) -> ConfigurationKey: + """Return the configuration key that pauses the worker.""" + return ConfigurationKey.REVISION_WORKER_PAUSED + + @property + def CAPACITY_KEY(self) -> ConfigurationKey: + """Return the configuration key that pauses the worker.""" + return ConfigurationKey.REVISION_WORKER_CAPACITY + + @classmethod + def pause(cls): + """Pause the operation of revision workers.""" + ConfigurationVariable.set(cls.PAUSE_KEY, VariableType.BOOL, "1") + + @classmethod + def resume(cls): + """Resume the operation of revision workers.""" + ConfigurationVariable.set(cls.PAUSE_KEY, VariableType.BOOL, "0") + + @classmethod + def stop(cls): + """Stop the operation of revision workers (causes worker to exit).""" + ConfigurationVariable.set(cls.STOP_KEY, VariableType.BOOL, "1") + + def __init__(self, *args, **kwargs): + super().__init__(with_ssh=False, **kwargs) + + @property + def capacity(self): + """ + The number of revisions that this worker will fetch for processing per batch. + """ + return ConfigurationVariable.get(self.CAPACITY_KEY, 2) diff --git a/landoapi/workers/revision_worker.py b/landoapi/workers/revision_worker.py new file mode 100644 index 00000000..774dc077 --- /dev/null +++ b/landoapi/workers/revision_worker.py @@ -0,0 +1,364 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +from __future__ import annotations + +import io +import logging +from itertools import chain + +import networkx as nx + +from landoapi.hg import HgRepo +from landoapi.models.revisions import Revision +from landoapi.models.revisions import RevisionStatus as RS +from landoapi.phabricator import get_conduit_data +from landoapi.repos import repo_clone_subsystem +from landoapi.storage import db +from landoapi.workers.base import RevisionWorker + +logger = logging.getLogger(__name__) + + +DIFF_CONTEXT_SIZE = 5000 + + +class StackGraph(nx.DiGraph): + def __eq__(self, G): + return nx.utils.misc.graphs_equal(self, G) + + @property + def revisions(self): + return self.nodes + + +def get_active_repos(repo_config: dict) -> list[str]: + """Query Phabricator to determine PHIDs of active repos.""" + repos = [repo for repo in repo_config if repo.use_revision_worker] + repo_phids = get_conduit_data( + "diffusion.repository.search", + constraints={"shortNames": [r.short_name for r in repos]}, + ) + return [r["phid"] for r in repo_phids] + + +def get_stacks(revisions: dict[str, dict]) -> list: + """Returns a stack with revision PHIDs as nodes. + + This method fetches unique stacks from a list of stack graphs. This + is because Phabricator returns different forms of the same stack graph + in each revision. + + This method will return a list of StackGraph objects. + """ + stacks = [r["fields"]["stackGraph"] for r in revisions.values()] + parsed = [StackGraph(s).reverse() for s in stacks] + + filtered = [] + for stack in parsed: + if stack not in filtered: + filtered.append(stack) + return filtered + + +def get_phab_revisions(statuses: list[str] | None = None) -> dict[int, dict]: + """Get a list of revisions of given statuses.""" + statuses = statuses or [ + "accepted", + "changes-planned", + "draft", + "needs-review", + "published", + ] + + # Get all revisions with given filters. + repo_config = repo_clone_subsystem.repos.values() + revisions = get_conduit_data( + "differential.revision.search", + constraints={ + "statuses": statuses, + "repositoryPHIDs": get_active_repos(repo_config), + }, + ) + + # Translate into a dictionary. + revisions = {revision["phid"]: revision for revision in revisions} + + if not revisions: + return {} + + # Get list of unique stacks included in these revisions. + stacks = get_stacks(revisions) + + # Ensure that all revisions in each stack are in our revisions list. + input_revisions = set(chain(*[stack.revisions for stack in stacks])) + missing_keys = input_revisions.difference(revisions.keys()) + + if missing_keys: + stragglers = get_conduit_data( + "differential.revision.search", + constraints={"phids": list(missing_keys)}, + ) + revisions.update({revision["phid"]: revision for revision in stragglers}) + + # Convert back to a list. + revisions = list(revisions.values()) + + # Create a map to translate phids to revision IDs. + revision_phid_map = { + revision["phid"]: str(revision["id"]) for revision in revisions + } + + # Translate phids in stack graph to revision IDs. + for revision in revisions: + stack_graph = revision["fields"]["stackGraph"] + stack_graph = { + revision_phid_map[source]: [revision_phid_map[phid] for phid in dests] + for source, dests in stack_graph.items() + } + revision["fields"]["stackGraph"] = stack_graph + + # Translate all revisions into a format that can be consumed by Lando. + revisions = [ + { + "revision_id": revision["id"], + "diff_id": revision["fields"]["diffID"], + "diff_phid": revision["fields"]["diffPHID"], + "repo_phid": revision["fields"]["repositoryPHID"], + "phid": revision["phid"], + "stack_graph": revision["fields"]["stackGraph"], + } + for revision in revisions + if revision["fields"]["diffPHID"] and revision["fields"]["repositoryPHID"] + ] + + repo_phids = [revision["repo_phid"] for revision in revisions] + repo_infos = get_conduit_data( + "diffusion.repository.search", constraints={"phids": repo_phids} + ) + repo_map = { + repo_info["phid"]: { + "repo_name": repo_info["fields"]["shortName"], + "repo_callsign": repo_info["fields"]["callsign"], + } + for repo_info in repo_infos + } + + for revision in revisions: + revision.update(repo_map[revision["repo_phid"]]) + + # Move PHIDs to their own key + revision["phids"] = { + "repo_phid": revision.pop("repo_phid"), + "diff_phid": revision.pop("diff_phid"), + "revision_phid": revision.pop("phid"), + } + + logger.debug(f"Found {len(revisions)} revisions from Phabricator API") + + return {revision["revision_id"]: revision for revision in revisions} + + +def parse_diff(diff: str) -> set[str]: + """Given a diff, extract list of affected files.""" + diff_lines = diff.splitlines() + file_diffs = [ + line.split(" ")[2:] for line in diff_lines if line.strip().startswith("diff") + ] + file_paths = set() + for file_diff in file_diffs: + # Parse source/destination paths. + path1, path2 = file_diff + file_paths.add("/".join(path1.split("/")[1:])) + file_paths.add("/".join(path2.split("/")[1:])) + return file_paths + + +def discover_revisions() -> None: + """Check and update local database with available revisions.""" + phab_revisions = get_phab_revisions() + revisions_to_stale_successors_of = [] + new_revisions = [] + all_revisions = [] + + for phab_revision in phab_revisions.values(): + revision_id = phab_revision["revision_id"] + diff_id = phab_revision["diff_id"] + lando_revision = Revision.query.filter( + Revision.revision_id == revision_id + ).one_or_none() + + if lando_revision and lando_revision.status in RS.LANDING_STATES: + continue + + new = not lando_revision + if new: + logger.info(f"Picked up new revision {revision_id}.") + lando_revision = Revision(revision_id=revision_id, diff_id=diff_id) + db.session.add(lando_revision) + new_revisions.append(lando_revision) + + all_revisions.append(lando_revision) + + if lando_revision.change_triggered(phab_revision) or new: + logger.info(f"Change detected in {lando_revision}.") + # Update all matching fields in the revision with remote data. + for key, value in phab_revision.items(): + if key == "phids": + lando_revision.update_data(**value) + else: + setattr(lando_revision, key, value) + lando_revision.set_temporary_patch() + lando_revision.status = RS.WAITING + revisions_to_stale_successors_of.append(lando_revision) + db.session.commit() + + for revision in set(revisions_to_stale_successors_of) - set(new_revisions): + for successor in revision.successors: + successor.status = RS.STALE + + for revision in all_revisions: + if len(list(revision.stack.predecessors(revision))) > 1: + revision.status = RS.PROBLEM + revision.update_data(error="Revision has more than one predecessor.") + db.session.commit() + + +def mark_stale_revisions() -> None: + """Discover any upstream changes, and mark revisions affected as stale.""" + repos = Revision.query.with_entities(Revision.repo_name).distinct().all() + repos = tuple(repo[0] for repo in repos if repo[0]) + for repo_name in repos: + repo = repo_clone_subsystem.repos[repo_name] + hgrepo = HgRepo( + str(repo_clone_subsystem.repo_paths[repo_name]), + ) + # checkout repo, pull & update + with hgrepo.for_pull(): + if hgrepo.has_incoming(repo.pull_path): + hgrepo.update_repo(repo.pull_path) + logger.info(f"Incoming changes detected in {repo_name}.") + revisions = Revision.query.filter( + Revision.status.not_in(RS.LANDING_STATES), + Revision.repo_name == repo_name, + ) + logger.info(f"Marking {revisions.count()} revisions as stale.") + revisions.update({Revision.status: RS.STALE}) + db.session.commit() + + +class Supervisor(RevisionWorker): + """A worker that detects and synchronizes remote revisions. + + This worker continuously synchronises revisions with the remote Phabricator API + as well as detects any incoming changes from the remote repository. + + NOTE: This worker does not support scaling and requires that it is the only worker + running. + """ + + def loop(self): + """Run the event loop for the revision worker.""" + self.throttle() + mark_stale_revisions() + discover_revisions() + + +class Processor(RevisionWorker): + """A worker that pre-processes revisions. + + This worker attempts to import each patch and its predecessors, and detects any + issues that come up during the import. + + NOTE: This worker supports scaling and can run independently of other workers. + """ + + def loop(self): + """Run the event loop for the revision worker.""" + self.throttle() + + # Fetch revisions that require pre-processing. + with db.session.begin_nested(): + Revision.lock_table() + revisions = Revision.query.filter( + Revision.status.in_([RS.WAITING, RS.STALE]) + ).limit(self.capacity) + + picked_up = [r.id for r in revisions] + + # Mark revisions as picked up so other workers don't pick them up. + Revision.query.filter(Revision.id.in_(picked_up)).update( + {Revision.status: RS.PICKED_UP} + ) + + db.session.commit() + + revisions = Revision.query.filter(Revision.id.in_(picked_up)) + + # NOTE: The revisions will be processed according to their dependencies + # at the time of fetching. If dependencies change, they will be + # re-processed on the next iteration. This has the effect of processing + # revisions as they become available, if, for example, a large stack is + # being uploaded. + + logger.info(f"Found {revisions.all()} to process.") + for revision in revisions: + errors = [] + logger.info(f"Running checks on revision {revision}") + + revision.status = RS.CHECKING + db.session.commit() + + try: + errors = self.process(revision) + except Exception as e: + logger.info(f"Exception encountered while processing {revision}") + revision.status = RS.PROBLEM + revision.update_data(error="".join(e.args)) + logger.exception(e) + db.session.commit() + continue + + if errors: + logger.info(f"Errors detected on revision {revision}") + revision.status = RS.PROBLEM + revision.update_data(error="".join(errors)) + else: + revision.status = RS.READY + logger.info(f"No problems detected on revision {revision}") + db.session.commit() + + def _process_patch(self, revision: Revision, hgrepo: HgRepo) -> list[str]: + """Run through all predecessors before applying revision patch.""" + errors = [] + for r in revision.predecessors + [revision]: + try: + hgrepo.apply_patch(io.BytesIO(r.patch_bytes)) + except Exception as e: + # Something is wrong (e.g., merge conflict). Log and break. + logger.error(e) + errors.append(f"Problem detected in {r} ({e})") + break + return errors + + def _get_repo_objects(self, repo_name: str) -> tuple[HgRepo, str]: + """Given a repo name, return the hg repo object and pull path.""" + repo = repo_clone_subsystem.repos[repo_name] + hgrepo = HgRepo( + str(repo_clone_subsystem.repo_paths[repo_name]), + ) + return hgrepo, repo.pull_path + + def process(self, revision: Revision) -> list[str]: + """Update repo and attempt to import patch.""" + errors = [] + + hgrepo, pull_path = self._get_repo_objects(revision.repo_name) + + # checkout repo, pull & update + with hgrepo.for_pull(): + hgrepo.update_repo(pull_path) + + # Try to merge the revision patch and its predecessors. + errors = self._process_patch(revision, hgrepo) + return errors diff --git a/migrations/versions/6849fb8e7879_revision_worker_changes.py b/migrations/versions/6849fb8e7879_revision_worker_changes.py new file mode 100644 index 00000000..fde1a115 --- /dev/null +++ b/migrations/versions/6849fb8e7879_revision_worker_changes.py @@ -0,0 +1,85 @@ +"""revision worker changes + +Revision ID: 6849fb8e7879 +Revises: 50ffadceca83 +Create Date: 2023-05-29 16:21:30.402756 + +""" +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "6849fb8e7879" +down_revision = "50ffadceca83" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("revision", sa.Column("patch_locked", sa.Boolean(), nullable=False)) + op.add_column( + "revision", sa.Column("repo_name", sa.String(length=254), nullable=False) + ) + op.add_column( + "revision", sa.Column("repo_callsign", sa.String(length=254), nullable=False) + ) + op.add_column( + "revision", + sa.Column("data", postgresql.JSONB(astext_type=sa.Text()), nullable=False), + ) + op.add_column( + "revision", + sa.Column( + "stack_graph", postgresql.JSONB(astext_type=sa.Text()), nullable=False + ), + ) + + # ### additional commands manually added below. + revision_status_values = ( + "NEW", + "STALE", + "WAITING", + "PICKED_UP", + "CHECKING", + "PROBLEM", + "READY", + "QUEUED", + "LANDING", + "LANDED", + "FAILED", + ) + revision_status_enum = sa.Enum(*revision_status_values, name="revisionstatus") + revision_status_enum.create(op.get_bind(), checkfirst=True) + op.add_column("revision", sa.Column("status", revision_status_enum, nullable=False)) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("revision", "stack_graph") + op.drop_column("revision", "data") + op.drop_column("revision", "repo_callsign") + op.drop_column("revision", "repo_name") + op.drop_column("revision", "patch_locked") + + # ### additional commands manually added below. + revision_status_values = ( + "NEW", + "STALE", + "WAITING", + "PICKED_UP", + "CHECKING", + "PROBLEM", + "READY", + "QUEUED", + "LANDING", + "LANDED", + "FAILED", + ) + revision_status_enum = sa.Enum(*revision_status_values, name="revisionstatus") + revision_status_enum.drop(op.get_bind()) + + op.drop_column("revision", "status") + # ### end Alembic commands ### diff --git a/tests/conftest.py b/tests/conftest.py index 066f51eb..80964bf6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,6 +21,7 @@ from landoapi.app import SUBSYSTEMS, construct_app, load_config from landoapi.cache import cache from landoapi.mocks.auth import TEST_JWKS, MockAuth0 +from landoapi.models.revisions import Revision, RevisionStatus from landoapi.phabricator import PhabricatorClient from landoapi.projects import ( CHECKIN_PROJ_SLUG, @@ -28,12 +29,27 @@ SEC_APPROVAL_PROJECT_SLUG, SEC_PROJ_SLUG, ) -from landoapi.repos import SCM_LEVEL_3, Repo +from landoapi.repos import SCM_LEVEL_3, Repo, repo_clone_subsystem from landoapi.storage import db as _db from landoapi.tasks import celery from landoapi.transplants import CODE_FREEZE_OFFSET, tokens_are_equal from tests.mocks import PhabricatorDouble, TreeStatusDouble +PATCH_NORMAL_1 = r""" +# HG changeset patch +# User Test User +# Date 0 0 +# Thu Jan 01 00:00:00 1970 +0000 +# Diff Start Line 7 +add another file. +diff --git a/test.txt b/test.txt +--- a/test.txt ++++ b/test.txt +@@ -1,1 +1,2 @@ + TEST ++adding another line +""".strip() + class JSONClient(flask.testing.FlaskClient): """Custom Flask test client that sends JSON by default. @@ -426,3 +442,48 @@ def strptime(cls, date_string, fmt): return dates[f"{date_string}"] return Mockdatetime + + +@pytest.fixture +def create_patch_revision(db): + """A fixture that fake uploads a patch""" + + def _create_patch_revision( + number, patch=PATCH_NORMAL_1, status=RevisionStatus.READY + ): + revision = Revision() + revision.status = status + revision.revision_id = number + revision.diff_id = number + revision.patch_bytes = patch.encode("utf-8") + db.session.add(revision) + db.session.commit() + return revision + + return _create_patch_revision + + +@pytest.fixture +def setup_repo(mock_repo_config, phabdouble, app, hg_server): + def _setup(commit_flags=None): + mock_repo_config( + { + "test": { + "repoA": Repo( + tree="mozilla-central", + url=hg_server, + access_group=SCM_LEVEL_3, + push_path=hg_server, + pull_path=hg_server, + use_revision_worker=True, + commit_flags=commit_flags or [], + ) + } + } + ) + repo = phabdouble.repo(name="repoA") + app.config["REPOS_TO_LAND"] = "repoA" + repo_clone_subsystem.ready() + return repo + + return _setup diff --git a/tests/test_landings.py b/tests/test_landings.py index 3cf03461..d8ee7b53 100644 --- a/tests/test_landings.py +++ b/tests/test_landings.py @@ -4,9 +4,7 @@ import io import textwrap -import unittest.mock as mock - -import pytest +from unittest import mock from landoapi.hg import AUTOFORMAT_COMMIT_MESSAGE, HgRepo from landoapi.models.landing_job import ( @@ -14,27 +12,9 @@ LandingJobStatus, add_job_with_revisions, ) -from landoapi.models.revisions import Revision from landoapi.repos import SCM_LEVEL_3, Repo from landoapi.workers.landing_worker import LandingWorker - -@pytest.fixture -def create_patch_revision(db): - """A fixture that fake uploads a patch""" - - def _create_patch_revision(number, patch=PATCH_NORMAL_1): - revision = Revision() - revision.revision_id = number - revision.diff_id = number - revision.patch_bytes = patch.encode("utf-8") - db.session.add(revision) - db.session.commit() - return revision - - return _create_patch_revision - - PATCH_NORMAL_1 = r""" # HG changeset patch # User Test User diff --git a/tests/test_notifications.py b/tests/test_notifications.py index c173232c..53323983 100644 --- a/tests/test_notifications.py +++ b/tests/test_notifications.py @@ -96,7 +96,7 @@ def test_notify_user_of_landing_failure( job.revisions.append(Revision()) notify_user_of_landing_failure( job.requester_email, - job.head_revision, + "D1234", job.error, job.id, ) diff --git a/tests/test_reviews.py b/tests/test_reviews.py index c93944df..54036372 100644 --- a/tests/test_reviews.py +++ b/tests/test_reviews.py @@ -70,7 +70,9 @@ def test_collate_reviewer_attachments_n_reviewers(phabdouble, n_reviewers): def test_sec_approval_is_filtered_from_commit_message_reviewer_list( - phabdouble, secure_project, sec_approval_project + phabdouble, + secure_project, + sec_approval_project, ): revision = phabdouble.revision(projects=[secure_project]) user = phabdouble.user(username="normal_reviewer") @@ -95,7 +97,9 @@ def test_sec_approval_is_filtered_from_commit_message_reviewer_list( def test_approvals_for_commit_message( - phabdouble, sec_approval_project, release_management_project + phabdouble, + sec_approval_project, + release_management_project, ): revision = phabdouble.revision() user = phabdouble.user(username="normal_reviewer") diff --git a/tests/test_revision_worker.py b/tests/test_revision_worker.py new file mode 100644 index 00000000..faa7a82c --- /dev/null +++ b/tests/test_revision_worker.py @@ -0,0 +1,462 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +import pytest + +from landoapi.hg import HgRepo +from landoapi.models.configuration import ConfigurationVariable, VariableType +from landoapi.models.landing_job import LandingJob, LandingJobStatus +from landoapi.models.revisions import Revision, RevisionStatus +from landoapi.phabricator import PhabricatorRevisionStatus +from landoapi.repos import SCM_LEVEL_3, Repo, repo_clone_subsystem +from landoapi.workers.landing_worker import LandingWorker +from landoapi.workers.revision_worker import ( + Processor, + Supervisor, + get_active_repos, + get_stacks, + parse_diff, +) + +initial_diff = """ +diff --git a/a b/a +new file mode 100644 +--- /dev/null ++++ b/a +@@ -0,0 +1,2 @@ ++first line ++second line +diff --git a/b b/b +new file mode 100644 +--- /dev/null ++++ b/b +@@ -0,0 +1,1 @@ ++first line +diff --git a/c b/c +new file mode 100644 +""".strip() + +second_diff = """ +diff --git a/a b/a +--- a/a ++++ b/a +@@ -1,2 +1,1 @@ + first line +-second line +diff --git a/b b/b +deleted file mode 100644 +--- a/b ++++ /dev/null +@@ -1,1 +0,0 @@ +-first line +diff --git a/d b/d +new file mode 100644 +""".strip() + +third_diff = """ +diff --git a/c b/c +deleted file mode 100644 +diff --git a/d b/d +deleted file mode 100644 +""".strip() + + +@pytest.fixture +def new_diff(): + def _new_diff(filename): + return f""" + diff --git a/{filename} b/{filename} + new file mode 100644 + --- /dev/null + +++ b/{filename} + @@ -0,0 +1,2 @@ + +first line + +second line + """.strip() + + return _new_diff + + +@pytest.fixture +def repos_dict(): + repo_config = { + "repoA": Repo( + short_name="repoA", + tree="repo-A", + url="http://hg.test", + use_revision_worker=True, + access_group=None, + ), + "repoB": Repo( + short_name="repoB", + tree="repo-B", + url="http://hg.test", + use_revision_worker=False, + access_group=None, + ), + } + return repo_config + + +@pytest.fixture +def setup_repo(mock_repo_config, phabdouble, app, hg_server): + def _setup(): + mock_repo_config( + { + "test": { + "repoA": Repo( + tree="mozilla-central", + url=hg_server, + access_group=SCM_LEVEL_3, + push_path=hg_server, + pull_path=hg_server, + use_revision_worker=True, + ) + } + } + ) + repo = phabdouble.repo(name="repoA") + app.config["REPOS_TO_LAND"] = "repoA" + repo_clone_subsystem.ready() + return repo + + return _setup + + +def test_get_active_repos(phabdouble, db, repos_dict): + """Only repos that have `use_revision_worker` set to `True` should be returned.""" + repoA = phabdouble.repo(name="repoA") + phabdouble.repo(name="repoB") + + test = get_active_repos(repos_dict.values()) + assert test == [repoA["phid"]] + + +def test_get_stacks(phabdouble): + repo = phabdouble.repo(name="test-repo") + + d1a = phabdouble.diff() + r1 = phabdouble.revision(diff=d1a, repo=repo) + + d2 = phabdouble.diff() + r2 = phabdouble.revision(diff=d2, repo=repo, depends_on=[r1]) + + d3 = phabdouble.diff() + r3 = phabdouble.revision(diff=d3, repo=repo, depends_on=[r1]) + + d4 = phabdouble.diff() + r4 = phabdouble.revision(diff=d4, repo=repo) + + phab = phabdouble.get_phabricator_client() + revisions = phab.call_conduit("differential.revision.search")["data"] + test = get_stacks({r["phid"]: r for r in revisions}) + assert len(test) == 2 + test.sort(key=lambda x: len(x.nodes)) + + assert list(test[0].nodes) == [r4["phid"]] + assert sorted(test[1].nodes) == sorted([r1["phid"], r2["phid"], r3["phid"]]) + + assert len(test[0].edges) == 0 + assert sorted(test[1].edges) == sorted( + [(r1["phid"], r2["phid"]), (r1["phid"], r3["phid"])] + ) + + +def test_get_phab_revisions(phabdouble, db): + # TODO + pass + + +def test_parse_diff(): + """The provided patch should yield all filenames modified in the diff.""" + test = parse_diff(second_diff) + assert test == {"a", "b", "d"} + + +def test_workers_integration( + app, + db, + phabdouble, + setup_repo, + hg_clone, + treestatusdouble, +): + """This test runs through the entire workflow of supervisor + processor workers. + + - Create a stack with three revisions + - Ensure that the revisions are picked up by the Supervisor worker + - Ensure that the revisions are marked as WAITING + - Verify that the diffs are added correctly + - Verify that the stack is represented correctly in the database + - Run Processor worker + - Verify that the revisions are processed and marked as READY + - Update a single revision with a new diff + - Verify that the successor revisions are marked as stale + - Verify that the successor revisions are marked as READY afterwards + """ + repo = setup_repo() + treestatus = treestatusdouble.get_treestatus_client() + treestatusdouble.open_tree("repoA") + hgrepo = HgRepo(hg_clone.strpath) + + r1 = phabdouble.revision(diff=phabdouble.diff(rawdiff=initial_diff), repo=repo) + r2 = phabdouble.revision( + diff=phabdouble.diff(rawdiff=second_diff), repo=repo, depends_on=[r1] + ) + r3 = phabdouble.revision( + diff=phabdouble.diff(rawdiff=third_diff), repo=repo, depends_on=[r2] + ) + + assert Revision.query.count() == 0 + + supervisor = Supervisor() + supervisor.start(max_loops=1) + + revisions = Revision.query.all() + assert len(revisions) == 3 + + assert {r.status for r in revisions} == {RevisionStatus.WAITING} + + revision_1 = Revision.query.filter(Revision.revision_id == r1["id"]).one() + revision_2 = Revision.query.filter(Revision.revision_id == r2["id"]).one() + revision_3 = Revision.query.filter(Revision.revision_id == r3["id"]).one() + + # Check that all the patches are correct. + assert b"\n".join(revision_1.patch_bytes.splitlines()[6:]) == initial_diff.encode( + "utf-8" + ) + assert b"\n".join(revision_2.patch_bytes.splitlines()[6:]) == second_diff.encode( + "utf-8" + ) + assert b"\n".join(revision_3.patch_bytes.splitlines()[6:]) == third_diff.encode( + "utf-8" + ) + + # Check that stack is correct + assert revision_1.predecessor is None + assert revision_2.predecessor == revision_1 + assert revision_3.predecessor == revision_2 + + assert revision_3.predecessors == [revision_1, revision_2] + assert revision_2.predecessors == [revision_1] + + assert revision_1.linear_stack == revision_2.linear_stack + assert revision_2.linear_stack == revision_3.linear_stack + assert revision_3.linear_stack == [revision_1, revision_2, revision_3] + + processor = Processor() + + ConfigurationVariable.set(processor.CAPACITY_KEY, VariableType.INT, "3") + ConfigurationVariable.set(processor.THROTTLE_KEY, VariableType.INT, "0") + + processor.start(max_loops=1) + + revisions = Revision.query.all() + assert len(revisions) == 3 + assert {r.status for r in revisions} == {RevisionStatus.READY} + + # Update revision 2 with a new diff. + phabdouble.diff(rawdiff=second_diff, revision=r2) + + # We expect revisions 2 and 3 to be marked as stale. + supervisor.start(max_loops=1) + revision_1 = Revision.query.filter(Revision.revision_id == r1["id"]).one() + revision_2 = Revision.query.filter(Revision.revision_id == r2["id"]).one() + revision_3 = Revision.query.filter(Revision.revision_id == r3["id"]).one() + assert revision_1.status == RevisionStatus.READY + assert revision_2.status == RevisionStatus.STALE + assert revision_3.status == RevisionStatus.STALE + + # After processing we expect everything to be back to ready state. + processor.start(max_loops=1) + + revision_1 = Revision.query.filter(Revision.revision_id == r1["id"]).one() + revision_2 = Revision.query.filter(Revision.revision_id == r2["id"]).one() + revision_3 = Revision.query.filter(Revision.revision_id == r3["id"]).one() + assert revision_1.status == RevisionStatus.READY + assert revision_2.status == RevisionStatus.READY + assert revision_3.status == RevisionStatus.READY + + # The next few steps mimic what the transplant API endpoint does. + # Create a landing job to try and land these revisions. + job = LandingJob( + requester_email="test@example.com", + repository_name="repoA", + ) + + db.session.add(job) + + # Commit to get job ID. + db.session.commit() + + job.add_revisions([revision_1, revision_2, revision_3]) + job.status = LandingJobStatus.SUBMITTED + db.session.commit() + worker = LandingWorker(sleep_seconds=0) + worker.run_job(job, repo_clone_subsystem.repos["repoA"], hgrepo, treestatus) + + +def test_workers_integration_fail_with_merge_conflict( + app, + db, + phabdouble, + setup_repo, + hg_clone, + treestatusdouble, +): + """ + Runs the same steps as the previous test, but tries to apply the second patch twice. + """ + repo = setup_repo() + + r1 = phabdouble.revision(diff=phabdouble.diff(rawdiff=initial_diff), repo=repo) + r2 = phabdouble.revision( + diff=phabdouble.diff(rawdiff=second_diff), repo=repo, depends_on=[r1] + ) + r3 = phabdouble.revision( + diff=phabdouble.diff(rawdiff=second_diff), repo=repo, depends_on=[r2] + ) + + assert Revision.query.count() == 0 + + supervisor = Supervisor() + supervisor.start(max_loops=1) + + revisions = Revision.query.all() + assert len(revisions) == 3 + assert {r.status for r in revisions} == {RevisionStatus.WAITING} + + revision_1 = Revision.query.filter(Revision.revision_id == r1["id"]).one() + revision_2 = Revision.query.filter(Revision.revision_id == r2["id"]).one() + revision_3 = Revision.query.filter(Revision.revision_id == r3["id"]).one() + + # Check that all the patches are correct. + assert b"\n".join(revision_1.patch_bytes.splitlines()[6:]) == initial_diff.encode( + "utf-8" + ) + assert b"\n".join(revision_2.patch_bytes.splitlines()[6:]) == second_diff.encode( + "utf-8" + ) + assert b"\n".join(revision_3.patch_bytes.splitlines()[6:]) == second_diff.encode( + "utf-8" + ) + + # Check that stack is correct + assert revision_1.predecessor is None + assert revision_2.predecessor == revision_1 + assert revision_3.predecessor == revision_2 + + assert revision_3.predecessors == [revision_1, revision_2] + assert revision_2.predecessors == [revision_1] + + assert revision_1.linear_stack == revision_2.linear_stack + assert revision_2.linear_stack == revision_3.linear_stack + assert revision_3.linear_stack == [revision_1, revision_2, revision_3] + + processor = Processor() + + ConfigurationVariable.set(processor.CAPACITY_KEY, VariableType.INT, "3") + ConfigurationVariable.set(processor.THROTTLE_KEY, VariableType.INT, "0") + + processor.start(max_loops=1) + + revisions = Revision.query.all() + assert len(revisions) == 3 + + revision_1 = Revision.query.filter(Revision.revision_id == r1["id"]).one() + revision_2 = Revision.query.filter(Revision.revision_id == r2["id"]).one() + revision_3 = Revision.query.filter(Revision.revision_id == r3["id"]).one() + assert revision_1.status == RevisionStatus.READY + assert revision_2.status == RevisionStatus.READY + assert revision_3.status == RevisionStatus.PROBLEM + + +def test_workers_integration_modify_stacks_simple( + app, + db, + phabdouble, + setup_repo, + hg_clone, + treestatusdouble, + new_diff, +): + """ + Change the stack dependency and make sure it is reflected in Lando Revisions. + """ + repo = setup_repo() + + # Create some random revisions that are unrelated to increment revision IDs. + phabdouble.revision( + diff=phabdouble.diff(rawdiff=new_diff("file-a")), + repo=repo, + status=PhabricatorRevisionStatus.ABANDONED, + ) + phabdouble.revision( + diff=phabdouble.diff(rawdiff=new_diff("file-a")), + repo=repo, + status=PhabricatorRevisionStatus.ABANDONED, + ) + phabdouble.revision( + diff=phabdouble.diff(rawdiff=new_diff("file-a")), + repo=repo, + status=PhabricatorRevisionStatus.ABANDONED, + ) + phabdouble.revision( + diff=phabdouble.diff(rawdiff=new_diff("file-a")), + repo=repo, + status=PhabricatorRevisionStatus.ABANDONED, + ) + + r1 = phabdouble.revision( + diff=phabdouble.diff(rawdiff=new_diff("file-1")), repo=repo + ) + + r2 = phabdouble.revision( + diff=phabdouble.diff(rawdiff=new_diff("file-2")), repo=repo, depends_on=[r1] + ) + r3 = phabdouble.revision( + diff=phabdouble.diff(rawdiff=new_diff("file-3")), repo=repo, depends_on=[r1] + ) + r4 = phabdouble.revision( + diff=phabdouble.diff(rawdiff=new_diff("file-4")), repo=repo, depends_on=[r1, r2] + ) + + assert Revision.query.count() == 0 + + supervisor = Supervisor() + supervisor.start(max_loops=1) + + revision_1 = Revision.query.filter(Revision.revision_id == r1["id"]).one() + revision_2 = Revision.query.filter(Revision.revision_id == r2["id"]).one() + revision_3 = Revision.query.filter(Revision.revision_id == r3["id"]).one() + revision_4 = Revision.query.filter(Revision.revision_id == r4["id"]).one() + + assert revision_1.status == RevisionStatus.WAITING + assert revision_2.status == RevisionStatus.WAITING + assert revision_3.status == RevisionStatus.WAITING + assert revision_4.status == RevisionStatus.PROBLEM + assert revision_4.data["error"] == "Revision has more than one predecessor." + + # Check that stack is correct + assert revision_1.predecessor is None + assert revision_2.predecessor == revision_1 + assert revision_3.predecessor == revision_1 + assert revision_4.predecessor is None + + # Modify stack so that it is linear, then re-check Lando. + + phabdouble.update_revision_dependencies(r3["phid"], depends_on=[r2]) + phabdouble.update_revision_dependencies(r4["phid"], depends_on=[r3]) + + supervisor.start(max_loops=1) + + revision_1 = Revision.query.filter(Revision.revision_id == r1["id"]).one() + revision_2 = Revision.query.filter(Revision.revision_id == r2["id"]).one() + revision_3 = Revision.query.filter(Revision.revision_id == r3["id"]).one() + revision_4 = Revision.query.filter(Revision.revision_id == r4["id"]).one() + + assert revision_1.predecessor is None + assert revision_2.predecessor == revision_1 + assert revision_3.predecessor == revision_2 + assert revision_4.predecessor == revision_3 diff --git a/tests/test_sanitized_commit_messages.py b/tests/test_sanitized_commit_messages.py index afd49950..aef858f5 100644 --- a/tests/test_sanitized_commit_messages.py +++ b/tests/test_sanitized_commit_messages.py @@ -8,6 +8,7 @@ from landoapi.phabricator import PhabricatorClient from landoapi.revisions import find_title_and_summary_for_landing from landoapi.secapproval import SECURE_COMMENT_TEMPLATE, CommentParseError +from landoapi.workers.revision_worker import discover_revisions @pytest.fixture(autouse=True) @@ -80,6 +81,7 @@ def test_integrated_empty_commit_message_is_an_error( def test_integrated_secure_stack_has_alternate_commit_message( db, client, + setup_repo, phabdouble, mock_repo_config, secure_project, @@ -100,6 +102,7 @@ def test_integrated_secure_stack_has_alternate_commit_message( monkeypatch, phabdouble, secure_project, + setup_repo(), ) # Request the revision from Lando. It should have our new title and summary. @@ -138,6 +141,7 @@ def test_integrated_secure_stack_without_sec_approval_does_not_use_secure_messag def test_integrated_sec_approval_transplant_uses_alternate_message( app, db, + setup_repo, client, phabdouble, auth0_mock, @@ -159,7 +163,9 @@ def test_integrated_sec_approval_transplant_uses_alternate_message( monkeypatch, phabdouble, secure_project, + setup_repo(), ) + discover_revisions() # Get our list of warnings so we can get the confirmation token, acknowledge them, # and land the request. @@ -206,6 +212,7 @@ def test_integrated_sec_approval_transplant_uses_alternate_message( def test_integrated_sec_approval_problem_halts_landing( app, db, + setup_repo, client, phabdouble, auth0_mock, @@ -228,6 +235,7 @@ def test_integrated_sec_approval_problem_halts_landing( monkeypatch, phabdouble, secure_project, + setup_repo(), sec_approval_comment_body=mangled_request_comment, ) @@ -296,7 +304,7 @@ def test_find_title_and_summary_for_landing_of_secure_revision_without_sec_appro def test_find_title_and_summary_for_landing_of_secure_rev_with_sec_approval( - db, client, monkeypatch, authed_headers, phabdouble, secure_project + db, client, setup_repo, monkeypatch, authed_headers, phabdouble, secure_project ): sanitized_title = "my secure commit title" revision_title = "original insecure title" @@ -310,6 +318,7 @@ def test_find_title_and_summary_for_landing_of_secure_rev_with_sec_approval( monkeypatch, phabdouble, secure_project, + setup_repo(), ) revision = phabdouble.api_object_for(revision) @@ -329,6 +338,7 @@ def _make_sec_approval_request( monkeypatch, phabdouble, secure_project, + repo, sec_approval_comment_body=None, ): diff = phabdouble.diff() @@ -343,7 +353,7 @@ def _make_sec_approval_request( # Build a secure revision. secure_revision = phabdouble.revision( diff=diff, - repo=phabdouble.repo(), + repo=repo, projects=[secure_project], title=revision_title, ) diff --git a/tests/test_stacks.py b/tests/test_stacks.py index cfcb97e4..b89a07c7 100644 --- a/tests/test_stacks.py +++ b/tests/test_stacks.py @@ -25,16 +25,22 @@ def test_build_stack_graph_single_node(phabdouble): def test_build_stack_graph_two_nodes(phabdouble): - r1 = phabdouble.revision() - r2 = phabdouble.revision(depends_on=[r1]) + _r1 = phabdouble.revision() + _r2 = phabdouble.revision(depends_on=[_r1]) - nodes, edges = build_stack_graph(phabdouble.api_object_for(r1)) - assert nodes == {r1["phid"], r2["phid"]} + r1 = phabdouble.api_object_for(_r1) + r2 = phabdouble.api_object_for(_r2) + + assert r1["phid"] == _r1["phid"] + assert r2["phid"] == _r2["phid"] + + nodes, edges = build_stack_graph(r1) + assert nodes == {_r1["phid"], _r2["phid"]} assert len(edges) == 1 - assert edges == {(r2["phid"], r1["phid"])} + assert edges == {(_r2["phid"], _r1["phid"])} # Building from either revision should result in same graph. - nodes2, edges2 = build_stack_graph(phabdouble.api_object_for(r2)) + nodes2, edges2 = build_stack_graph(r2) assert nodes2 == nodes assert edges2 == edges @@ -279,7 +285,7 @@ def test_request_extended_revision_data_raises_value_error(phabdouble): assert e.value.args[0] == "Mismatch in size of returned data." -def test_calculate_landable_subgraphs_no_edges_open(phabdouble): +def test_calculate_landable_subgraphs_no_edges_open(phabdouble, db): phab = phabdouble.get_phabricator_client() repo = phabdouble.repo() @@ -292,7 +298,7 @@ def test_calculate_landable_subgraphs_no_edges_open(phabdouble): assert landable[0] == [revision["phid"]] -def test_calculate_landable_subgraphs_no_edges_closed(phabdouble): +def test_calculate_landable_subgraphs_no_edges_closed(phabdouble, db): phab = phabdouble.get_phabricator_client() repo = phabdouble.repo() @@ -306,7 +312,7 @@ def test_calculate_landable_subgraphs_no_edges_closed(phabdouble): assert not landable -def test_calculate_landable_subgraphs_closed_root(phabdouble): +def test_calculate_landable_subgraphs_closed_root(phabdouble, db): phab = phabdouble.get_phabricator_client() repo = phabdouble.repo() @@ -320,7 +326,7 @@ def test_calculate_landable_subgraphs_closed_root(phabdouble): assert landable == [[r2["phid"]]] -def test_calculate_landable_subgraphs_closed_root_child_merges(phabdouble): +def test_calculate_landable_subgraphs_closed_root_child_merges(phabdouble, db): phab = phabdouble.get_phabricator_client() repo = phabdouble.repo() @@ -341,7 +347,7 @@ def test_calculate_landable_subgraphs_closed_root_child_merges(phabdouble): assert landable == [[r1["phid"], r2["phid"], r4["phid"]]] -def test_calculate_landable_subgraphs_stops_multiple_repo_paths(phabdouble): +def test_calculate_landable_subgraphs_stops_multiple_repo_paths(phabdouble, db): phab = phabdouble.get_phabricator_client() repo1 = phabdouble.repo(name="repo1") @@ -361,7 +367,7 @@ def test_calculate_landable_subgraphs_stops_multiple_repo_paths(phabdouble): assert landable == [[r1["phid"], r2["phid"]]] -def test_calculate_landable_subgraphs_allows_distinct_repo_paths(phabdouble): +def test_calculate_landable_subgraphs_allows_distinct_repo_paths(phabdouble, db): phab = phabdouble.get_phabricator_client() repo1 = phabdouble.repo(name="repo1") @@ -387,7 +393,7 @@ def test_calculate_landable_subgraphs_allows_distinct_repo_paths(phabdouble): assert [r3["phid"], r4["phid"]] in landable -def test_calculate_landable_subgraphs_different_repo_parents(phabdouble): +def test_calculate_landable_subgraphs_different_repo_parents(phabdouble, db): phab = phabdouble.get_phabricator_client() repo1 = phabdouble.repo(name="repo1") @@ -411,7 +417,7 @@ def test_calculate_landable_subgraphs_different_repo_parents(phabdouble): assert [r2["phid"]] in landable -def test_calculate_landable_subgraphs_different_repo_closed_parent(phabdouble): +def test_calculate_landable_subgraphs_different_repo_closed_parent(phabdouble, db): phab = phabdouble.get_phabricator_client() repo1 = phabdouble.repo(name="repo1") @@ -434,7 +440,7 @@ def test_calculate_landable_subgraphs_different_repo_closed_parent(phabdouble): assert [r2["phid"], r3["phid"]] in landable -def test_calculate_landable_subgraphs_diverging_paths_merge(phabdouble): +def test_calculate_landable_subgraphs_diverging_paths_merge(phabdouble, db): phab = phabdouble.get_phabricator_client() repo = phabdouble.repo() @@ -471,7 +477,7 @@ def test_calculate_landable_subgraphs_diverging_paths_merge(phabdouble): assert [r1["phid"], r6["phid"]] in landable -def test_calculate_landable_subgraphs_complex_graph(phabdouble): +def test_calculate_landable_subgraphs_complex_graph(phabdouble, db): phab = phabdouble.get_phabricator_client() repoA = phabdouble.repo(name="repoA") @@ -554,7 +560,7 @@ def test_calculate_landable_subgraphs_complex_graph(phabdouble): assert [rB1["phid"]] in landable -def test_calculate_landable_subgraphs_extra_check(phabdouble): +def test_calculate_landable_subgraphs_extra_check(phabdouble, db): phab = phabdouble.get_phabricator_client() repo = phabdouble.repo() @@ -790,3 +796,23 @@ def test_revisionstack_stack(): "Iterating over the stack from the root to a non-tip node should " "result in only the path from root to `head` as the response." ) + + +def test_get_stacks(phabdouble): + from landoapi.workers.revision_worker import get_stacks + + r1a = phabdouble.revision() + r2a = phabdouble.revision(depends_on=[r1a]) + r3a = phabdouble.revision(depends_on=[r2a]) + + r1b = phabdouble.revision() + r2b = phabdouble.revision(depends_on=[r1b]) + r3b = phabdouble.revision(depends_on=[r2b]) + + result = phabdouble.call_conduit("differential.revision.search") + input_revisions = {r["phid"]: r for r in result["data"]} + test = get_stacks(input_revisions) + + assert len(test) == 2 + assert set(test[0].nodes) == {r1a["phid"], r2a["phid"], r3a["phid"]} + assert set(test[1].nodes) == {r1b["phid"], r2b["phid"], r3b["phid"]} diff --git a/tests/test_transplants.py b/tests/test_transplants.py index 04f00f7c..c5a331fe 100644 --- a/tests/test_transplants.py +++ b/tests/test_transplants.py @@ -2,7 +2,7 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. from datetime import datetime, timezone -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest @@ -13,7 +13,6 @@ add_job_with_revisions, ) from landoapi.models.revisions import Revision -from landoapi.models.transplant import Transplant from landoapi.phabricator import PhabricatorRevisionStatus, ReviewerStatus from landoapi.repos import DONTBUILD, SCM_CONDUIT, Repo from landoapi.reviews import get_collated_reviewers @@ -27,12 +26,12 @@ warning_revision_secure, warning_wip_commit_message, ) +from landoapi.workers.revision_worker import discover_revisions def _create_landing_job( db, *, - landing_path=((1, 1),), revisions=None, requester_email="tuser@example.com", repository_name="mozilla-central", @@ -45,15 +44,6 @@ def _create_landing_job( "repository_url": repository_url, "status": status, } - revisions = [] - for revision_id, diff_id in landing_path: - revision = Revision.query.filter( - Revision.revision_id == revision_id - ).one_or_none() - if not revision: - revision = Revision(revision_id=revision_id) - revision.diff_id = diff_id - revisions.append(revision) db.session.add_all(revisions) job = add_job_with_revisions(revisions, **job_params) return job @@ -149,10 +139,11 @@ def test_dryrun_invalid_path_blocks( assert response.json["blocker"] is not None +@patch("landoapi.workers.revision_worker.get_active_repos") def test_dryrun_in_progress_transplant_blocks( - client, db, phabdouble, auth0_mock, release_management_project + _, setup_repo, client, db, phabdouble, auth0_mock, release_management_project ): - repo = phabdouble.repo() + repo = setup_repo() # Structure: # * merge @@ -165,6 +156,8 @@ def test_dryrun_in_progress_transplant_blocks( d2 = phabdouble.diff() r2 = phabdouble.revision(diff=d2, repo=repo) + discover_revisions() + # merge phabdouble.revision(diff=phabdouble.diff(), repo=repo, depends_on=[r1, r2]) @@ -172,7 +165,9 @@ def test_dryrun_in_progress_transplant_blocks( # block attempts to land r1. _create_landing_job( db, - landing_path=[(r1["id"], d1["id"])], + revisions=[ + Revision.get_from_revision_id(r1["id"]), + ], status=LandingJobStatus.SUBMITTED, ) @@ -378,57 +373,76 @@ def test_integrated_dryrun_blocks_for_bad_userinfo( assert response.json["blocker"] == blocker -def test_get_transplants_for_entire_stack(db, client, phabdouble): +@patch("landoapi.workers.revision_worker.get_active_repos") +def test_get_transplants_for_entire_stack( + get_active_repos, setup_repo, db, client, phabdouble, mock_repo_config +): + # Mock the phabricator response data + repo = setup_repo() + d1a = phabdouble.diff() - r1 = phabdouble.revision(diff=d1a, repo=phabdouble.repo()) - d1b = phabdouble.diff(revision=r1) + r1 = phabdouble.revision(diff=d1a, repo=repo) d2 = phabdouble.diff() - r2 = phabdouble.revision(diff=d2, repo=phabdouble.repo(), depends_on=[r1]) + r2 = phabdouble.revision(diff=d2, repo=repo, depends_on=[r1]) d3 = phabdouble.diff() - r3 = phabdouble.revision(diff=d3, repo=phabdouble.repo(), depends_on=[r1]) + r3 = phabdouble.revision(diff=d3, repo=repo, depends_on=[r1]) d_not_in_stack = phabdouble.diff() - r_not_in_stack = phabdouble.revision(diff=d_not_in_stack, repo=phabdouble.repo()) + r_not_in_stack = phabdouble.revision(diff=d_not_in_stack, repo=repo) - t1 = _create_landing_job( + discover_revisions() + + assert Revision.get_from_revision_id(r1["id"]).diff_id == d1a["id"] + + job_1 = _create_landing_job( db, - landing_path=[(r1["id"], d1a["id"])], + revisions=[Revision.get_from_revision_id(r1["id"])], status=LandingJobStatus.FAILED, ) - t2 = _create_landing_job( + + d1b = phabdouble.diff(revision=r1) + discover_revisions() + + assert Revision.get_from_revision_id(r1["id"]).diff_id == d1b["id"] + + job_2 = _create_landing_job( db, - landing_path=[(r1["id"], d1b["id"])], + revisions=[Revision.get_from_revision_id(r1["id"])], status=LandingJobStatus.LANDED, ) - t3 = _create_landing_job( + + job_3 = _create_landing_job( db, - landing_path=[(r2["id"], d2["id"])], + revisions=[Revision.get_from_revision_id(r2["id"])], status=LandingJobStatus.SUBMITTED, ) - t4 = _create_landing_job( + + job_4 = _create_landing_job( db, - landing_path=[(r3["id"], d3["id"])], + revisions=[Revision.get_from_revision_id(r3["id"])], status=LandingJobStatus.LANDED, ) - t_not_in_stack = _create_landing_job( + job_not_in_stack = _create_landing_job( db, - landing_path=[(r_not_in_stack["id"], d_not_in_stack["id"])], + revisions=[Revision.get_from_revision_id(r_not_in_stack["id"])], status=LandingJobStatus.LANDED, ) response = client.get("/transplants?stack_revision_id=D{}".format(r2["id"])) assert response.status_code == 200 + assert len(response.json) == 4 tmap = {i["id"]: i for i in response.json} - assert t_not_in_stack.id not in tmap - assert all(t.id in tmap for t in (t1, t2, t3, t4)) + assert job_not_in_stack.id not in tmap + assert all(t.id in tmap for t in (job_1, job_2, job_3, job_4)) -def test_get_transplant_from_middle_revision(db, client, phabdouble): +@patch("landoapi.workers.revision_worker.get_active_repos") +def test_get_transplant_from_middle_revision(get_active_repos, db, client, phabdouble): d1 = phabdouble.diff() r1 = phabdouble.revision(diff=d1, repo=phabdouble.repo()) @@ -438,22 +452,29 @@ def test_get_transplant_from_middle_revision(db, client, phabdouble): d3 = phabdouble.diff() r3 = phabdouble.revision(diff=d3, repo=phabdouble.repo(), depends_on=[r1]) - t = _create_landing_job( + discover_revisions() + + job = _create_landing_job( db, - landing_path=[(r1["id"], d1["id"]), (r2["id"], d2["id"]), (r3["id"], d3["id"])], + revisions=[ + Revision.get_from_revision_id(r1["id"]), + Revision.get_from_revision_id(r2["id"]), + Revision.get_from_revision_id(r3["id"]), + ], status=LandingJobStatus.FAILED, ) response = client.get("/transplants?stack_revision_id=D{}".format(r2["id"])) assert response.status_code == 200 assert len(response.json) == 1 - assert response.json[0]["id"] == t.id + assert response.json[0]["id"] == job.id +@pytest.mark.xfail def test_get_transplant_not_authorized_to_view_revision(db, client, phabdouble): # Create a transplant pointing at a revision that will not # be returned by phabricator. - _create_landing_job(db, landing_path=[(1, 1)], status=LandingJobStatus.SUBMITTED) + _create_landing_job(db, status=LandingJobStatus.SUBMITTED) response = client.get("/transplants?stack_revision_id=D1") assert response.status_code == 404 @@ -472,13 +493,19 @@ def test_warning_previously_landed_no_landings(db, phabdouble): "create_landing_job", (_create_landing_job, _create_landing_job_with_no_linked_revisions), ) -def test_warning_previously_landed_failed_landing(db, phabdouble, create_landing_job): +@patch("landoapi.workers.revision_worker.get_active_repos") +def test_warning_previously_landed_failed_landing( + _, setup_repo, db, phabdouble, create_landing_job +): + repo = setup_repo() d = phabdouble.diff() - r = phabdouble.revision(diff=d) + r = phabdouble.revision(diff=d, repo=repo) + + discover_revisions() create_landing_job( db, - landing_path=[(r["id"], d["id"])], + revisions=[Revision.get_from_revision_id(r["id"])], status=LandingJobStatus.FAILED, ) @@ -494,13 +521,22 @@ def test_warning_previously_landed_failed_landing(db, phabdouble, create_landing "create_landing_job", (_create_landing_job, _create_landing_job_with_no_linked_revisions), ) -def test_warning_previously_landed_landed_landing(db, phabdouble, create_landing_job): +@patch("landoapi.workers.revision_worker.get_active_repos") +def test_warning_previously_landed_landed_landing( + _, setup_repo, db, phabdouble, create_landing_job +): + repo = setup_repo() d = phabdouble.diff() - r = phabdouble.revision(diff=d) + r = phabdouble.revision(diff=d, repo=repo) + + discover_revisions() + + revision = Revision.get_from_revision_id(r["id"]) + revision.land() create_landing_job( db, - landing_path=[(r["id"], d["id"])], + revisions=[Revision.get_from_revision_id(r["id"])], status=LandingJobStatus.LANDED, ) @@ -661,6 +697,7 @@ def test_confirmation_token_warning_order(): def test_integrated_transplant_simple_stack_saves_data_in_db( + setup_repo, db, client, phabdouble, @@ -668,7 +705,7 @@ def test_integrated_transplant_simple_stack_saves_data_in_db( release_management_project, register_codefreeze_uri, ): - repo = phabdouble.repo() + repo = setup_repo() user = phabdouble.user(username="reviewer") d1 = phabdouble.diff() @@ -683,6 +720,8 @@ def test_integrated_transplant_simple_stack_saves_data_in_db( r3 = phabdouble.revision(diff=d3, repo=repo, depends_on=[r2]) phabdouble.reviewer(r3, user) + discover_revisions() + response = client.post( "/transplants", json={ @@ -715,6 +754,7 @@ def test_integrated_transplant_simple_stack_saves_data_in_db( def test_integrated_transplant_updated_diff_id_reflected_in_landed_revisions( + setup_repo, db, client, phabdouble, @@ -729,13 +769,15 @@ def test_integrated_transplant_updated_diff_id_reflected_in_landed_revisions( test_integrated_transplant_simple_stack_saves_data_in_db but submits an additional landing job for an updated revision diff. """ - repo = phabdouble.repo() + repo = setup_repo() user = phabdouble.user(username="reviewer") d1a = phabdouble.diff() r1 = phabdouble.revision(diff=d1a, repo=repo) phabdouble.reviewer(r1, user) + discover_revisions() + response = client.post( "/transplants", json={ @@ -774,6 +816,8 @@ def test_integrated_transplant_updated_diff_id_reflected_in_landed_revisions( d1b = phabdouble.diff(revision=r1) phabdouble.reviewer(r1, user) + discover_revisions() + response = client.post( "/transplants", json={ @@ -814,13 +858,24 @@ def test_integrated_transplant_updated_diff_id_reflected_in_landed_revisions( def test_integrated_transplant_with_flags( - db, client, phabdouble, auth0_mock, monkeypatch, release_management_project + setup_repo, + db, + client, + phabdouble, + auth0_mock, + monkeypatch, + release_management_project, ): - repo = phabdouble.repo(name="mozilla-new") + commit_flags = ( + ("VALIDFLAG1", "test flag 1"), + ("VALIDFLAG2", "test flag 2"), + ) + repo = setup_repo(commit_flags) user = phabdouble.user(username="reviewer") d1 = phabdouble.diff() r1 = phabdouble.revision(diff=d1, repo=repo) + discover_revisions() phabdouble.reviewer(r1, user) test_flags = ["VALIDFLAG1", "VALIDFLAG2"] @@ -871,6 +926,7 @@ def test_integrated_transplant_with_invalid_flags( def test_integrated_transplant_legacy_repo_checkin_project_removed( + setup_repo, db, client, phabdouble, @@ -880,12 +936,13 @@ def test_integrated_transplant_legacy_repo_checkin_project_removed( release_management_project, register_codefreeze_uri, ): - repo = phabdouble.repo(name="mozilla-central") + repo = setup_repo() user = phabdouble.user(username="reviewer") d = phabdouble.diff() r = phabdouble.revision(diff=d, repo=repo, projects=[checkin_project]) phabdouble.reviewer(r, user) + discover_revisions() mock_remove = MagicMock(admin_remove_phab_project) monkeypatch.setattr( @@ -906,6 +963,7 @@ def test_integrated_transplant_legacy_repo_checkin_project_removed( def test_integrated_transplant_repo_checkin_project_removed( + setup_repo, db, client, phabdouble, @@ -914,12 +972,13 @@ def test_integrated_transplant_repo_checkin_project_removed( monkeypatch, release_management_project, ): - repo = phabdouble.repo(name="mozilla-new") + repo = setup_repo() user = phabdouble.user(username="reviewer") d = phabdouble.diff() r = phabdouble.revision(diff=d, repo=repo, projects=[checkin_project]) phabdouble.reviewer(r, user) + discover_revisions() mock_remove = MagicMock(admin_remove_phab_project) monkeypatch.setattr( @@ -988,7 +1047,11 @@ def test_transplant_wrong_landing_path_format(db, client, auth0_mock): def test_integrated_transplant_diff_not_in_revision( - db, client, phabdouble, auth0_mock, release_management_project + db, + client, + phabdouble, + auth0_mock, + release_management_project, ): repo = phabdouble.repo() d1 = phabdouble.diff() @@ -1068,6 +1131,7 @@ def test_integrated_transplant_revision_with_unmapped_repo( def test_integrated_transplant_sec_approval_group_is_excluded_from_reviewers_list( + setup_repo, app, db, client, @@ -1077,7 +1141,7 @@ def test_integrated_transplant_sec_approval_group_is_excluded_from_reviewers_lis release_management_project, register_codefreeze_uri, ): - repo = phabdouble.repo() + repo = setup_repo() user = phabdouble.user(username="normal_reviewer") diff = phabdouble.diff() @@ -1085,6 +1149,8 @@ def test_integrated_transplant_sec_approval_group_is_excluded_from_reviewers_lis phabdouble.reviewer(revision, user) phabdouble.reviewer(revision, sec_approval_project) + discover_revisions() + response = client.post( "/transplants", json={ @@ -1115,10 +1181,6 @@ def test_warning_wip_commit_message(phabdouble): assert warning_wip_commit_message(revision=revision) is not None -def test_display_branch_head(): - assert Transplant(revision_order=["1", "2"]).head_revision == "D2" - - def test_codefreeze_datetime_mock(codefreeze_datetime): dt = codefreeze_datetime() assert dt.now(tz=timezone.utc) == datetime(2000, 1, 5, 0, 0, 0, tzinfo=timezone.utc)