Skip to content

Commit

Permalink
workers: revision worker implementation
Browse files Browse the repository at this point in the history
PREVIOUS versions (most recent first):
- 8e73e5a
- 403e760
- 262f7a5

WIP DO NOT MERGE
Commit message TBD

- add main worker flag and capacity/throttle flags
- add method to parse diff and list affected files
- add more test coverage for revision_worker.py
- add mots integration (bug 1740107)
- add new RevisionWorker that pre-processes revisions (bug 1788728)
- add new start/stop commands to manage workers
- add new flags to stop workers gracefully (*_WORKER_STOPPED)
- add repo.use_revision_worker feature flag (bug 1788732)
- add mots hashes check
- include new Lando revision info via API endpoint
- refactor dependency and stack fetching and parsing using networkx
- rename old command lando-cli landing-worker to lando-cli start-landing-worker
- run pre/post mots query
- store mots output in revision model

TODO:
- detect stack change on page load
- add tests for new warnings
  • Loading branch information
zzzeid committed May 29, 2023
1 parent e84e171 commit 00ba1c3
Show file tree
Hide file tree
Showing 33 changed files with 1,760 additions and 167 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[flake8]
max-line-length = 88
select = C,E,F,W,B,B9
ignore = E203, E501, W503, B006
ignore = E203, E501, W503, B006, E712, E711
exclude =
.hg,
.git,
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ RUN cd / && pip install --no-cache /app
ENV PYTHONPATH /app
RUN chown -R app:app /app

# Create repos directory for transplanting in landing-worker
# Create repos directory for landing-worker and revision worker.
RUN mkdir /repos
RUN chown -R app:app /repos

# Run as a non-privileged user
USER app
Expand Down
2 changes: 2 additions & 0 deletions Dockerfile-dev
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ENV PYTHONUNBUFFERED=1
ENV FLASK_RUN_PORT=9000
ENV FLASK_RUN_HOST=0.0.0.0
ENV FLASK_DEBUG=1
ENV HTTP_ALLOWED=1

ENTRYPOINT ["lando-cli"]
CMD ["run"]
Expand Down Expand Up @@ -48,6 +49,7 @@ RUN cd / && pip install --no-cache /app
ENV PYTHONPATH /app
RUN chown -R app:app /app

# Create repos directory for landing worker and revision worker.
RUN mkdir /repos
RUN chown -R app:app /repos

Expand Down
24 changes: 12 additions & 12 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,25 +111,24 @@ services:
- smtp
lando-api.landing-worker:
image: lando-api
command: ["landing-worker"]
command: ["start-landing-worker"]
environment:
- ENV=localdev
- DATABASE_URL=postgresql://postgres:[email protected]/lando_api_dev
- SENTRY_DSN=
# See http://docs.celeryproject.org/en/stable/getting-started/brokers/redis.html#configuration
# for the full URL format.
- CELERY_BROKER_URL=redis://redis.queue/0
- OIDC_IDENTIFIER=https://lando-api.test
- OIDC_DOMAIN=https://auth0.test
- LANDO_UI_URL=https://lando.test
- REPO_CLONES_PATH=/repos
- REPOS_TO_LAND=localdev
CELERY_BROKER_URL: "redis://redis.queue/0"
DATABASE_URL: "postgresql://postgres:[email protected]/lando_api_dev"
ENV: "localdev"
LANDO_UI_URL: "https://lando.test"
OIDC_DOMAIN: "https://auth0.test"
OIDC_IDENTIFIER: "https://lando-api.test"
REPOS_TO_LAND: "localdev"
REPO_CLONES_PATH: "/repos"
SENTRY_DSN: ""
user: root
volumes:
- ./:/app
- ./migrations/:/migrations/
# Prevent writing python cache to the host.
- caches_cache:/app/.cache/
- repos:/repos
depends_on:
- lando-api.db
- redis.queue
Expand Down Expand Up @@ -157,3 +156,4 @@ volumes:
caches_pycache:
caches_cache:
caches_pytest_cache:
repos:
3 changes: 3 additions & 0 deletions landoapi/api/landing_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ def put(landing_job_id: str, data: dict):

