Commit: Link up processor (#10)
Queues a processor job after a downloader job finishes and adds the processor discovery code needed to dispatch it.
kurtwheeler authored May 10, 2017
1 parent 14ad858 commit 629709f
Showing 9 changed files with 81 additions and 19 deletions.
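At a high level, the commit links the two halves of the pipeline: the foreman queues downloader tasks by name through a lightweight Celery stub app, and when a downloader task finishes, its end_job hook marks the batch DOWNLOADED, creates a ProcessorJob, and queues the matching processor task from a new registry. A rough sketch of that worker-side handoff, using names from the diff below (the helper function itself is made up, and the database and broker setup are assumed):

    from data_refinery_models.models import Batch, BatchStatuses, ProcessorJob
    from data_refinery_workers.processors.processor_registry \
        import processor_pipeline_registry


    def hand_off_to_processor(batch: Batch) -> None:
        # Roughly what end_job (below) does once a download succeeds.
        batch.status = BatchStatuses.DOWNLOADED.value
        batch.save()

        processor_job = ProcessorJob(batch=batch)
        processor_job.save()

        # batch.pipeline_required (e.g. "AFFY_TO_PCL") selects the Celery
        # task, and .delay() queues it asynchronously by job id.
        task = processor_pipeline_registry[batch.pipeline_required]
        task.delay(processor_job.id)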
7 changes: 3 additions & 4 deletions foreman/data_refinery_foreman/surveyor/array_express.py
@@ -10,8 +10,6 @@
ExternalSourceSurveyor,
ProcessorPipeline
)
- from data_refinery_workers.downloaders.array_express \
- import download_array_express

# Import and set logger
import logging
@@ -22,17 +20,18 @@
class ArrayExpressSurveyor(ExternalSourceSurveyor):
# Files API endpoint for ArrayExpress
FILES_URL = "http://www.ebi.ac.uk/arrayexpress/json/v2/files"
+ DOWNLOADER_TASK = "data_refinery_workers.downloaders.array_express.download_array_express"

def source_type(self):
return "ARRAY_EXPRESS"

def downloader_task(self):
- return download_array_express
+ return self.DOWNLOADER_TASK

def determine_pipeline(self,
batch: Batch,
key_values: List[BatchKeyValue] = []):
- return ProcessorPipeline.MICRO_ARRAY_TO_PCL
+ return ProcessorPipeline.AFFY_TO_PCL

def survey(self, survey_job: SurveyJob):
accession_code = (SurveyJobKeyValue
13 changes: 7 additions & 6 deletions foreman/data_refinery_foreman/surveyor/external_source.py
@@ -31,7 +31,7 @@ class PipelineEnums(Enum):

class ProcessorPipeline(PipelineEnums):
"""Pipelines which perform some kind of processing on the data."""
- MICRO_ARRAY_TO_PCL = "MICRO_ARRAY_TO_PCL"
+ AFFY_TO_PCL = "AFFY_TO_PCL"


class DiscoveryPipeline(PipelineEnums):
@@ -52,9 +52,9 @@ def source_type(self):

@abc.abstractproperty
def downloader_task(self):
"""This property should return the Celery Downloader Task from the
data_refinery_workers project which should be queued to download
Batches discovered by this surveyor."""
"""This property should return the Celery Downloader Task name
from the data_refinery_workers project which should be queued
to download Batches discovered by this surveyor."""
return

@abc.abstractmethod
@@ -70,8 +70,6 @@ def handle_batch(self, batch: Batch, key_values: BatchKeyValue = None):
batch.survey_job = self.survey_job
batch.source_type = self.source_type()
batch.status = BatchStatuses.NEW.value
- batch.internal_location = os.path.join(batch.accession_code,
- batch.pipeline_required)

pipeline_required = self.determine_pipeline(batch, key_values)
if (pipeline_required is DiscoveryPipeline) or batch.processed_format:
@@ -82,6 +80,9 @@ def handle_batch(self, batch: Batch, key_values: BatchKeyValue = None):
"is of the type DiscoveryPipeline.")
raise InvalidProcessedFormatError(message)

+ batch.internal_location = os.path.join(batch.accession_code,
+ batch.pipeline_required)

@retry(stop_max_attempt_number=3)
def save_batch_start_job():
batch.save()
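For a concrete sense of this contract, a hypothetical new surveyor would only have to supply a task name, a source type, and a pipeline. The sketch below mirrors ArrayExpressSurveyor above; the EXAMPLE names are made up and the model import paths are assumed:

    from typing import List
    from data_refinery_models.models import Batch, BatchKeyValue, SurveyJob
    from data_refinery_foreman.surveyor.external_source import (
        ExternalSourceSurveyor,
        ProcessorPipeline
    )


    class ExampleSurveyor(ExternalSourceSurveyor):
        # Hypothetical task name; the real ones live in the workers project.
        DOWNLOADER_TASK = "data_refinery_workers.downloaders.example.download_example"

        def source_type(self):
            return "EXAMPLE"

        def downloader_task(self):
            return self.DOWNLOADER_TASK

        def determine_pipeline(self,
                               batch: Batch,
                               key_values: List[BatchKeyValue] = []):
            return ProcessorPipeline.AFFY_TO_PCL

        def survey(self, survey_job: SurveyJob):
            # Source-specific discovery (creating Batches via handle_batch)
            # would go here; omitted in this sketch.
            pass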
24 changes: 24 additions & 0 deletions foreman/data_refinery_foreman/surveyor/message_queue.py
@@ -0,0 +1,24 @@
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

"""
This module initializes a Celery app using the same name as the
Celery app defined in the workers project. This allows us to queue
tasks without needing to import the workers project.
This is desirable because the workers project has many additional
dependencies that the foreman project does not need.
"""


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE',
'data_refinery_workers.settings')

app = Celery('data_refinery_workers')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
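With this stub app registered under the same name as the workers' Celery app, the foreman can dispatch work by task name alone. The actual dispatch call is not part of this diff, so the snippet below is only a hypothetical illustration of how a surveyor's DOWNLOADER_TASK string might be handed to Celery (the single job-id argument is an assumption about the task's signature):

    from data_refinery_foreman.surveyor.message_queue import app

    downloader_job_id = 1  # placeholder; the real id comes from a DownloaderJob row

    # send_task() queues a task by its dotted-path name, so the foreman
    # never has to import the workers' code.
    app.send_task(
        "data_refinery_workers.downloaders.array_express.download_array_express",
        args=[downloader_job_id])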
36 changes: 32 additions & 4 deletions workers/data_refinery_workers/downloaders/utils.py
@@ -1,7 +1,20 @@
import os
import urllib
from retrying import retry
from django.utils import timezone
- from data_refinery_models.models import Batch, BatchStatuses, DownloaderJob
+ from data_refinery_models.models import (
+ Batch,
+ BatchStatuses,
+ DownloaderJob,
+ ProcessorJob
+ )
+ from data_refinery_workers.processors.processor_registry \
+ import processor_pipeline_registry

+ # Import and set logger
+ import logging
+ logging.basicConfig(level=logging.INFO)
+ logger = logging.getLogger(__name__)

# This path is within the Docker container.
ROOT_URI = "/home/user/data_store/raw"
@@ -15,16 +28,31 @@ def start_job(job: DownloaderJob):


def end_job(job: DownloaderJob, batch: Batch, success):
"""Record in the database that this job has completed.
This should also queue a processor job at some point."""
"""Record in the database that this job has completed,
create a processor job, and queue a processor task."""
job.success = success
job.end_time = timezone.now()
job.save()

- if batch is not None:
+ @retry(stop_max_attempt_number=3)
+ def save_batch_create_job():
batch.status = BatchStatuses.DOWNLOADED.value
batch.save()

+ logger.info("Creating processor job for batch #%d.", batch.id)
+ processor_job = ProcessorJob(batch=batch)
+ processor_job.save()
+ return processor_job

+ @retry(stop_max_attempt_number=3)
+ def queue_task(processor_job):
+ processor_task = processor_pipeline_registry[batch.pipeline_required]
+ processor_task.delay(processor_job.id)

+ if batch is not None:
+ processor_job = save_batch_create_job()
+ queue_task(processor_job)


def prepare_destination(batch: Batch):
"""Prepare the destination directory and return the full
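The new end_job leans on the retrying package (added to the workers' environment.yml below) so the database writes and the queueing step are each retried on failure. A minimal, self-contained sketch of what the decorator does (the flaky function is made up):

    from retrying import retry

    attempts = {"count": 0}

    @retry(stop_max_attempt_number=3)
    def flaky_save():
        # Stand-in for a database write such as batch.save(): retrying
        # re-invokes the function whenever it raises, and only re-raises
        # after the third failed attempt.
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise IOError("transient failure")
        return "saved"

    print(flaky_save())  # succeeds on the third attempt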
2 changes: 1 addition & 1 deletion workers/data_refinery_workers/processors/array_express.py
@@ -44,7 +44,7 @@ def cel_to_pcl(kwargs: Dict):


@shared_task
- def process_array_express(job_id):
+ def affy_to_pcl(job_id):
utils.run_pipeline({"job_id": job_id},
[utils.start_job,
cel_to_pcl,
11 changes: 11 additions & 0 deletions workers/data_refinery_workers/processors/processor_registry.py
@@ -0,0 +1,11 @@
from data_refinery_workers.processors.array_express \
import affy_to_pcl

"""
This is a dictionary which maps valid values for Batch.pipeline_required
to the processor pipeline Celery task.
"""

processor_pipeline_registry = {
"AFFY_TO_PCL": affy_to_pcl
}
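Every pipeline value a surveyor can return needs an entry in this registry, or end_job's lookup will raise a KeyError. A small sanity check along those lines (hypothetical, and assuming an environment where both the foreman and workers projects are importable, e.g. a test environment):

    from data_refinery_foreman.surveyor.external_source import ProcessorPipeline
    from data_refinery_workers.processors.processor_registry \
        import processor_pipeline_registry

    # Each ProcessorPipeline value (e.g. "AFFY_TO_PCL") should map to a task.
    for pipeline in ProcessorPipeline:
        assert pipeline.value in processor_pipeline_registry, pipeline.value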
6 changes: 2 additions & 4 deletions workers/data_refinery_workers/processors/utils.py
@@ -16,10 +16,8 @@

def start_job(kwargs: Dict):
"""Record in the database that this job is being started and
- retrieve the job and batch from the database.
- Retrieves the job and the job's batch from the database and
- adds them to the dictionary passed in with the keys 'job'
- and 'batch' respectively."""
+ retrieves the job's batch from the database and
+ adds it to the dictionary passed in with the key 'batch'."""
job = kwargs["job"]
job.worker_id = "For now there's only one. For now..."
job.start_time = timezone.now()
1 change: 1 addition & 0 deletions workers/environment.yml
@@ -16,3 +16,4 @@ dependencies:
- django==1.10.6
- requests==2.13.0
- psycopg2==2.7.1
+ - retrying==1.3.3
File renamed without changes.
