Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workers: revision worker implementation #224

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ RUN cd / && pip install --no-cache /app
ENV PYTHONPATH /app
RUN chown -R app:app /app

# Create repos directory for transplanting in landing-worker
# Create repos directory for landing-worker and revision worker.
RUN mkdir /repos
RUN chown -R app:app /repos

# Run as a non-privileged user
USER app
Expand Down
2 changes: 2 additions & 0 deletions Dockerfile-dev
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ENV PYTHONUNBUFFERED=1
ENV FLASK_RUN_PORT=9000
ENV FLASK_RUN_HOST=0.0.0.0
ENV FLASK_DEBUG=1
ENV HTTP_ALLOWED=1

ENTRYPOINT ["lando-cli"]
CMD ["run"]
Expand Down Expand Up @@ -48,6 +49,7 @@ RUN cd / && pip install --no-cache /app
ENV PYTHONPATH /app
RUN chown -R app:app /app

# Create repos directory for landing worker and revision worker.
RUN mkdir /repos
RUN chown -R app:app /repos

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ services:
- smtp
lando-api.landing-worker:
image: lando-api
command: ["landing-worker"]
command: ["start-landing-worker"]
environment:
- ENV=localdev
- DATABASE_URL=postgresql://postgres:[email protected]/lando_api_dev
Expand Down
3 changes: 3 additions & 0 deletions landoapi/api/landing_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ def put(landing_job_id: str, data: dict):

if landing_job.status in (LandingJobStatus.SUBMITTED, LandingJobStatus.DEFERRED):
landing_job.transition_status(LandingJobAction.CANCEL)
for revision in landing_job.revisions:
# Unlock patches so they can be modified in the future.
revision.patch_locked = False
db.session.commit()
return {"id": landing_job.id}, 200
else:
Expand Down
6 changes: 2 additions & 4 deletions landoapi/api/transplants.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ def post(phab: PhabricatorClient, data: dict):
}

raw_diff = phab.call_conduit("differential.getrawdiff", diffID=diff["id"])
lando_revision.set_patch(raw_diff, patch_data)
lando_revision.set_patch(raw_diff, patch_data, final=True)
db.session.commit()
lando_revisions.append(lando_revision)

Expand Down Expand Up @@ -445,11 +445,9 @@ def get_list(phab: PhabricatorClient, stack_revision_id: str):
limit=len(revision_phids),
)

# Return both transplants and landing jobs, since for repos that were switched
# both or either of these could be populated.

rev_ids = [phab.expect(r, "id") for r in phab.expect(revs, "data")]

# Find landing jobs based on related revisions or legacy revision_to_diff_id field.
landing_jobs = LandingJob.revisions_query(rev_ids).all()

return [job.serialize() for job in landing_jobs], 200
48 changes: 46 additions & 2 deletions landoapi/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,64 @@ def worker(celery_arguments):
celery.worker_main((sys.argv[0],) + celery_arguments)


@cli.command(name="landing-worker")
def landing_worker():
@cli.command(name="start-landing-worker")
def start_landing_worker():
from landoapi.app import auth0_subsystem, lando_ui_subsystem
from landoapi.workers.landing_worker import LandingWorker

exclusions = [auth0_subsystem, lando_ui_subsystem]
for system in get_subsystems(exclude=exclusions):
system.ensure_ready()

worker = LandingWorker()
ConfigurationVariable.set(worker.STOP_KEY, VariableType.BOOL, "0")
worker.start()


@cli.command(name="stop-landing-worker")
def stop_landing_worker():
from landoapi.storage import db_subsystem
from landoapi.workers.landing_worker import LandingWorker

db_subsystem.ensure_ready()
worker = LandingWorker()
ConfigurationVariable.set(worker.STOP_KEY, VariableType.BOOL, "1")


@cli.command(name="start-revision-worker")
@click.argument("role")
def start_revision_worker(role):
from landoapi.app import auth0_subsystem, lando_ui_subsystem
from landoapi.workers.revision_worker import Processor, Supervisor

roles = {
"processor": Processor,
"supervisor": Supervisor,
}

if role not in roles:
raise ValueError(f"Unknown worker role specified ({role}).")