if landing_job.status in (LandingJobStatus.SUBMITTED, LandingJobStatus.DEFERRED):
landing_job.transition_status(LandingJobAction.CANCEL)
for revision in landing_job.revisions:
# Unlock patches so they can be modified in the future.
revision.patch_locked = False
db.session.commit()
return {"id": landing_job.id}, 200
else:
Expand Down
30 changes: 20 additions & 10 deletions landoapi/api/stacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from flask import current_app
from landoapi.commit_message import format_commit_message
from landoapi.decorators import require_phabricator_api_key
from landoapi.models.revisions import Revision
from landoapi.phabricator import PhabricatorClient
from landoapi.projects import (
get_release_managers,
Expand Down Expand Up @@ -116,19 +117,25 @@ def get(phab: PhabricatorClient, revision_id: str):
}

revisions_response = []
for revision_phid, revision in stack_data.revisions.items():
fields = PhabricatorClient.expect(revision, "fields")
for _phid, phab_revision in stack_data.revisions.items():
lando_revision = Revision.query.filter(
Revision.revision_id == phab_revision["id"]
).one_or_none()
revision_phid = PhabricatorClient.expect(phab_revision, "phid")
fields = PhabricatorClient.expect(phab_revision, "fields")
diff_phid = PhabricatorClient.expect(fields, "diffPHID")
repo_phid = PhabricatorClient.expect(fields, "repositoryPHID")
diff = stack_data.diffs[diff_phid]
human_revision_id = "D{}".format(PhabricatorClient.expect(revision, "id"))
human_revision_id = "D{}".format(PhabricatorClient.expect(phab_revision, "id"))
revision_url = urllib.parse.urljoin(
current_app.config["PHABRICATOR_URL"], human_revision_id
)
secure = revision_is_secure(revision, secure_project_phid)
commit_description = find_title_and_summary_for_display(phab, revision, secure)
bug_id = get_bugzilla_bug(revision)
reviewers = get_collated_reviewers(revision)
secure = revision_is_secure(phab_revision, secure_project_phid)
commit_description = find_title_and_summary_for_display(
phab, phab_revision, secure
)
bug_id = get_bugzilla_bug(phab_revision)
reviewers = get_collated_reviewers(phab_revision)
accepted_reviewers = reviewers_for_commit_message(
reviewers, users, projects, sec_approval_project_phid
)
Expand Down Expand Up @@ -163,16 +170,16 @@ def get(phab: PhabricatorClient, revision_id: str):
{
"id": human_revision_id,
"phid": revision_phid,
"status": serialize_status(revision),
"status": serialize_status(phab_revision),
"blocked_reason": blocked.get(revision_phid, ""),
"bug_id": bug_id,
"title": commit_description.title,
"url": revision_url,
"date_created": PhabricatorClient.to_datetime(
PhabricatorClient.expect(revision, "fields", "dateCreated")
PhabricatorClient.expect(phab_revision, "fields", "dateCreated")
).isoformat(),
"date_modified": PhabricatorClient.to_datetime(
PhabricatorClient.expect(revision, "fields", "dateModified")
PhabricatorClient.expect(phab_revision, "fields", "dateModified")
).isoformat(),
"summary": commit_description.summary,
"commit_message_title": commit_message_title,
Expand All @@ -183,6 +190,9 @@ def get(phab: PhabricatorClient, revision_id: str):
"reviewers": serialize_reviewers(reviewers, users, projects, diff_phid),
"is_secure": secure,
"is_using_secure_commit_message": commit_description.sanitized,
"lando_revision": lando_revision.serialize()
if lando_revision
else None,
}
)

Expand Down
6 changes: 2 additions & 4 deletions landoapi/api/transplants.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ def post(phab: PhabricatorClient, data: dict):
}

