From 629709fe90868ff5c677f1bd8508fa68d742964c Mon Sep 17 00:00:00 2001
From: Kurt Wheeler
Date: Wed, 10 May 2017 10:30:09 -0400
Subject: [PATCH] Link up processor (#10)

Queues up a processor job after finishing a downloader job and adds needed
processor discovery code.
---
 .../surveyor/array_express.py                 |  7 ++--
 .../surveyor/external_source.py               | 13 +++---
 .../surveyor/message_queue.py                 | 24 +++++++++++++
 .../downloaders/utils.py                      | 36 ++++++++++++++++---
 .../processors/array_express.py               |  2 +-
 .../processors/processor_registry.py          | 11 ++++++
 .../data_refinery_workers/processors/utils.py |  6 ++--
 workers/environment.yml                       |  1 +
 .../{process_cel_to_pcl.R => cel_to_pcl.R}    |  0
 9 files changed, 81 insertions(+), 19 deletions(-)
 create mode 100644 foreman/data_refinery_foreman/surveyor/message_queue.py
 create mode 100644 workers/data_refinery_workers/processors/processor_registry.py
 rename workers/r_processors/{process_cel_to_pcl.R => cel_to_pcl.R} (100%)

diff --git a/foreman/data_refinery_foreman/surveyor/array_express.py b/foreman/data_refinery_foreman/surveyor/array_express.py
index 475cd170a..5cebee15d 100644
--- a/foreman/data_refinery_foreman/surveyor/array_express.py
+++ b/foreman/data_refinery_foreman/surveyor/array_express.py
@@ -10,8 +10,6 @@
     ExternalSourceSurveyor,
     ProcessorPipeline
 )
-from data_refinery_workers.downloaders.array_express \
-    import download_array_express
 
 # Import and set logger
 import logging
@@ -22,17 +20,18 @@ class ArrayExpressSurveyor(ExternalSourceSurveyor):
 
     # Files API endpoint for ArrayExpress
     FILES_URL = "http://www.ebi.ac.uk/arrayexpress/json/v2/files"
+    DOWNLOADER_TASK = "data_refinery_workers.downloaders.array_express.download_array_express"
 
     def source_type(self):
         return "ARRAY_EXPRESS"
 
     def downloader_task(self):
-        return download_array_express
+        return self.DOWNLOADER_TASK
 
     def determine_pipeline(self,
                            batch: Batch,
                            key_values: List[BatchKeyValue] = []):
-        return ProcessorPipeline.MICRO_ARRAY_TO_PCL
+        return ProcessorPipeline.AFFY_TO_PCL
 
     def survey(self, survey_job: SurveyJob):
         accession_code = (SurveyJobKeyValue
diff --git a/foreman/data_refinery_foreman/surveyor/external_source.py b/foreman/data_refinery_foreman/surveyor/external_source.py
index bb31a675f..62bca47cb 100644
--- a/foreman/data_refinery_foreman/surveyor/external_source.py
+++ b/foreman/data_refinery_foreman/surveyor/external_source.py
@@ -31,7 +31,7 @@ class PipelineEnums(Enum):
 
 class ProcessorPipeline(PipelineEnums):
     """Pipelines which perform some kind of processing on the data."""
-    MICRO_ARRAY_TO_PCL = "MICRO_ARRAY_TO_PCL"
+    AFFY_TO_PCL = "AFFY_TO_PCL"
 
 
 class DiscoveryPipeline(PipelineEnums):
@@ -52,9 +52,9 @@ def source_type(self):
 
     @abc.abstractproperty
     def downloader_task(self):
-        """This property should return the Celery Downloader Task from the
-        data_refinery_workers project which should be queued to download
-        Batches discovered by this surveyor."""
+        """This property should return the Celery Downloader Task name
+        from the data_refinery_workers project which should be queued
+        to download Batches discovered by this surveyor."""
         return
 
     @abc.abstractmethod
@@ -70,8 +70,6 @@ def handle_batch(self, batch: Batch, key_values: BatchKeyValue = None):
         batch.survey_job = self.survey_job
         batch.source_type = self.source_type()
         batch.status = BatchStatuses.NEW.value
-        batch.internal_location = os.path.join(batch.accession_code,
-                                               batch.pipeline_required)
 
         pipeline_required = self.determine_pipeline(batch, key_values)
         if (pipeline_required is DiscoveryPipeline) or batch.processed_format:
@@ -82,6 +80,9 @@ def handle_batch(self, batch: Batch, key_values: BatchKeyValue = None):
                        "is of the type DiscoveryPipeline.")
             raise InvalidProcessedFormatError(message)
 
+        batch.internal_location = os.path.join(batch.accession_code,
+                                               batch.pipeline_required)
+
         @retry(stop_max_attempt_number=3)
         def save_batch_start_job():
             batch.save()
diff --git a/foreman/data_refinery_foreman/surveyor/message_queue.py b/foreman/data_refinery_foreman/surveyor/message_queue.py
new file mode 100644
index 000000000..226a1df2b
--- /dev/null
+++ b/foreman/data_refinery_foreman/surveyor/message_queue.py
@@ -0,0 +1,24 @@
+from __future__ import absolute_import, unicode_literals
+import os
+from celery import Celery
+
+"""
+This module initializes a Celery app using the same name as the
+Celery app defined in the workers project. This allows us to queue
+tasks without needing to import the workers project.
+This is desirable because the workers project has many additional
+dependencies that the foreman project does not need.
+"""
+
+
+# set the default Django settings module for the 'celery' program.
+os.environ.setdefault('DJANGO_SETTINGS_MODULE',
+                      'data_refinery_workers.settings')
+
+app = Celery('data_refinery_workers')
+
+# Using a string here means the worker doesn't have to serialize
+# the configuration object to child processes.
+# - namespace='CELERY' means all celery-related configuration keys
+#   should have a `CELERY_` prefix.
+app.config_from_object('django.conf:settings', namespace='CELERY')
diff --git a/workers/data_refinery_workers/downloaders/utils.py b/workers/data_refinery_workers/downloaders/utils.py
index dc1d5c6fb..baa054e4d 100644
--- a/workers/data_refinery_workers/downloaders/utils.py
+++ b/workers/data_refinery_workers/downloaders/utils.py
@@ -1,7 +1,20 @@
 import os
 import urllib
+from retrying import retry
 from django.utils import timezone
-from data_refinery_models.models import Batch, BatchStatuses, DownloaderJob
+from data_refinery_models.models import (
+    Batch,
+    BatchStatuses,
+    DownloaderJob,
+    ProcessorJob
+)
+from data_refinery_workers.processors.processor_registry \
+    import processor_pipeline_registry
+
+# Import and set logger
+import logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
 
 # This path is within the Docker container.
 ROOT_URI = "/home/user/data_store/raw"
@@ -15,16 +28,31 @@ def start_job(job: DownloaderJob):
 
 
 def end_job(job: DownloaderJob, batch: Batch, success):
-    """Record in the database that this job has completed.
-    This should also queue a processor job at some point."""
+    """Record in the database that this job has completed,
+    create a processor job, and queue a processor task."""
     job.success = success
     job.end_time = timezone.now()
     job.save()
 
-    if batch is not None:
+    @retry(stop_max_attempt_number=3)
+    def save_batch_create_job():
         batch.status = BatchStatuses.DOWNLOADED.value
         batch.save()
 
+        logger.info("Creating processor job for batch #%d.", batch.id)
+        processor_job = ProcessorJob(batch=batch)
+        processor_job.save()
+        return processor_job
+
+    @retry(stop_max_attempt_number=3)
+    def queue_task(processor_job):
+        processor_task = processor_pipeline_registry[batch.pipeline_required]
+        processor_task.delay(processor_job.id)
+
+    if batch is not None:
+        processor_job = save_batch_create_job()
+        queue_task(processor_job)
+
 
 def prepare_destination(batch: Batch):
     """Prepare the destination directory and return the full
diff --git a/workers/data_refinery_workers/processors/array_express.py b/workers/data_refinery_workers/processors/array_express.py
index 07d4ddd70..492eebd8b 100644
--- a/workers/data_refinery_workers/processors/array_express.py
+++ b/workers/data_refinery_workers/processors/array_express.py
@@ -44,7 +44,7 @@ def cel_to_pcl(kwargs: Dict):
 
 
 @shared_task
-def process_array_express(job_id):
+def affy_to_pcl(job_id):
     utils.run_pipeline({"job_id": job_id},
                        [utils.start_job,
                         cel_to_pcl,
diff --git a/workers/data_refinery_workers/processors/processor_registry.py b/workers/data_refinery_workers/processors/processor_registry.py
new file mode 100644
index 000000000..79c0f37b9
--- /dev/null
+++ b/workers/data_refinery_workers/processors/processor_registry.py
@@ -0,0 +1,11 @@
+from data_refinery_workers.processors.array_express \
+    import affy_to_pcl
+
+"""
+This is a dictionary which maps valid values for Batch.pipeline_required
+to the processor pipeline Celery task.
+"""
+
+processor_pipeline_registry = {
+    "AFFY_TO_PCL": affy_to_pcl
+}
diff --git a/workers/data_refinery_workers/processors/utils.py b/workers/data_refinery_workers/processors/utils.py
index be8f6db53..be402223f 100644
--- a/workers/data_refinery_workers/processors/utils.py
+++ b/workers/data_refinery_workers/processors/utils.py
@@ -16,10 +16,8 @@
 
 def start_job(kwargs: Dict):
     """Record in the database that this job is being started and
-    retrieve the job and batch from the database.
-    Retrieves the job and the job's batch from the database and
-    adds them to the dictionary passed in with the keys 'job'
-    and 'batch' respectively."""
+    retrieve the job's batch from the database and
+    add it to the dictionary passed in with the key 'batch'."""
     job = kwargs["job"]
     job.worker_id = "For now there's only one. For now..."
     job.start_time = timezone.now()
diff --git a/workers/environment.yml b/workers/environment.yml
index 4d0e3d52b..f069186cd 100644
--- a/workers/environment.yml
+++ b/workers/environment.yml
@@ -16,3 +16,4 @@ dependencies:
   - django==1.10.6
   - requests==2.13.0
   - psycopg2==2.7.1
+  - retrying==1.3.3
diff --git a/workers/r_processors/process_cel_to_pcl.R b/workers/r_processors/cel_to_pcl.R
similarity index 100%
rename from workers/r_processors/process_cel_to_pcl.R
rename to workers/r_processors/cel_to_pcl.R
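
The surveyor above now returns the downloader task's dotted name instead of the task function, and message_queue.py gives the foreman a Celery app configured with the same name as the workers' app. Below is a minimal sketch, assuming a hypothetical send_downloader_job helper and downloader_job_id argument that are not part of this patch, of how the foreman could queue that task by name without importing the workers project.

# Hypothetical sketch (not part of this patch): queue a downloader task by its
# dotted name through the Celery app defined in message_queue.py, so that the
# foreman never imports the data_refinery_workers project.
from data_refinery_foreman.surveyor.message_queue import app


def send_downloader_job(task_name: str, downloader_job_id: int):
    """Queue the named Celery task with a DownloaderJob id as its argument.

    task_name is the string a surveyor's downloader_task() returns, e.g.
    ArrayExpressSurveyor.DOWNLOADER_TASK.
    """
    # send_task only needs the task's registered name; the worker process that
    # picks up the message resolves the name to the actual task function.
    app.send_task(task_name, args=[downloader_job_id])

Queuing by name rather than by imported function is what keeps the foreman free of the workers' additional dependencies, as the message_queue.py docstring notes.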