exclusions = [auth0_subsystem, lando_ui_subsystem]
for system in get_subsystems(exclude=exclusions):
system.ensure_ready()

worker = roles[role]()
ConfigurationVariable.set(worker.STOP_KEY, VariableType.BOOL, "0")
worker.start()


@cli.command(name="stop-revision-worker")
def stop_revision_worker():
"""Stops all revision workers (supervisor and processors)."""
from landoapi.storage import db_subsystem
from landoapi.workers.revision_worker import RevisionWorker

db_subsystem.ensure_ready()
worker = RevisionWorker()
ConfigurationVariable.set(worker.STOP_KEY, VariableType.BOOL, "1")


@cli.command(name="run-pre-deploy-sequence")
def run_pre_deploy_sequence():
"""Runs the sequence of commands required before a deployment."""
Expand Down
10 changes: 10 additions & 0 deletions landoapi/hg.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,3 +649,13 @@ def read_checkout_file(self, path: str) -> str:

with checkout_file_path.open() as f:
return f.read()

def has_incoming(self, source: str) -> bool:
"""Check if there are any incoming changes from the remote repo."""
try:
self.run_hg(["incoming", source, "--limit", "1"])
except hglib.error.CommandError as e:
if b"no changes found" not in e.out:
logger.error(e)
return False
return True
3 changes: 3 additions & 0 deletions landoapi/models/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ class ConfigurationKey(enum.Enum):

LANDING_WORKER_PAUSED = "LANDING_WORKER_PAUSED"
LANDING_WORKER_STOPPED = "LANDING_WORKER_STOPPED"
REVISION_WORKER_PAUSED = "REVISION_WORKER_PAUSED"
REVISION_WORKER_STOPPED = "REVISION_WORKER_STOPPED"
REVISION_WORKER_CAPACITY = "REVISION_WORKER_CAPACITY"
API_IN_MAINTENANCE = "API_IN_MAINTENANCE"
WORKER_THROTTLE_SECONDS = "WORKER_THROTTLE_SECONDS"

Expand Down
33 changes: 31 additions & 2 deletions landoapi/models/landing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from sqlalchemy.dialects.postgresql.json import JSONB

from landoapi.models.base import Base
from landoapi.models.revisions import Revision, revision_landing_job
from landoapi.models.revisions import Revision, RevisionStatus, revision_landing_job
from landoapi.storage import db

logger = logging.getLogger(__name__)
Expand All @@ -36,7 +36,7 @@ class LandingJobStatus(enum.Enum):
column of `LandingJob`.
"""

# Initial creation state.
# Ready to be picked up state.
SUBMITTED = "SUBMITTED"

# Actively being processed.
Expand Down Expand Up @@ -272,6 +272,14 @@ def set_landed_revision_diffs(self):
.values(diff_id=revision.diff_id)
)

def has_non_ready_revisions(self) -> bool:
"""Return whether any of the revisions are in a non-ready state or not."""
return bool(
{r.status for r in self.revisions}.intersection(
RevisionStatus.NON_READY_STATES
)
)

def transition_status(
self,
action: LandingJobAction,
Expand Down Expand Up @@ -321,21 +329,42 @@ def transition_status(

self.status = actions[action]["status"]

if action == LandingJobAction.CANCEL:
self.ready_revisions()

if action in (LandingJobAction.FAIL, LandingJobAction.DEFER):
self.error = kwargs["message"]
self.fail_revisions()

if action == LandingJobAction.LAND:
self.landed_commit_id = kwargs["commit_id"]
self.land_revisions()

if commit:
db.session.commit()

def fail_revisions(self):
"""Mark all revisions in landing jobs as failed."""
for revision in self.revisions:
revision.fail()

def land_revisions(self):
"""Mark all revisions in landing jobs as landed."""
for revision in self.revisions:
revision.land()

def ready_revisions(self):
"""Mark all revisions in landing jobs as ready."""
for revision in self.revisions:
revision.ready()

def serialize(self) -> dict[str, Any]:
"""Return a JSON compatible dictionary."""
return {
"id": self.id,
"status": self.status.value,
"landing_path": self.serialized_landing_path,
"duration_seconds": self.duration_seconds,
"error_breakdown": self.error_breakdown,
"details": (
self.error or self.landed_commit_id
Expand Down
Loading