raw_diff = phab.call_conduit("differential.getrawdiff", diffID=diff["id"])
lando_revision.set_patch(raw_diff, patch_data)
lando_revision.set_patch(raw_diff, patch_data, final=True)
db.session.commit()
lando_revisions.append(lando_revision)

Expand Down Expand Up @@ -446,11 +446,9 @@ def get_list(phab: PhabricatorClient, stack_revision_id: str):
limit=len(revision_phids),
)

# Return both transplants and landing jobs, since for repos that were switched
# both or either of these could be populated.

rev_ids = [phab.expect(r, "id") for r in phab.expect(revs, "data")]

# Find landing jobs based on related revisions or legacy revision_to_diff_id field.
landing_jobs = LandingJob.revisions_query(rev_ids).all()

return [job.serialize() for job in landing_jobs], 200
50 changes: 47 additions & 3 deletions landoapi/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,64 @@ def worker(celery_arguments):
celery.worker_main((sys.argv[0],) + celery_arguments)


@cli.command(name="landing-worker")
def landing_worker():
@cli.command(name="start-landing-worker")
def start_landing_worker():
from landoapi.app import auth0_subsystem, lando_ui_subsystem
from landoapi.workers.landing_worker import LandingWorker

exclusions = [auth0_subsystem, lando_ui_subsystem]
for system in get_subsystems(exclude=exclusions):
system.ensure_ready()

from landoapi.workers.landing_worker import LandingWorker
ConfigurationVariable.set(LandingWorker.STOP_KEY, VariableType.BOOL, "0")

worker = LandingWorker()
worker.start()


@cli.command(name="stop-landing-worker")
def stop_landing_worker():
from landoapi.workers.landing_worker import LandingWorker
from landoapi.storage import db_subsystem

db_subsystem.ensure_ready()
ConfigurationVariable.set(LandingWorker.STOP_KEY, VariableType.BOOL, "1")


@cli.command(name="start-revision-worker")
@click.argument("role")
def start_revision_worker(role):
from landoapi.app import auth0_subsystem, lando_ui_subsystem
from landoapi.workers.revision_worker import RevisionWorker, Supervisor, Processor

roles = {
"processor": Processor,
"supervisor": Supervisor,
}

if role not in roles:
raise ValueError(f"Unknown worker role specified ({role}).")

exclusions = [auth0_subsystem, lando_ui_subsystem]
for system in get_subsystems(exclude=exclusions):
system.ensure_ready()

ConfigurationVariable.set(RevisionWorker.STOP_KEY, VariableType.BOOL, "0")

worker = roles[role]()
worker.start()


@cli.command(name="stop-revision-worker")
def stop_revision_worker():
"""Stops all revision workers (supervisor and processors)."""
from landoapi.workers.revision_worker import RevisionWorker
from landoapi.storage import db_subsystem

db_subsystem.ensure_ready()
RevisionWorker.stop()


@cli.command(name="run-pre-deploy-sequence")
def run_pre_deploy_sequence():
"""Runs the sequence of commands required before a deployment."""
Expand Down
5 changes: 4 additions & 1 deletion landoapi/commit_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
IRC_NICK = r"[a-zA-Z0-9\-\_.]*[a-zA-Z0-9\-\_]+"

# fmt: off
REVIEWERS_RE = re.compile( # noqa: E131
REVIEWERS_RE = re.compile(
r"([\s\(\.\[;,])" # before "r" delimiter
+ r"(" + SPECIFIER + r")" # flag
+ r"(" # capture all reviewers
Expand Down Expand Up @@ -209,3 +209,6 @@ def bug_list_to_commit_string(bug_ids: Iterable[str]) -> str:
return "No bug"

return f"Bug {', '.join(sorted(set(bug_ids)))}"


# flake8: noqa: E131
12 changes: 11 additions & 1 deletion landoapi/hg.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import copy
import configparser
import copy
import logging
import os
import shlex
Expand Down Expand Up @@ -650,3 +650,13 @@ def read_checkout_file(self, path: str) -> str:

with checkout_file_path.open() as f:
return f.read()

def has_incoming(self, source: str) -> bool:
"""Check if there are any incoming changes from the remote repo."""
try:
self.run_hg(["incoming", source, "--limit", "1"])
except hglib.error.CommandError as e:
if b"no changes found" not in e.out:
logger.error(e)
return False
return True
3 changes: 1 addition & 2 deletions landoapi/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from landoapi.models.landing_job import LandingJob
from landoapi.models.revisions import Revision
from landoapi.models.secapproval import SecApprovalRequest
from landoapi.models.transplant import Transplant
from landoapi.models.configuration import ConfigurationVariable
from landoapi.models.revisions import DiffWarning
from landoapi.models.revisions import DiffWarning, Revision

__all__ = [
"LandingJob",
Expand Down
3 changes: 3 additions & 0 deletions landoapi/models/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class ConfigurationKey(enum.Enum):

LANDING_WORKER_PAUSED = "LANDING_WORKER_PAUSED"
LANDING_WORKER_STOPPED = "LANDING_WORKER_STOPPED"
REVISION_WORKER_PAUSED = "REVISION_WORKER_PAUSED"
REVISION_WORKER_STOPPED = "REVISION_WORKER_STOPPED"
REVISION_WORKER_CAPACITY = "REVISION_WORKER_CAPACITY"
API_IN_MAINTENANCE = "API_IN_MAINTENANCE"
WORKER_THROTTLE_SECONDS = "WORKER_THROTTLE_SECONDS"

Expand Down
33 changes: 31 additions & 2 deletions landoapi/models/landing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from sqlalchemy.dialects.postgresql.json import JSONB

from landoapi.models.base import Base
from landoapi.models.revisions import Revision, revision_landing_job
from landoapi.models.revisions import Revision, RevisionStatus, revision_landing_job
from landoapi.storage import db

logger = logging.getLogger(__name__)
Expand All @@ -38,7 +38,7 @@ class LandingJobStatus(enum.Enum):
column of `LandingJob`.
"""

# Initial creation state.
# Ready to be picked up state.
SUBMITTED = "SUBMITTED"

# Actively being processed.
Expand Down Expand Up @@ -274,6 +274,14 @@ def set_landed_revision_diffs(self):
.values(diff_id=revision.diff_id)
)

def has_non_ready_revisions(self) -> bool:
"""Return whether any of the revisions are in a non-ready state or not."""
return bool(
set(r.status for r in self.revisions).intersection(
RevisionStatus.NON_READY_STATES
)
)

def transition_status(
self,
action: LandingJobAction,
Expand Down Expand Up @@ -323,21 +331,42 @@ def transition_status(

self.status = actions[action]["status"]

if action == LandingJobAction.CANCEL:
self.ready_revisions()

if action in (LandingJobAction.FAIL, LandingJobAction.DEFER):
self.error = kwargs["message"]
self.fail_revisions()

if action == LandingJobAction.LAND:
self.landed_commit_id = kwargs["commit_id"]
self.land_revisions()

if commit:
db.session.commit()

def fail_revisions(self):
"""Mark all revisions in landing jobs as failed."""
for revision in self.revisions:
revision.fail()

def land_revisions(self):
"""Mark all revisions in landing jobs as landed."""
for revision in self.revisions:
revision.land()

def ready_revisions(self):
"""Mark all revisions in landing jobs as ready."""
for revision in self.revisions:
revision.ready()

def serialize(self) -> dict[str, Any]:
"""Return a JSON compatible dictionary."""
return {
"id": self.id,
"status": self.status.value,
"landing_path": self.serialized_landing_path,
"duration_seconds": self.duration_seconds,
"error_breakdown": self.error_breakdown,
"details": (
self.error or self.landed_commit_id
Expand Down
Loading

0 comments on commit 00ba1c3

Please sign in to comment.