diff --git a/.gitignore b/.gitignore
index 5fbeb8b24..5073009a8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
# Project specific files
workers/volume
+foreman/volume
# Byte-compiled / optimized / DLL files
__pycache__/
diff --git a/README.md b/README.md
index 8fce14b0e..7305e1a17 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,9 @@ supported by Greene Lab.
## Getting Started
-Note: The following steps assume you have already installed PostgreSQL (>=9.4) and Python (Most versions should work, but this has been tested with Python 3.5) on Ubuntu (Tested with 16.04. It should be possible to use other versions or even a Mac though).
+Note: The following steps assume you have already installed PostgreSQL (>=9.4)
+and Python (>=3.5) on Ubuntu (tested with 16.04, though other versions or even
+a Mac should work).
Run `./install.sh` to set up the virtualenv. It will activate the `dr_env`
for you the first time. This virtualenv is valid for the entire data_refinery
@@ -18,6 +20,18 @@ instructions on doing so.
## Development
+R files in this repo follow
+[Google's R Style Guide](https://google.github.io/styleguide/Rguide.xml).
+Python files in this repo follow
+[PEP 8](https://www.python.org/dev/peps/pep-0008/). All files (both Python
+and R) have a line limit of 100 characters.
+
+A `setup.cfg` file specifying the line-length limit for the autopep8 and
+flake8 linters has been included in the root of this repo. If you run either
+of those programs from anywhere within the project's directory tree, they will
+enforce a limit of 100 instead of 80. This will also be true for editors that
+rely on them.
+
It can be useful to have an interactive python interpreter running within the
context of the Docker container. The `run_shell.sh` script has been provided
for this purpose. It is in the top level directory so that if you wish to
diff --git a/data_models/data_refinery_models/migrations/0001_initial.py b/data_models/data_refinery_models/migrations/0001_initial.py
index 78cc7d03c..92850c9fb 100644
--- a/data_models/data_refinery_models/migrations/0001_initial.py
+++ b/data_models/data_refinery_models/migrations/0001_initial.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Generated by Django 1.11 on 2017-05-02 18:50
+# Generated by Django 1.10.6 on 2017-05-26 15:12
from __future__ import unicode_literals
from django.db import migrations, models
@@ -22,14 +22,20 @@ class Migration(migrations.Migration):
('updated_at', models.DateTimeField()),
('source_type', models.CharField(max_length=256)),
('size_in_bytes', models.IntegerField()),
- ('download_url', models.CharField(max_length=2048)),
+ ('download_url', models.CharField(max_length=4096)),
('raw_format', models.CharField(max_length=256, null=True)),
('processed_format', models.CharField(max_length=256, null=True)),
('pipeline_required', models.CharField(max_length=256)),
- ('accession_code', models.CharField(max_length=32)),
+ ('platform_accession_code', models.CharField(max_length=32)),
+ ('experiment_accession_code', models.CharField(max_length=32)),
+ ('experiment_title', models.CharField(max_length=256)),
('status', models.CharField(max_length=20)),
+ ('release_date', models.DateField()),
+ ('last_uploaded_date', models.DateField()),
+ ('name', models.CharField(max_length=1024)),
('internal_location', models.CharField(max_length=256, null=True)),
- ('organism', models.IntegerField()),
+ ('organism_id', models.IntegerField()),
+ ('organism_name', models.CharField(max_length=256)),
],
options={
'db_table': 'batches',
@@ -60,12 +66,38 @@ class Migration(migrations.Migration):
('success', models.NullBooleanField()),
('num_retries', models.IntegerField(default=0)),
('worker_id', models.CharField(max_length=256, null=True)),
- ('batch', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='data_refinery_models.Batch')),
],
options={
'db_table': 'downloader_jobs',
},
),
+ migrations.CreateModel(
+ name='DownloaderJobsToBatches',
+ fields=[
+ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('created_at', models.DateTimeField(editable=False)),
+ ('updated_at', models.DateTimeField()),
+ ('batch', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='data_refinery_models.Batch')),
+ ('downloader_job', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='data_refinery_models.DownloaderJob')),
+ ],
+ options={
+ 'db_table': 'downloader_jobs_to_batches',
+ },
+ ),
+ migrations.CreateModel(
+ name='Organism',
+ fields=[
+ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('created_at', models.DateTimeField(editable=False)),
+ ('updated_at', models.DateTimeField()),
+ ('name', models.CharField(max_length=256)),
+ ('taxonomy_id', models.IntegerField()),
+ ('is_scientific_name', models.BooleanField(default=False)),
+ ],
+ options={
+ 'db_table': 'organisms',
+ },
+ ),
migrations.CreateModel(
name='ProcessorJob',
fields=[
@@ -78,12 +110,24 @@ class Migration(migrations.Migration):
('pipeline_applied', models.CharField(max_length=256)),
('num_retries', models.IntegerField(default=0)),
('worker_id', models.CharField(max_length=256, null=True)),
- ('batch', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='data_refinery_models.Batch')),
],
options={
'db_table': 'processor_jobs',
},
),
+ migrations.CreateModel(
+ name='ProcessorJobsToBatches',
+ fields=[
+ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('created_at', models.DateTimeField(editable=False)),
+ ('updated_at', models.DateTimeField()),
+ ('batch', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='data_refinery_models.Batch')),
+ ('processor_job', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='data_refinery_models.ProcessorJob')),
+ ],
+ options={
+ 'db_table': 'processor_jobs_to_batches',
+ },
+ ),
migrations.CreateModel(
name='SurveyJob',
fields=[
@@ -118,6 +162,6 @@ class Migration(migrations.Migration):
migrations.AddField(
model_name='batch',
name='survey_job',
- field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='data_refinery_models.SurveyJob'),
+ field=models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to='data_refinery_models.SurveyJob'),
),
]
diff --git a/data_models/data_refinery_models/models.py b/data_models/data_refinery_models/models.py
deleted file mode 100644
index 4111afb1e..000000000
--- a/data_models/data_refinery_models/models.py
+++ /dev/null
@@ -1,138 +0,0 @@
-from django.db import models
-from django.utils import timezone
-from enum import Enum
-
-
-class TimeTrackedModel(models.Model):
- created_at = models.DateTimeField(editable=False)
- updated_at = models.DateTimeField()
-
- def save(self, *args, **kwargs):
- """ On save, update timestamps """
- current_time = timezone.now()
- if not self.id:
- self.created_at = current_time
- self.updated_at = current_time
- return super(TimeTrackedModel, self).save(*args, **kwargs)
-
- class Meta:
- abstract = True
-
-
-class SurveyJob(TimeTrackedModel):
- source_type = models.CharField(max_length=256)
- success = models.NullBooleanField(null=True)
-
- # The start time of the query used to replicate
- replication_started_at = models.DateTimeField(null=True)
-
- # The end time of the query used to replicate
- replication_ended_at = models.DateTimeField(null=True)
-
- # The start time of the job
- start_time = models.DateTimeField(null=True)
-
- # The end time of the job
- end_time = models.DateTimeField(null=True)
-
- class Meta:
- db_table = "survey_jobs"
-
-
-class SurveyJobKeyValue(TimeTrackedModel):
- """
- This table is used for tracking fields onto a SurveyJob record that
- would be sparsely populated if it was its own column.
- I.e. one source may have an extra field or two that are worth
- tracking but are specific to that source.
- """
- survey_job = models.ForeignKey(SurveyJob, on_delete=models.CASCADE)
- key = models.CharField(max_length=256)
- value = models.CharField(max_length=256)
-
- class Meta:
- db_table = "survey_job_key_values"
-
-
-class BatchStatuses(Enum):
- NEW = "NEW"
- DOWNLOADED = "DOWNLOADED"
- PROCESSED = "PROCESSED"
-
-
-class Batch(TimeTrackedModel):
- survey_job = models.ForeignKey(SurveyJob)
- source_type = models.CharField(max_length=256)
- size_in_bytes = models.IntegerField()
- download_url = models.CharField(max_length=2048)
- raw_format = models.CharField(max_length=256, null=True)
- processed_format = models.CharField(max_length=256, null=True)
- pipeline_required = models.CharField(max_length=256)
- accession_code = models.CharField(max_length=32)
- status = models.CharField(max_length=20)
-
- # This field will denote where in our system the file can be found
- internal_location = models.CharField(max_length=256, null=True)
-
- # This will utilize the organism taxonomy ID from NCBI
- organism = models.IntegerField()
-
- class Meta:
- db_table = "batches"
-
-
-class BatchKeyValue(TimeTrackedModel):
- """
- This table is used for tracking fields onto a Batch record that would
- be sparsely populated if it was its own column.
- I.e. one source may have an extra field or two that are worth tracking
- but are specific to that source.
- """
- batch = models.ForeignKey(Batch, on_delete=models.CASCADE)
- key = models.CharField(max_length=256)
- value = models.CharField(max_length=256)
-
- class Meta:
- db_table = "batch_key_values"
-
-
-class ProcessorJob(TimeTrackedModel):
- batch = models.ForeignKey(Batch, on_delete=models.CASCADE)
- start_time = models.DateTimeField(null=True)
- end_time = models.DateTimeField(null=True)
- success = models.NullBooleanField(null=True)
-
- # This field will contain an enumerated value specifying which processor
- # pipeline was applied during the processor job.
- pipeline_applied = models.CharField(max_length=256)
-
- # This field represents how many times this job has been retried. It starts
- # at 0 and each time the job has to be retried it will be incremented.
- # At some point there will probably be some code like:
- # if job.num_retries >= 3:
- # # do a bunch of logging
- # else:
- # # retry the job
- num_retries = models.IntegerField(default=0)
-
- # This point of this field is to identify which worker ran the job.
- # A few fields may actually be required or something other than just an id.
- worker_id = models.CharField(max_length=256, null=True)
-
- class Meta:
- db_table = "processor_jobs"
-
-
-class DownloaderJob(TimeTrackedModel):
- batch = models.ForeignKey(Batch, on_delete=models.CASCADE)
- start_time = models.DateTimeField(null=True)
- end_time = models.DateTimeField(null=True)
- success = models.NullBooleanField(null=True)
-
- # These two fields are analagous to the fields with the same names
- # in ProcessorJob, see their descriptions for more information
- num_retries = models.IntegerField(default=0)
- worker_id = models.CharField(max_length=256, null=True)
-
- class Meta:
- db_table = "downloader_jobs"
diff --git a/data_models/data_refinery_models/models/__init__.py b/data_models/data_refinery_models/models/__init__.py
new file mode 100644
index 000000000..5f4af8c27
--- /dev/null
+++ b/data_models/data_refinery_models/models/__init__.py
@@ -0,0 +1,13 @@
+from data_refinery_models.models.surveys import SurveyJob, SurveyJobKeyValue
+from data_refinery_models.models.batches import (
+ BatchStatuses,
+ Batch,
+ BatchKeyValue
+)
+from data_refinery_models.models.jobs import (
+ DownloaderJob,
+ ProcessorJob,
+ DownloaderJobsToBatches,
+ ProcessorJobsToBatches
+)
+from data_refinery_models.models.organism import Organism
diff --git a/data_models/data_refinery_models/models/base_models.py b/data_models/data_refinery_models/models/base_models.py
new file mode 100644
index 000000000..cf9df7afc
--- /dev/null
+++ b/data_models/data_refinery_models/models/base_models.py
@@ -0,0 +1,20 @@
+from django.db import models
+from django.utils import timezone
+
+
+class TimeTrackedModel(models.Model):
+ """Base model with auto created_at and updated_at fields."""
+
+ created_at = models.DateTimeField(editable=False)
+ updated_at = models.DateTimeField()
+
+ def save(self, *args, **kwargs):
+ """ On save, update timestamps """
+ current_time = timezone.now()
+ if not self.id:
+ self.created_at = current_time
+ self.updated_at = current_time
+ return super(TimeTrackedModel, self).save(*args, **kwargs)
+
+ class Meta:
+ abstract = True
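As a usage sketch (not part of this diff, and assuming a migrated database), the timestamp behaviour of any model inheriting from `TimeTrackedModel`, such as `SurveyJob`, works like this:

```python
# Sketch: created_at is set only on the first save; updated_at on every save.
from data_refinery_models.models import SurveyJob

job = SurveyJob(source_type="ARRAY_EXPRESS")
job.save()                       # sets both created_at and updated_at
original_created_at = job.created_at

job.success = True
job.save()                       # refreshes updated_at only
assert job.created_at == original_created_at
assert job.updated_at >= job.created_at
```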
diff --git a/data_models/data_refinery_models/models/batches.py b/data_models/data_refinery_models/models/batches.py
new file mode 100644
index 000000000..2c7c0563f
--- /dev/null
+++ b/data_models/data_refinery_models/models/batches.py
@@ -0,0 +1,64 @@
+from enum import Enum
+from django.db import models
+from data_refinery_models.models.base_models import TimeTrackedModel
+from data_refinery_models.models.surveys import SurveyJob
+
+
+class BatchStatuses(Enum):
+ """Valid values for the status field of the Batch model."""
+
+ NEW = "NEW"
+ DOWNLOADED = "DOWNLOADED"
+ PROCESSED = "PROCESSED"
+
+
+class Batch(TimeTrackedModel):
+ """Represents a batch of data.
+
+    The definition of a Batch is intentionally vague. What a batch is
+    will vary from source to source. It could be a single file, or
+ a group of files with some kind of logical grouping such as an
+ experiment.
+ """
+
+ survey_job = models.ForeignKey(SurveyJob, on_delete=models.PROTECT)
+ source_type = models.CharField(max_length=256)
+ size_in_bytes = models.IntegerField()
+ download_url = models.CharField(max_length=4096)
+ raw_format = models.CharField(max_length=256, null=True)
+ processed_format = models.CharField(max_length=256, null=True)
+ pipeline_required = models.CharField(max_length=256)
+ platform_accession_code = models.CharField(max_length=32)
+ experiment_accession_code = models.CharField(max_length=32)
+ experiment_title = models.CharField(max_length=256)
+ status = models.CharField(max_length=20)
+ release_date = models.DateField()
+ last_uploaded_date = models.DateField()
+ name = models.CharField(max_length=1024)
+
+ # This field will denote where in our system the file can be found.
+ internal_location = models.CharField(max_length=256, null=True)
+
+ # This corresponds to the organism taxonomy ID from NCBI.
+ organism_id = models.IntegerField()
+ # This is the organism name as it appeared in the experiment.
+ organism_name = models.CharField(max_length=256)
+
+ class Meta:
+ db_table = "batches"
+
+
+class BatchKeyValue(TimeTrackedModel):
+ """Tracks additional fields for Batches.
+
+ Useful for fields that would be sparsely populated if they were
+    their own columns. For example, one source may have an extra field
+    or two that are worth tracking but are specific to that source.
+ """
+
+ batch = models.ForeignKey(Batch, on_delete=models.CASCADE)
+ key = models.CharField(max_length=256)
+ value = models.CharField(max_length=256)
+
+ class Meta:
+ db_table = "batch_key_values"
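To illustrate the key/value pattern described in the docstring, a hypothetical sketch (the key name is invented and `batch` is an assumed, already-saved Batch instance; not part of this diff):

```python
# Sketch: attach a source-specific attribute to a Batch without a new column.
from data_refinery_models.models import BatchKeyValue

# "scan_protocol" is a made-up key for illustration.
BatchKeyValue(batch=batch, key="scan_protocol", value="P-MTAB-41859").save()

# Reading it back later:
protocol = BatchKeyValue.objects.get(batch=batch, key="scan_protocol").value
```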
diff --git a/data_models/data_refinery_models/models/jobs.py b/data_models/data_refinery_models/models/jobs.py
new file mode 100644
index 000000000..f6827ffc3
--- /dev/null
+++ b/data_models/data_refinery_models/models/jobs.py
@@ -0,0 +1,73 @@
+from django.db import models
+from data_refinery_models.models.base_models import TimeTrackedModel
+from data_refinery_models.models.batches import Batch
+
+
+class ProcessorJob(TimeTrackedModel):
+ """Records information about running a processor."""
+
+ start_time = models.DateTimeField(null=True)
+ end_time = models.DateTimeField(null=True)
+ success = models.NullBooleanField(null=True)
+
+ # This field will contain an enumerated value specifying which processor
+ # pipeline was applied during the processor job.
+ pipeline_applied = models.CharField(max_length=256)
+
+ # This field represents how many times this job has been retried. It starts
+ # at 0 and each time the job has to be retried it will be incremented.
+ # At some point there will probably be some code like:
+ # if job.num_retries >= 3:
+ # # do a bunch of logging
+ # else:
+ # # retry the job
+ num_retries = models.IntegerField(default=0)
+
+    # The point of this field is to identify which worker ran the job.
+    # A few fields, or something other than just an id, may eventually be needed.
+ worker_id = models.CharField(max_length=256, null=True)
+
+ class Meta:
+ db_table = "processor_jobs"
+
+
+class ProcessorJobsToBatches(TimeTrackedModel):
+ """Represents a many to many relationship.
+
+ Maps between ProcessorJobs and Batches.
+ """
+
+ batch = models.ForeignKey(Batch, on_delete=models.CASCADE)
+ processor_job = models.ForeignKey(ProcessorJob, on_delete=models.CASCADE)
+
+ class Meta:
+ db_table = "processor_jobs_to_batches"
+
+
+class DownloaderJob(TimeTrackedModel):
+ """Records information about running a Downloader."""
+
+ start_time = models.DateTimeField(null=True)
+ end_time = models.DateTimeField(null=True)
+ success = models.NullBooleanField(null=True)
+
+    # These two fields are analogous to the fields with the same names
+    # in ProcessorJob; see their descriptions for more information.
+ num_retries = models.IntegerField(default=0)
+ worker_id = models.CharField(max_length=256, null=True)
+
+ class Meta:
+ db_table = "downloader_jobs"
+
+
+class DownloaderJobsToBatches(TimeTrackedModel):
+ """Represents a many to many relationship.
+
+ Maps between DownloaderJobs and Batches.
+ """
+
+ batch = models.ForeignKey(Batch, on_delete=models.CASCADE)
+ downloader_job = models.ForeignKey(DownloaderJob, on_delete=models.CASCADE)
+
+ class Meta:
+ db_table = "downloader_jobs_to_batches"
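A short sketch (assumed names, not part of this diff) of how the new join tables let a single DownloaderJob cover several Batches, replacing the one-batch-per-job ForeignKey removed from the migration above:

```python
# Sketch: one DownloaderJob covering two already-saved Batches.
from data_refinery_models.models import DownloaderJob, DownloaderJobsToBatches

downloader_job = DownloaderJob()
downloader_job.save()

for batch in (batch_a, batch_b):   # batch_a/batch_b are assumed Batch rows
    DownloaderJobsToBatches(batch=batch, downloader_job=downloader_job).save()

# All batch ids handled by this job:
batch_ids = DownloaderJobsToBatches.objects.filter(
    downloader_job=downloader_job).values_list("batch_id", flat=True)
```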
diff --git a/data_models/data_refinery_models/models/organism.py b/data_models/data_refinery_models/models/organism.py
new file mode 100644
index 000000000..a81d7921a
--- /dev/null
+++ b/data_models/data_refinery_models/models/organism.py
@@ -0,0 +1,129 @@
+import requests
+from xml.etree import ElementTree
+from django.db import models
+from data_refinery_models.models.base_models import TimeTrackedModel
+
+# Import and set logger
+import logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+NCBI_ROOT_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
+ESEARCH_URL = NCBI_ROOT_URL + "esearch.fcgi"
+EFETCH_URL = NCBI_ROOT_URL + "efetch.fcgi"
+TAXONOMY_DATABASE = "taxonomy"
+
+
+class UnscientificNameError(Exception):
+ pass
+
+
+class InvalidNCBITaxonomyId(Exception):
+ pass
+
+
+def get_scientific_name(taxonomy_id: int) -> str:
+ parameters = {"db": TAXONOMY_DATABASE, "id": str(taxonomy_id)}
+ response = requests.get(EFETCH_URL, parameters)
+
+ root = ElementTree.fromstring(response.text)
+ taxon_list = root.findall("Taxon")
+
+ if len(taxon_list) == 0:
+ logger.error("No names returned by ncbi.nlm.nih.gov for organism "
+ + "with taxonomy ID %d.",
+ taxonomy_id)
+ raise InvalidNCBITaxonomyId
+
+ return taxon_list[0].find("ScientificName").text.upper()
+
+
+def get_taxonomy_id(organism_name: str) -> int:
+ parameters = {"db": TAXONOMY_DATABASE, "term": organism_name}
+ response = requests.get(ESEARCH_URL, parameters)
+
+ root = ElementTree.fromstring(response.text)
+ id_list = root.find("IdList").findall("Id")
+
+ if len(id_list) == 0:
+ logger.error("Unable to retrieve NCBI taxonomy ID number for organism "
+ + "with name: %s",
+ organism_name)
+ return 0
+ elif len(id_list) > 1:
+ logger.warn("Organism with name %s returned multiple NCBI taxonomy ID "
+ + "numbers.",
+ organism_name)
+
+ return int(id_list[0].text)
+
+
+def get_taxonomy_id_scientific(organism_name: str) -> int:
+ parameters = {"db": TAXONOMY_DATABASE, "field": "scin", "term": organism_name}
+ response = requests.get(ESEARCH_URL, parameters)
+
+ root = ElementTree.fromstring(response.text)
+ id_list = root.find("IdList").findall("Id")
+
+ if len(id_list) == 0:
+ raise UnscientificNameError
+ elif len(id_list) > 1:
+ logger.warn("Organism with name %s returned multiple NCBI taxonomy ID "
+ + "numbers.",
+ organism_name)
+
+ return int(id_list[0].text)
+
+
+class Organism(TimeTrackedModel):
+ """Provides a lookup between organism name and taxonomy ids.
+
+ Should only be used via the two class methods get_name_for_id and
+ get_id_for_name. These methods will populate the database table
+ with any missing values by accessing the NCBI API.
+ """
+
+ name = models.CharField(max_length=256)
+ taxonomy_id = models.IntegerField()
+ is_scientific_name = models.BooleanField(default=False)
+
+ @classmethod
+ def get_name_for_id(cls, taxonomy_id: int) -> str:
+ try:
+ organism = (cls.objects
+ .filter(taxonomy_id=taxonomy_id)
+ .order_by("-is_scientific_name")
+ [0])
+ except IndexError:
+ name = get_scientific_name(taxonomy_id).upper()
+ organism = Organism(name=name,
+ taxonomy_id=taxonomy_id,
+ is_scientific_name=True)
+ organism.save()
+
+ return organism.name
+
+ @classmethod
+    def get_id_for_name(cls, name: str) -> int:
+ name = name.upper()
+ try:
+ organism = (cls.objects
+ .filter(name=name)
+ [0])
+ except IndexError:
+ is_scientific_name = False
+ try:
+ taxonomy_id = get_taxonomy_id_scientific(name)
+ is_scientific_name = True
+ except UnscientificNameError:
+ taxonomy_id = get_taxonomy_id(name)
+
+ organism = Organism(name=name,
+ taxonomy_id=taxonomy_id,
+ is_scientific_name=is_scientific_name)
+ organism.save()
+
+ return organism.taxonomy_id
+
+ class Meta:
+ db_table = "organisms"
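A hedged usage sketch for the lookup helpers (assumes a configured database; the first call needs network access to NCBI unless the organism row already exists, as in the tests):

```python
# Sketch: the class methods cache NCBI lookups in the organisms table.
from data_refinery_models.models import Organism

taxonomy_id = Organism.get_id_for_name("Homo sapiens")  # hits esearch, saves a row
taxonomy_id = Organism.get_id_for_name("Homo sapiens")  # served from the table

name = Organism.get_name_for_id(9606)  # "HOMO SAPIENS", via efetch if uncached
```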
diff --git a/data_models/data_refinery_models/models/surveys.py b/data_models/data_refinery_models/models/surveys.py
new file mode 100644
index 000000000..80e5d8f74
--- /dev/null
+++ b/data_models/data_refinery_models/models/surveys.py
@@ -0,0 +1,40 @@
+from django.db import models
+from data_refinery_models.models.base_models import TimeTrackedModel
+
+
+class SurveyJob(TimeTrackedModel):
+ """Records information about a Surveyor Job."""
+
+ source_type = models.CharField(max_length=256)
+ success = models.NullBooleanField(null=True)
+
+ # The start time of the query used to replicate
+ replication_started_at = models.DateTimeField(null=True)
+
+ # The end time of the query used to replicate
+ replication_ended_at = models.DateTimeField(null=True)
+
+ # The start time of the job
+ start_time = models.DateTimeField(null=True)
+
+ # The end time of the job
+ end_time = models.DateTimeField(null=True)
+
+ class Meta:
+ db_table = "survey_jobs"
+
+
+class SurveyJobKeyValue(TimeTrackedModel):
+ """Tracks additional fields for SurveyJobs.
+
+ Useful for fields that would be sparsely populated if they were
+    their own columns. For example, one source may have an extra field
+    or two that are worth tracking but are specific to that source.
+ """
+
+ survey_job = models.ForeignKey(SurveyJob, on_delete=models.CASCADE)
+ key = models.CharField(max_length=256)
+ value = models.CharField(max_length=256)
+
+ class Meta:
+ db_table = "survey_job_key_values"
diff --git a/data_models/data_refinery_models/models/test_organisms.py b/data_models/data_refinery_models/models/test_organisms.py
new file mode 100644
index 000000000..f48216393
--- /dev/null
+++ b/data_models/data_refinery_models/models/test_organisms.py
@@ -0,0 +1,239 @@
+from unittest.mock import Mock, patch, call
+from django.test import TestCase
+from data_refinery_models.models.organism import (
+ Organism,
+ ESEARCH_URL,
+ EFETCH_URL,
+ InvalidNCBITaxonomyId,
+)
+
+ESEARCH_RESPONSE_XML = """
+<eSearchResult>
+    <Count>1</Count>
+    <RetMax>1</RetMax>
+    <RetStart>0</RetStart>
+    <IdList>
+        <Id>9606</Id>
+    </IdList>
+    <TranslationSet/>
+    <TranslationStack>
+        <TermSet>
+            <Term>homo sapiens[Scientific Name]</Term>
+            <Field>Scientific Name</Field>
+            <Count>1</Count>
+            <Explode>N</Explode>
+        </TermSet>
+        <OP>GROUP</OP>
+    </TranslationStack>
+    <QueryTranslation>homo sapiens[Scientific Name]</QueryTranslation>
+</eSearchResult>
+"""
+
+ESEARCH_NOT_FOUND_XML = """
+<eSearchResult>
+    <Count>0</Count>
+    <RetMax>0</RetMax>
+    <RetStart>0</RetStart>
+    <IdList/>
+    <TranslationSet/>
+    <QueryTranslation>(man[Scientific Name])</QueryTranslation>
+    <ErrorList>
+        <PhraseNotFound>blah</PhraseNotFound>
+    </ErrorList>
+    <WarningList>
+        <OutputMessage>No items found.</OutputMessage>
+    </WarningList>
+</eSearchResult>
+"""
+
+EFETCH_RESPONSE_XML = """
+<TaxaSet>
+    <Taxon>
+        <TaxId>9606</TaxId>
+        <ScientificName>Homo sapiens</ScientificName>
+        <OtherNames>
+            <GenbankCommonName>human</GenbankCommonName>
+            <CommonName>man</CommonName>
+            <Name>
+                <ClassCDE>authority</ClassCDE>
+                <DispName>Homo sapiens Linnaeus, 1758</DispName>
+            </Name>
+        </OtherNames>
+        <ParentTaxId>9605</ParentTaxId>
+        <Rank>species</Rank>
+        <Division>Primates</Division>
+        <GeneticCode>
+            <GCId>1</GCId>
+            <GCName>Standard</GCName>
+        </GeneticCode>
+        <MitoGeneticCode>
+            <MGCId>2</MGCId>
+            <MGCName>Vertebrate Mitochondrial</MGCName>
+        </MitoGeneticCode>
+        <Lineage>cellular organisms</Lineage>
+        <LineageEx>
+            <Taxon>
+                <TaxId>131567</TaxId>
+                <ScientificName>cellular organisms</ScientificName>
+                <Rank>no rank</Rank>
+            </Taxon>
+        </LineageEx>
+        <CreateDate>1995/02/27 09:24:00</CreateDate>
+        <UpdateDate>2017/02/28 16:38:58</UpdateDate>
+        <PubDate>1992/05/26 01:00:00</PubDate>
+    </Taxon>
+</TaxaSet>
+"""
+
+EFETCH_NOT_FOUND_XML = """
+<eFetchResult>
+    <ERROR>ID list is empty! Possibly it has no correct IDs.</ERROR>
+</eFetchResult>
+"""
+
+
+def mocked_requests_get(url, parameters):
+ mock = Mock(ok=True)
+    if url != ESEARCH_URL:
+ mock.text = "This is wrong."
+ else:
+ try:
+            if parameters["field"] == "scin":
+ mock.text = ESEARCH_NOT_FOUND_XML
+ else:
+ mock.text = "This is also wrong."
+ except KeyError:
+ mock.text = ESEARCH_RESPONSE_XML
+
+ return mock
+
+
+class OrganismModelTestCase(TestCase):
+ def tearDown(self):
+ Organism.objects.all().delete()
+
+ @patch('data_refinery_models.models.organism.requests.get')
+ def test_cached_names_are_found(self, mock_get):
+ Organism.objects.create(name="HOMO SAPIENS",
+ taxonomy_id=9606,
+ is_scientific_name=True)
+
+ name = Organism.get_name_for_id(9606)
+
+ self.assertEqual(name, "HOMO SAPIENS")
+ mock_get.assert_not_called()
+
+ @patch('data_refinery_models.models.organism.requests.get')
+ def test_cached_ids_are_found(self, mock_get):
+ Organism.objects.create(name="HOMO SAPIENS",
+ taxonomy_id=9606,
+ is_scientific_name=True)
+
+ id = Organism.get_id_for_name("Homo Sapiens")
+
+ self.assertEqual(id, 9606)
+ mock_get.assert_not_called()
+
+ @patch('data_refinery_models.models.organism.requests.get')
+ def test_uncached_scientific_names_are_found(self, mock_get):
+ mock_get.return_value = Mock(ok=True)
+ mock_get.return_value.text = ESEARCH_RESPONSE_XML
+
+ taxonomy_id = Organism.get_id_for_name("Homo Sapiens")
+
+ self.assertEqual(taxonomy_id, 9606)
+ mock_get.assert_called_once_with(
+ ESEARCH_URL,
+ {"db": "taxonomy", "field": "scin", "term": "HOMO%20SAPIENS"}
+ )
+
+ # The first call should have stored the organism record in the
+ # database so this call should not make a request.
+ mock_get.reset_mock()
+ new_id = Organism.get_id_for_name("Homo Sapiens")
+
+ self.assertEqual(new_id, 9606)
+ mock_get.assert_not_called()
+
+ @patch('data_refinery_models.models.organism.requests.get')
+ def test_uncached_other_names_are_found(self, mock_get):
+ mock_get.side_effect = mocked_requests_get
+
+ taxonomy_id = Organism.get_id_for_name("Human")
+
+ self.assertEqual(taxonomy_id, 9606)
+ mock_get.assert_has_calls([
+ call(ESEARCH_URL,
+ {"db": "taxonomy", "field": "scin", "term": "HUMAN"}),
+ call(ESEARCH_URL,
+ {"db": "taxonomy", "term": "HUMAN"})])
+
+ # The first call should have stored the organism record in the
+ # database so this call should not make a request.
+ mock_get.reset_mock()
+ new_id = Organism.get_id_for_name("Human")
+
+ self.assertEqual(new_id, 9606)
+ mock_get.assert_not_called()
+
+ @patch('data_refinery_models.models.organism.requests.get')
+ def test_uncached_ids_are_found(self, mock_get):
+ mock_get.return_value = Mock(ok=True)
+ mock_get.return_value.text = EFETCH_RESPONSE_XML
+
+ organism_name = Organism.get_name_for_id(9606)
+
+ self.assertEqual(organism_name, "HOMO SAPIENS")
+ mock_get.assert_called_once_with(
+ EFETCH_URL,
+ {"db": "taxonomy", "id": "9606"}
+ )
+
+ # The first call should have stored the organism record in the
+ # database so this call should not make a request.
+ mock_get.reset_mock()
+ new_name = Organism.get_name_for_id(9606)
+
+ self.assertEqual(new_name, "HOMO SAPIENS")
+ mock_get.assert_not_called()
+
+ @patch('data_refinery_models.models.organism.requests.get')
+ def test_invalid_ids_cause_exceptions(self, mock_get):
+ mock_get.return_value = Mock(ok=True)
+ mock_get.return_value.text = EFETCH_NOT_FOUND_XML
+
+ with self.assertRaises(InvalidNCBITaxonomyId):
+ Organism.get_name_for_id(0)
+
+ @patch('data_refinery_models.models.organism.requests.get')
+ def test_unfound_names_return_0(self, mock_get):
+        """If we can't find an NCBI taxonomy ID for an organism name,
+        we can keep things moving for a while without it.
+        get_taxonomy_id logs an error message, which will prompt a
+        developer to investigate which organism name could not be
+        found. Therefore returning an ID of 0, counterintuitive as it
+        may seem, is the right thing to do in this case.
+        """
+ mock_get.return_value = Mock(ok=True)
+ mock_get.return_value.text = ESEARCH_NOT_FOUND_XML
+
+ taxonomy_id = Organism.get_id_for_name("blah")
+
+ self.assertEqual(taxonomy_id, 0)
+ mock_get.assert_has_calls([
+ call(ESEARCH_URL,
+ {"db": "taxonomy", "field": "scin", "term": "BLAH"}),
+ call(ESEARCH_URL,
+ {"db": "taxonomy", "term": "BLAH"})])
+
+ # The first call should have stored the organism record in the
+ # database so this call should not make a request.
+ mock_get.reset_mock()
+ new_id = Organism.get_id_for_name("BLAH")
+
+ self.assertEqual(new_id, 0)
+ mock_get.assert_not_called()
diff --git a/data_models/data_refinery_models/tests.py b/data_models/data_refinery_models/models/test_time_tracked_models.py
similarity index 100%
rename from data_models/data_refinery_models/tests.py
rename to data_models/data_refinery_models/models/test_time_tracked_models.py
diff --git a/data_models/requirements.in b/data_models/requirements.in
index 08088127e..48852bf73 100644
--- a/data_models/requirements.in
+++ b/data_models/requirements.in
@@ -1,2 +1,3 @@
django
psycopg2
+requests
diff --git a/data_models/requirements.txt b/data_models/requirements.txt
index 380d9d088..a30db3f5a 100644
--- a/data_models/requirements.txt
+++ b/data_models/requirements.txt
@@ -6,3 +6,4 @@
#
django==1.10.6
psycopg2==2.7.1
+requests==2.13.0
diff --git a/foreman/data_refinery_foreman/surveyor/array_express.py b/foreman/data_refinery_foreman/surveyor/array_express.py
index 5cebee15d..7f32ca60e 100644
--- a/foreman/data_refinery_foreman/surveyor/array_express.py
+++ b/foreman/data_refinery_foreman/surveyor/array_express.py
@@ -1,10 +1,11 @@
import requests
from typing import List
+
from data_refinery_models.models import (
Batch,
BatchKeyValue,
- SurveyJob,
- SurveyJobKeyValue
+ SurveyJobKeyValue,
+ Organism
)
from data_refinery_foreman.surveyor.external_source import (
ExternalSourceSurveyor,
@@ -16,68 +17,111 @@
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
+EXPERIMENTS_URL = "https://www.ebi.ac.uk/arrayexpress/json/v3/experiments/"
+SAMPLES_URL = EXPERIMENTS_URL + "{}/samples"
-class ArrayExpressSurveyor(ExternalSourceSurveyor):
- # Files API endpoint for ArrayExpress
- FILES_URL = "http://www.ebi.ac.uk/arrayexpress/json/v2/files"
- DOWNLOADER_TASK = "data_refinery_workers.downloaders.array_express.download_array_express"
+class ArrayExpressSurveyor(ExternalSourceSurveyor):
def source_type(self):
return "ARRAY_EXPRESS"
def downloader_task(self):
- return self.DOWNLOADER_TASK
+ return "data_refinery_workers.downloaders.array_express.download_array_express"
def determine_pipeline(self,
batch: Batch,
key_values: List[BatchKeyValue] = []):
return ProcessorPipeline.AFFY_TO_PCL
- def survey(self, survey_job: SurveyJob):
- accession_code = (SurveyJobKeyValue
- .objects
- .filter(survey_job_id=survey_job.id,
- key__exact="accession_code")
- [:1]
- .get()
- .value)
- parameters = {"raw": "true", "array": accession_code}
-
- r = requests.get(self.FILES_URL, params=parameters)
- response_dictionary = r.json()
-
- try:
- experiments = response_dictionary["files"]["experiment"]
- except KeyError: # If the platform does not exist or has no files...
- logger.info(
- "No files were found with this platform accession code: %s",
- accession_code
- )
- return True
-
- logger.info("Found %d new experiments for Survey Job #%d.",
- len(experiments),
- survey_job.id)
-
- for experiment in experiments:
- data_files = experiment["file"]
-
- # If there is only one file object in data_files,
- # ArrayExpress does not put it in a list of size 1
- if (type(data_files) != list):
- data_files = [data_files]
-
- for data_file in data_files:
- if (data_file["kind"] == "raw"):
- url = data_file["url"].replace("\\", "")
- # This is another place where this is still a POC.
- # More work will need to be done to determine some
- # of these additional metadata fields.
- self.handle_batch(Batch(size_in_bytes=data_file["size"],
- download_url=url,
- raw_format="MICRO_ARRAY",
- processed_format="PCL",
- accession_code=accession_code,
- organism=1))
-
- return True
+ @staticmethod
+ def get_experiment_metadata(experiment_accession_code):
+ experiment_request = requests.get(EXPERIMENTS_URL + experiment_accession_code)
+ parsed_json = experiment_request.json()["experiments"]["experiment"][0]
+
+ experiment = {}
+
+ experiment["name"] = parsed_json["name"]
+ experiment["experiment_accession_code"] = experiment_accession_code
+
+        # If there is more than one arraydesign listed for the experiment,
+        # then the only way to determine which array was used for a
+        # given sample is to look at the header of its CEL file. That
+        # obviously cannot happen until the CEL file has been downloaded,
+        # so we just mark the platform as UNKNOWN and let the downloader
+        # inspect the downloaded file to determine the array at that
+        # point.
+ if len(parsed_json["arraydesign"]) == 0:
+ logger.warn("Experiment %s has no arraydesign listed.", experiment_accession_code)
+ experiment["platform_accession_code"] = "UNKNOWN"
+ elif len(parsed_json["arraydesign"]) > 1:
+ experiment["platform_accession_code"] = "UNKNOWN"
+ else:
+ experiment["platform_accession_code"] = \
+ parsed_json["arraydesign"][0]["accession"]
+
+ experiment["release_date"] = parsed_json["releasedate"]
+
+ if "lastupdatedate" in parsed_json:
+ experiment["last_update_date"] = parsed_json["lastupdatedate"]
+ else:
+ experiment["last_update_date"] = parsed_json["releasedate"]
+
+ return experiment
+
+ def survey(self):
+ experiment_accession_code = (
+ SurveyJobKeyValue
+ .objects
+ .get(survey_job_id=self.survey_job.id,
+ key__exact="experiment_accession_code")
+ .value
+ )
+
+ logger.info("Surveying experiment with accession code: %s.", experiment_accession_code)
+
+ experiment = self.get_experiment_metadata(experiment_accession_code)
+
+ r = requests.get(SAMPLES_URL.format(experiment_accession_code))
+ samples = r.json()["experiment"]["sample"]
+
+ batches = []
+ for sample in samples:
+ if "file" not in sample:
+ continue
+
+ organism_name = "UNKNOWN"
+ for characteristic in sample["characteristic"]:
+ if characteristic["category"].upper() == "ORGANISM":
+ organism_name = characteristic["value"].upper()
+
+ if organism_name == "UNKNOWN":
+ logger.error("Sample from experiment %s did not specify the organism name.",
+ experiment_accession_code)
+ organism_id = 0
+ else:
+ organism_id = Organism.get_id_for_name(organism_name)
+
+ for sample_file in sample["file"]:
+ if sample_file["type"] != "data":
+ continue
+
+ batches.append(Batch(
+ size_in_bytes=-1, # Will have to be determined later
+ download_url=sample_file["comment"]["value"],
+ raw_format=sample_file["name"].split(".")[-1],
+ processed_format="PCL",
+ platform_accession_code=experiment["platform_accession_code"],
+ experiment_accession_code=experiment_accession_code,
+ organism_id=organism_id,
+ organism_name=organism_name,
+ experiment_title=experiment["name"],
+ release_date=experiment["release_date"],
+ last_uploaded_date=experiment["last_update_date"],
+ name=sample_file["name"]
+ ))
+
+ # Group batches based on their download URL and handle each group.
+ download_urls = {batch.download_url for batch in batches}
+ for url in download_urls:
+ batches_with_url = [batch for batch in batches if batch.download_url == url]
+ self.handle_batches(batches_with_url)
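For orientation, the dictionary built by `get_experiment_metadata` for the experiment used in the tests would look roughly like this (values mirror the JSON fixture and assertions in `test_array_express.py` later in this diff; a sketch, not captured API output):

```python
# Approximate return value of get_experiment_metadata("E-MTAB-3050").
experiment = {
    "name": "Microarray analysis of in vitro differentiation",
    "experiment_accession_code": "E-MTAB-3050",
    "platform_accession_code": "A-AFFY-1",
    "release_date": "2014-10-31",
    "last_update_date": "2014-10-30",
}
```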
diff --git a/foreman/data_refinery_foreman/surveyor/external_source.py b/foreman/data_refinery_foreman/surveyor/external_source.py
index 62bca47cb..0be944435 100644
--- a/foreman/data_refinery_foreman/surveyor/external_source.py
+++ b/foreman/data_refinery_foreman/surveyor/external_source.py
@@ -3,13 +3,16 @@
from enum import Enum
from typing import List
from retrying import retry
+from django.db import transaction
from data_refinery_models.models import (
Batch,
BatchStatuses,
- BatchKeyValue,
DownloaderJob,
+ DownloaderJobsToBatches,
SurveyJob
)
+from data_refinery_foreman.surveyor.message_queue import app
+
# Import and set logger
import logging
@@ -25,7 +28,8 @@ class PipelineEnums(Enum):
"""An abstract class to enumerate valid processor pipelines.
Enumerations which extend this class are valid values for the
- pipeline_required field of the Batches table."""
+ pipeline_required field of the Batches table.
+ """
pass
@@ -35,8 +39,7 @@ class ProcessorPipeline(PipelineEnums):
class DiscoveryPipeline(PipelineEnums):
- """Pipelines which discover what kind of processing is appropriate
- for the data."""
+ """Pipelines which discover appropriate processing for the data."""
pass
@@ -52,56 +55,69 @@ def source_type(self):
@abc.abstractproperty
def downloader_task(self):
- """This property should return the Celery Downloader Task name
- from the data_refinery_workers project which should be queued
- to download Batches discovered by this surveyor."""
+ """Abstract property representing the downloader task.
+
+ Should return the Celery Downloader Task name from the
+ data_refinery_workers project which should be queued to
+ download Batches discovered by this surveyor.
+ """
return
@abc.abstractmethod
def determine_pipeline(self,
- batch: Batch,
- key_values: List[BatchKeyValue] = []):
- """Determines the appropriate processor pipeline for the batch
- and returns a string that represents a processor pipeline.
- Must return a member of PipelineEnums."""
+ batch: Batch):
+ """Determines the appropriate pipeline for the batch.
+
+ Returns a string that represents a processor pipeline.
+ Must return a member of PipelineEnums.
+ """
return
- def handle_batch(self, batch: Batch, key_values: BatchKeyValue = None):
- batch.survey_job = self.survey_job
- batch.source_type = self.source_type()
- batch.status = BatchStatuses.NEW.value
+ def handle_batches(self, batches: List[Batch]):
+ for batch in batches:
+ batch.survey_job = self.survey_job
+ batch.source_type = self.source_type()
+ batch.status = BatchStatuses.NEW.value
- pipeline_required = self.determine_pipeline(batch, key_values)
- if (pipeline_required is DiscoveryPipeline) or batch.processed_format:
- batch.pipeline_required = pipeline_required.value
- else:
- message = ("Batches must have the processed_format field set "
- "unless the pipeline returned by determine_pipeline "
- "is of the type DiscoveryPipeline.")
- raise InvalidProcessedFormatError(message)
+ pipeline_required = self.determine_pipeline(batch)
+ if (pipeline_required is DiscoveryPipeline) or batch.processed_format:
+ batch.pipeline_required = pipeline_required.value
+ else:
+ message = ("Batches must have the processed_format field set "
+ "unless the pipeline returned by determine_pipeline "
+ "is of the type DiscoveryPipeline.")
+ raise InvalidProcessedFormatError(message)
- batch.internal_location = os.path.join(batch.accession_code,
- batch.pipeline_required)
+ batch.internal_location = os.path.join(batch.platform_accession_code,
+ batch.pipeline_required)
@retry(stop_max_attempt_number=3)
- def save_batch_start_job():
- batch.save()
- downloader_job = DownloaderJob(batch=batch)
+ @transaction.atomic
+ def save_batches_start_job():
+ downloader_job = DownloaderJob()
downloader_job.save()
- self.downloader_task().delay(downloader_job.id)
+
+ for batch in batches:
+ batch.save()
+ downloader_job_to_batch = DownloaderJobsToBatches(batch=batch,
+ downloader_job=downloader_job)
+ downloader_job_to_batch.save()
+
+ app.send_task(self.downloader_task(), args=[downloader_job.id])
try:
- save_batch_start_job()
- except Exception as e:
- logger.error("Failed to save batch to database three times "
- + "because error: %s. Terminating survey job #%d.",
- type(e).__name__,
- self.survey_job.id)
+ save_batches_start_job()
+ except Exception:
+ logger.exception(("Failed to save batches to database three times. "
+ "Terminating survey job #%d."),
+ self.survey_job.id)
raise
@abc.abstractmethod
- def survey(self, survey_job: SurveyJob):
- """Implementations of this function should do the following:
+ def survey(self):
+ """Abstract method to survey a source.
+
+ Implementations of this method should do the following:
1. Query the external source to discover batches that should be
downloaded.
2. Create a Batch object for each discovered batch and optionally
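Since the surveyor interface is only described abstractly here, a hedged sketch of a concrete subclass may help; every name below (class, source type, task path, accession codes, dates) is invented for illustration, and `ArrayExpressSurveyor` elsewhere in this diff is the real implementation:

```python
# A hypothetical minimal surveyor, for illustration only.
from data_refinery_models.models import Batch
from data_refinery_foreman.surveyor.external_source import (
    ExternalSourceSurveyor,
    ProcessorPipeline,
)


class ExampleSurveyor(ExternalSourceSurveyor):
    def source_type(self):
        return "EXAMPLE_SOURCE"  # invented source type

    def downloader_task(self):
        # Name of the Celery task queued by handle_batches (invented).
        return "data_refinery_workers.downloaders.example.download_example"

    def determine_pipeline(self, batch: Batch):
        return ProcessorPipeline.AFFY_TO_PCL

    def survey(self):
        # Normally this would query the external source; here one
        # hard-coded Batch stands in for the discovered data.
        batch = Batch(size_in_bytes=-1,
                      download_url="ftp://example.org/raw/file.CEL",
                      raw_format="CEL",
                      processed_format="PCL",
                      platform_accession_code="A-EXAMPLE-1",
                      experiment_accession_code="E-EXAMPLE-1",
                      experiment_title="Example experiment",
                      organism_id=9606,
                      organism_name="HOMO SAPIENS",
                      release_date="2017-01-01",
                      last_uploaded_date="2017-01-01",
                      name="file.CEL")
        # handle_batches saves the Batch, creates a DownloaderJob, and
        # queues the downloader task with the job's id.
        self.handle_batches([batch])
```

Calling `ExampleSurveyor(survey_job).survey()` would then record the batch and queue one downloader job for it.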
diff --git a/foreman/data_refinery_foreman/surveyor/management/commands/start.py b/foreman/data_refinery_foreman/surveyor/management/commands/start.py
deleted file mode 100644
index 7bc5e4a89..000000000
--- a/foreman/data_refinery_foreman/surveyor/management/commands/start.py
+++ /dev/null
@@ -1,7 +0,0 @@
-from django.core.management.base import BaseCommand
-from data_refinery_foreman.surveyor import surveyor
-
-
-class Command(BaseCommand):
- def handle(self, *args, **options):
- surveyor.test()
diff --git a/foreman/data_refinery_foreman/surveyor/management/commands/survey.py b/foreman/data_refinery_foreman/surveyor/management/commands/survey.py
new file mode 100644
index 000000000..42de9d798
--- /dev/null
+++ b/foreman/data_refinery_foreman/surveyor/management/commands/survey.py
@@ -0,0 +1,31 @@
+"""
+This command will create and run survey jobs for each experiment in the
+experiment_list. experiment list should be a file containing one
+experiment accession code per line.
+"""
+
+from django.core.management.base import BaseCommand
+from data_refinery_foreman.surveyor import surveyor
+
+# Import and set logger
+import logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class Command(BaseCommand):
+ def add_arguments(self, parser):
+ parser.add_argument(
+ "experiment_list",
+ help=("A file containing a list of experiment accession codes to "
+ "survey, download, and process. These should be listed one "
+ "per line. Should be a path relative to the foreman "
+ "directory."))
+
+ def handle(self, *args, **options):
+ if options["experiment_list"] is None:
+ logger.error("You must specify an experiment list.")
+ return 1
+ else:
+ surveyor.survey_experiments(options["experiment_list"])
+ return 0
diff --git a/foreman/data_refinery_foreman/surveyor/surveyor.py b/foreman/data_refinery_foreman/surveyor/surveyor.py
index af7b4a49f..5823da04a 100644
--- a/foreman/data_refinery_foreman/surveyor/surveyor.py
+++ b/foreman/data_refinery_foreman/surveyor/surveyor.py
@@ -57,7 +57,7 @@ def run_job(survey_job: SurveyJob):
return survey_job
try:
- job_success = surveyor.survey(survey_job)
+ job_success = surveyor.survey()
except Exception as e:
logger.error("Exception caught while running job #%d with message: %s",
survey_job.id,
@@ -73,8 +73,20 @@ def test():
survey_job = SurveyJob(source_type="ARRAY_EXPRESS")
survey_job.save()
key_value_pair = SurveyJobKeyValue(survey_job=survey_job,
- key="accession_code",
- value="A-AFFY-1")
+ key="experiment_accession_code",
+ value="E-MTAB-3050")
key_value_pair.save()
run_job(survey_job)
return
+
+
+def survey_experiments(experiments_list_file):
+ with open(experiments_list_file, "r") as experiments:
+ for experiment in experiments:
+ survey_job = SurveyJob(source_type="ARRAY_EXPRESS")
+ survey_job.save()
+ key_value_pair = SurveyJobKeyValue(survey_job=survey_job,
+ key="experiment_accession_code",
+ value=experiment.rstrip())
+ key_value_pair.save()
+ run_job(survey_job)
diff --git a/foreman/data_refinery_foreman/surveyor/test_array_express.py b/foreman/data_refinery_foreman/surveyor/test_array_express.py
index 8c7f28d7d..7cb26d9cb 100644
--- a/foreman/data_refinery_foreman/surveyor/test_array_express.py
+++ b/foreman/data_refinery_foreman/surveyor/test_array_express.py
@@ -1,151 +1,298 @@
import json
+import datetime
from unittest.mock import Mock, patch
from django.test import TestCase
from data_refinery_models.models import (
Batch,
+ DownloaderJob,
SurveyJob,
- SurveyJobKeyValue
+ SurveyJobKeyValue,
+ Organism
+)
+from data_refinery_foreman.surveyor.array_express import (
+ ArrayExpressSurveyor,
+ EXPERIMENTS_URL,
)
-from data_refinery_foreman.surveyor.array_express import ArrayExpressSurveyor
-
-
-class MockTask():
- def delay(self, id):
- return True
-class SurveyTestCase(TestCase):
- experiments_json = """
- {
- "files": {
+EXPERIMENTS_JSON = """
+{
+ "experiments": {
"api-revision": "091015",
- "api-version": 2,
+ "api-version": 3,
"experiment": [
{
"accession": "E-MTAB-3050",
+ "arraydesign": [
+ {
+ "accession": "A-AFFY-1",
+ "count": 5,
+ "id": 11048,
+ "legacy_id": 5728564,
+ "name": "Affymetrix GeneChip Human U95Av2 [HG_U95Av2]"
+ }
+ ],
+ "bioassaydatagroup": [
+ {
+ "arraydesignprovider": null,
+ "bioassaydatacubes": 5,
+ "bioassays": 5,
+ "dataformat": "rawData",
+ "id": null,
+ "isderived": 0,
+ "name": "rawData"
+ }
+ ],
+ "description": [
+ {
+ "id": null,
+ "text": "description tex"
+ }
+ ],
+ "experimentalvariable": [
+ {
+ "name": "cell type",
+ "value": [
+ "differentiated",
+ "expanded",
+ "freshly isolated"
+ ]
+ }
+ ],
+ "experimentdesign": [
+ "cell type comparison design",
+ "development or differentiation design"
+ ],
+ "experimenttype": [
+ "transcription profiling by array"
+ ],
+ "id": 511696,
+ "lastupdatedate": "2014-10-30",
+ "name": "Microarray analysis of in vitro differentiation",
+ "organism": [
+ "Homo sapiens"
+ ],
+ "protocol": [
+ {
+ "accession": "P-MTAB-41859",
+ "id": 1092859
+ }
+ ],
+ "provider": [
+ {
+ "contact": "Joel Habener",
+ "email": "jhabener@partners.org",
+ "role": "submitter"
+ }
+ ],
+ "releasedate": "2014-10-31",
+ "samplecharacteristic": [
+ {
+ "category": "age",
+ "value": [
+ "38 year",
+ "54 year"
+ ]
+ }
+ ]
+ }
+ ],
+ "revision": "091015",
+ "total": 1,
+ "total-assays": 5,
+ "total-samples": 2,
+ "version": 3.0
+ }
+} """
+
+SAMPLES_JSON = """
+{
+ "experiment": {
+ "accession": "E-MTAB-3050",
+ "api-revision": "091015",
+ "api-version": 3,
+ "revision": "091015",
+ "sample": [
+ {
+ "assay": {
+ "name": "1007409-C30057"
+ },
+ "characteristic": [
+ {
+ "category": "organism",
+ "value": "Homo sapiens"
+ }
+ ],
+ "extract": {
+ "name": "donor A islets RNA"
+ },
"file": [
{
- "extension": "zip",
- "kind": "raw",
- "lastmodified": "2014-10-30T10:15:00",
- "location": "E-MTAB-3050.raw.1.zip",
- "name": "E-MTAB-3050.raw.1.zip",
- "size": 14876114,
- "url":
- "http://www.ebi.ac.uk/arrayexpress/files/E-MTAB-3050/E-MTAB-3050.raw.1.zip"
+ "comment": {
+ "name": "ArrayExpress FTP file",
+ "value": "ftp://ftp.ebi.ac.uk/pub/databases/microarray/data/experiment/MTAB/E-MTAB-3050/E-MTAB-3050.raw.1.zip"
+ },
+ "name": "C30057.CEL",
+ "type": "data",
+ "url": "ftp://ftp.ebi.ac.uk/pub/databases/microarray/data/experiment/MTAB/E-MTAB-3050/E-MTAB-3050.raw.1.zip/C30057.CEL"
},
{
- "extension": "xls",
- "kind": "adf",
- "lastmodified": "2010-03-14T02:31:00",
- "location": "A-AFFY-1.adf.xls",
- "name": "A-AFFY-1.adf.xls",
- "size": 2040084,
- "url":
- "http://www.ebi.ac.uk/arrayexpress/files/A-AFFY-1/A-AFFY-1.adf.xls"
+ "comment": {
+ "name": "Derived ArrayExpress FTP file",
+ "value": "ftp://ftp.ebi.ac.uk/pub/databases/microarray/data/experiment/MTAB/E-MTAB-3050/E-MTAB-3050.processed.1.zip"
+ },
+ "name": "C30057.txt",
+ "type": "derived data",
+ "url": "ftp://ftp.ebi.ac.uk/pub/databases/microarray/data/experiment/MTAB/E-MTAB-3050/E-MTAB-3050.processed.1.zip/C30057.txt"
+ }
+ ],
+ "labeled-extract": {
+ "label": "biotin",
+ "name": "donor A islets LEX"
+ },
+ "source": {
+ "name": "donor A islets"
+ },
+ "variable": [
+ {
+ "name": "cell type",
+ "value": "freshly isolated"
}
]
},
{
- "accession": "E-MTAB-3042",
+ "assay": {
+ "name": "1007409-C30058"
+ },
+ "characteristic": [
+ {
+ "category": "organism",
+ "value": "Homo sapiens"
+ }
+ ],
+ "extract": {
+ "name": "donor A expanded cells RNA"
+ },
"file": [
{
- "extension": "txt",
- "kind": "idf",
- "lastmodified": "2014-10-28T10:15:00",
- "location": "E-MTAB-3042.idf.txt",
- "name": "E-MTAB-3042.idf.txt",
- "size": 5874,
- "url":
- "http://www.ebi.ac.uk/arrayexpress/files/E-MTAB-3042/E-MTAB-3042.idf.txt"
+ "comment": {
+ "name": "ArrayExpress FTP file",
+ "value": "ftp://ftp.ebi.ac.uk/pub/databases/microarray/data/experiment/MTAB/E-MTAB-3050/E-MTAB-3050.raw.1.zip"
+ },
+ "name": "C30058.CEL",
+ "type": "data",
+ "url": "ftp://ftp.ebi.ac.uk/pub/databases/microarray/data/experiment/MTAB/E-MTAB-3050/E-MTAB-3050.raw.1.zip/C30058.CEL"
},
{
- "extension": "zip",
- "kind": "raw",
- "lastmodified": "2014-10-28T10:15:00",
- "location": "E-MTAB-3042.raw.1.zip",
- "name": "E-MTAB-3042.raw.1.zip",
- "size": 5525709,
- "url":
- "http://www.ebi.ac.uk/arrayexpress/files/E-MTAB-3042/E-MTAB-3042.raw.1.zip"
+ "comment": {
+ "name": "Derived ArrayExpress FTP file",
+ "value": "ftp://ftp.ebi.ac.uk/pub/databases/microarray/data/experiment/MTAB/E-MTAB-3050/E-MTAB-3050.processed.1.zip"
+ },
+ "name": "C30058.txt",
+ "type": "derived data",
+ "url": "ftp://ftp.ebi.ac.uk/pub/databases/microarray/data/experiment/MTAB/E-MTAB-3050/E-MTAB-3050.processed.1.zip/C30058.txt"
+ }
+ ],
+ "labeled-extract": {
+ "label": "biotin",
+ "name": "donor A expanded cells LEX"
+ },
+ "source": {
+ "name": "donor A islets"
+ },
+ "variable": [
+ {
+ "name": "cell type",
+ "value": "expanded"
}
]
}
],
- "revision": 130311,
- "total-experiments": 108,
- "version": 1.2
+ "version": 1.0
}
-}
-"""
+}""" # noqa
+
+
+def mocked_requests_get(url):
+ mock = Mock(ok=True)
+ if url == (EXPERIMENTS_URL + "E-MTAB-3050"):
+ mock.json.return_value = json.loads(EXPERIMENTS_JSON)
+ else:
+ mock.json.return_value = json.loads(SAMPLES_JSON)
+ return mock
+
+
+class SurveyTestCase(TestCase):
def setUp(self):
survey_job = SurveyJob(source_type="ARRAY_EXPRESS")
survey_job.save()
self.survey_job = survey_job
key_value_pair = SurveyJobKeyValue(survey_job=survey_job,
- key="accession_code",
- value="A-AFFY-1")
+ key="experiment_accession_code",
+ value="E-MTAB-3050")
key_value_pair.save()
+ # Insert the organism into the database so the model doesn't call the
+ # taxonomy API to populate it.
+ organism = Organism(name="HOMO SAPIENS",
+ taxonomy_id=9606,
+ is_scientific_name=True)
+ organism.save()
+
def tearDown(self):
SurveyJob.objects.all().delete()
SurveyJobKeyValue.objects.all().delete()
Batch.objects.all().delete
- @patch("data_refinery_foreman.surveyor.array_express.requests.get")
- @patch("data_refinery_foreman.surveyor.array_express.ArrayExpressSurveyor.downloader_task") # noqa
- def test_multiple_experiements(self, mock_task, mock_get):
- """Multiple experiments are turned into multiple batches"""
+ @patch('data_refinery_foreman.surveyor.array_express.requests.get')
+ def test_experiment_object(self, mock_get):
+ """The get_experiment_metadata function extracts all experiment metadata
+ from the experiments API."""
mock_get.return_value = Mock(ok=True)
- mock_get.return_value.json.return_value = json.loads(
- self.experiments_json)
- mock_task.return_value = MockTask()
+ mock_get.return_value.json.return_value = json.loads(EXPERIMENTS_JSON)
- ae_surveyor = ArrayExpressSurveyor(self.survey_job)
- self.assertTrue(ae_surveyor.survey(self.survey_job))
- self.assertEqual(2, Batch.objects.all().count())
-
- # Note that this json has the `file` key mapped to a dictionary
- # instead of a list. This is behavior exhibited by the API
- # and is being tested on purpose here.
- experiment_json = """
- {
- "files": {
- "api-revision": "091015",
- "api-version": 2,
- "experiment": [
- {
- "accession": "E-MTAB-3050",
- "file": {
- "extension": "zip",
- "kind": "raw",
- "lastmodified": "2014-10-30T10:15:00",
- "location": "E-MTAB-3050.raw.1.zip",
- "name": "E-MTAB-3050.raw.1.zip",
- "size": 14876114,
- "url":
- "http://www.ebi.ac.uk/arrayexpress/files/E-MTAB-3050/E-MTAB-3050.raw.1.zip"
- }
- }
- ],
- "revision": 130311,
- "total-experiments": 108,
- "version": 1.2
- }
-}
-"""
+ experiment = ArrayExpressSurveyor.get_experiment_metadata("E-MTAB-3050")
+ self.assertEqual("Microarray analysis of in vitro differentiation", experiment["name"])
+ self.assertEqual("E-MTAB-3050", experiment["experiment_accession_code"])
+ self.assertEqual("A-AFFY-1", experiment["platform_accession_code"])
+ self.assertEqual("2014-10-31", experiment["release_date"])
+ self.assertEqual("2014-10-30", experiment["last_update_date"])
@patch('data_refinery_foreman.surveyor.array_express.requests.get')
- @patch("data_refinery_foreman.surveyor.array_express.ArrayExpressSurveyor.downloader_task") # noqa
- def test_single_experiment(self, mock_task, mock_get):
- """A single experiment is turned into a single batch."""
- mock_get.return_value = Mock(ok=True)
- mock_get.return_value.json.return_value = json.loads(
- self.experiment_json)
- mock_task.return_value = MockTask()
+ @patch('data_refinery_foreman.surveyor.message_queue.app.send_task')
+ def test_survey(self, mock_send_task, mock_get):
+        """survey() generates one Batch per sample with all possible fields populated.
+        This test also covers the handle_batches method of ExternalSourceSurveyor,
+        which isn't tested on its own because it is an abstract class."""
+ mock_send_task.return_value = Mock(ok=True)
+ mock_get.side_effect = mocked_requests_get
ae_surveyor = ArrayExpressSurveyor(self.survey_job)
- self.assertTrue(ae_surveyor.survey(self.survey_job))
- self.assertEqual(1, Batch.objects.all().count())
+ ae_surveyor.survey()
+
+ self.assertEqual(2, len(mock_send_task.mock_calls))
+ batches = Batch.objects.all()
+ self.assertEqual(2, len(batches))
+ downloader_jobs = DownloaderJob.objects.all()
+ self.assertEqual(2, len(downloader_jobs))
+
+ batch = batches[0]
+ self.assertEqual(batch.survey_job.id, self.survey_job.id)
+ self.assertEqual(batch.source_type, "ARRAY_EXPRESS")
+ self.assertEqual(batch.size_in_bytes, -1)
+ self.assertEqual(batch.download_url, "ftp://ftp.ebi.ac.uk/pub/databases/microarray/data/experiment/MTAB/E-MTAB-3050/E-MTAB-3050.raw.1.zip/C30057.CEL") # noqa
+ self.assertEqual(batch.raw_format, "CEL")
+ self.assertEqual(batch.processed_format, "PCL")
+ self.assertEqual(batch.pipeline_required, "AFFY_TO_PCL")
+ self.assertEqual(batch.platform_accession_code, "A-AFFY-1")
+ self.assertEqual(batch.experiment_accession_code, "E-MTAB-3050")
+ self.assertEqual(batch.experiment_title, "Microarray analysis of in vitro differentiation")
+ self.assertEqual(batch.status, "NEW")
+ self.assertEqual(batch.release_date, datetime.date(2014, 10, 31))
+ self.assertEqual(batch.last_uploaded_date, datetime.date(2014, 10, 30))
+ self.assertEqual(batch.name, "C30057.CEL")
+ self.assertEqual(batch.internal_location, "A-AFFY-1/AFFY_TO_PCL/")
+ self.assertEqual(batch.organism_id, 9606)
+ self.assertEqual(batch.organism_name, "HOMO SAPIENS")
diff --git a/foreman/data_refinery_foreman/surveyor/test_surveyor.py b/foreman/data_refinery_foreman/surveyor/test_surveyor.py
index 359496b8d..4662a03b8 100644
--- a/foreman/data_refinery_foreman/surveyor/test_surveyor.py
+++ b/foreman/data_refinery_foreman/surveyor/test_surveyor.py
@@ -26,7 +26,7 @@ def test_calls_survey(self, survey_method):
job.save()
surveyor.run_job(job)
- survey_method.assert_called_with(job)
+ survey_method.assert_called()
self.assertIsInstance(job.replication_ended_at, datetime.datetime)
self.assertIsInstance(job.start_time, datetime.datetime)
self.assertIsInstance(job.end_time, datetime.datetime)
diff --git a/foreman/requirements.in b/foreman/requirements.in
index 4095eb400..61c1f238e 100644
--- a/foreman/requirements.in
+++ b/foreman/requirements.in
@@ -2,4 +2,5 @@ django
celery
psycopg2
requests
+GEOparse
retrying
diff --git a/foreman/requirements.txt b/foreman/requirements.txt
index 60a1060f3..40caa0728 100644
--- a/foreman/requirements.txt
+++ b/foreman/requirements.txt
@@ -8,10 +8,15 @@ amqp==2.1.4 # via kombu
billiard==3.5.0.2 # via celery
celery==4.0.2
django==1.10.6
+GEOparse==0.1.10
kombu==4.0.2 # via celery
+numpy==1.12.1 # via geoparse, pandas
+pandas==0.19.2 # via geoparse
psycopg2==2.7.1
-pytz==2016.10 # via celery
+python-dateutil==2.6.0 # via pandas
+pytz==2017.2 # via celery, django, pandas
requests==2.13.0
retrying==1.3.3
-six==1.10.0 # via retrying
+six==1.10.0 # via python-dateutil, retrying
vine==1.1.3 # via amqp
+wgetter==0.6 # via geoparse
diff --git a/foreman/run_surveyor.sh b/foreman/run_surveyor.sh
index d941d228f..e88b381f0 100755
--- a/foreman/run_surveyor.sh
+++ b/foreman/run_surveyor.sh
@@ -11,12 +11,20 @@ cd $script_directory
# move up a level
cd ..
+# Set up the data volume directory if it does not already exist
+volume_directory="$script_directory/volume"
+if [ ! -d "$volume_directory" ]; then
+ mkdir $volume_directory
+ chmod 775 $volume_directory
+fi
+
docker build -t dr_foreman -f foreman/Dockerfile .
HOST_IP=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
docker run \
- --link some-rabbit:rabbit \
+ --link message-queue:rabbit \
--add-host=database:$HOST_IP \
--env-file foreman/environments/dev \
- dr_foreman
+ --volume $volume_directory:/home/user/data_store \
+ dr_foreman survey "$@"
diff --git a/requirements.in b/requirements.in
index bf848a288..71a4ddceb 100644
--- a/requirements.in
+++ b/requirements.in
@@ -2,3 +2,5 @@ django
psycopg2
pip-tools
autopep8
+flake8
+requests
diff --git a/requirements.txt b/requirements.txt
index a5296e1de..6d68f38b4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,7 +8,11 @@ autopep8==1.3.1
click==6.7 # via pip-tools
django==1.10.6
first==2.0.1 # via pip-tools
+flake8==3.3.0
+mccabe==0.6.1 # via flake8
pip-tools==1.9.0
psycopg2==2.7.1
-pycodestyle==2.3.1 # via autopep8
+pycodestyle==2.3.1 # via autopep8, flake8
+pyflakes==1.5.0 # via flake8
+requests==2.13.0
six==1.10.0 # via pip-tools
diff --git a/run_rabbitmq.sh b/run_rabbitmq.sh
new file mode 100755
index 000000000..7477b963d
--- /dev/null
+++ b/run_rabbitmq.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
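+# Start a RabbitMQ broker in the background. Other containers link to it
+# by its container name, "message-queue", aliased inside them as "rabbit".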
+docker run -d --hostname rabbit-queue --name message-queue rabbitmq:3
diff --git a/run_shell.sh b/run_shell.sh
index 3e45fdf24..557a50703 100755
--- a/run_shell.sh
+++ b/run_shell.sh
@@ -5,21 +5,30 @@
# By default the Docker container will be for the foreman project.
# This can be changed by modifying the --env-file command line arg,
# changing foreman/Dockerfile to the appropriate Dockerfile,
+# changing the volume_directory path,
# and by modifying the Dockerfile.shell file appropriately.
# This script should always run as if it were being called from
# the directory it lives in.
-script_directory=`dirname "${BASH_SOURCE[0]}"`
+script_directory=`cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd`
cd $script_directory
+# Set up the data volume directory if it does not already exist
+volume_directory="$script_directory/foreman/volume"
+if [ ! -d "$volume_directory" ]; then
+ mkdir $volume_directory
+ chmod 775 $volume_directory
+fi
+
docker build -t dr_shell -f foreman/Dockerfile .
HOST_IP=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
docker run \
- --link some-rabbit:rabbit \
+ --link message-queue:rabbit \
--add-host=database:$HOST_IP \
--env-file foreman/environments/dev \
--volume /tmp:/tmp \
+ --volume $volume_directory:/home/user/data_store \
--entrypoint ./manage.py \
--interactive dr_shell shell
diff --git a/schema.png b/schema.png
new file mode 100644
index 000000000..d4e22c16b
Binary files /dev/null and b/schema.png differ
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 000000000..2be2471d9
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,5 @@
+[pep8]
+max-line-length: 99
+
+[flake8]
+max-line-length: 99
diff --git a/workers/Dockerfile b/workers/Dockerfile
index 4a37b858d..b696681e5 100644
--- a/workers/Dockerfile
+++ b/workers/Dockerfile
@@ -18,4 +18,4 @@ COPY workers/ .
USER user
-ENTRYPOINT ["celery", "-l", "info", "-A", "data_refinery_workers", "-c", "1", "worker"]
+CMD ["celery", "-l", "info", "-A", "data_refinery_workers", "-c", "1", "worker"]
diff --git a/workers/Dockerfile.tests b/workers/Dockerfile.tests
new file mode 100644
index 000000000..3c55740aa
--- /dev/null
+++ b/workers/Dockerfile.tests
@@ -0,0 +1,23 @@
+FROM continuumio/miniconda3:4.3.11
+
+RUN groupadd user && useradd --create-home --home-dir /home/user -g user user
+WORKDIR /home/user
+
+COPY workers/environment.yml .
+
+RUN conda env create -f environment.yml
+
+ENV PATH "/opt/conda/envs/workers/bin:$PATH"
+
+COPY data_models/dist/data-refinery-models-* data_models/
+
+# Get the latest version from the dist directory.
+RUN pip install data_models/$(ls data_models -1 | tail -1)
+
+COPY workers/ .
+
+USER user
+
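+# Run manage.py by default; the CMD below supplies the default test
+# arguments, and arguments passed to `docker run` override the CMD
+# while keeping the entrypoint.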
+ENTRYPOINT ["python3.6", "manage.py"]
+
+CMD ["test", "--no-input"]
diff --git a/workers/data_refinery_workers/downloaders/array_express.py b/workers/data_refinery_workers/downloaders/array_express.py
index e88d9e7bf..f51179ddc 100644
--- a/workers/data_refinery_workers/downloaders/array_express.py
+++ b/workers/data_refinery_workers/downloaders/array_express.py
@@ -1,10 +1,20 @@
from __future__ import absolute_import, unicode_literals
-import requests
+import urllib.request
+import os
+import shutil
+import zipfile
+from typing import List
+from contextlib import closing
from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.exceptions import ObjectDoesNotExist
-from data_refinery_models.models import Batch, DownloaderJob
+from data_refinery_models.models import (
+ Batch,
+ DownloaderJob,
+ DownloaderJobsToBatches
+)
from data_refinery_workers.downloaders import utils
+import logging
logger = get_task_logger(__name__)
@@ -12,8 +22,50 @@
CHUNK_SIZE = 1024 * 256
+def _verify_batch_grouping(batches: List[Batch], job_id):
+ """All batches in the same job should have the same downloader url"""
+ for batch in batches:
+ if batch.download_url != batches[0].download_url:
+ logger.error(("A Batch doesn't have the same download URL as the other batches"
+ " in downloader job #%d."),
+ job_id)
+ raise ValueError("A batch doesn't have the same download url as other batches.")
+
+
+def _download_file(download_url, file_path, job_id):
+ try:
+ logger.debug("Downloading file from %s to %s. (Job #%d)",
+ download_url,
+ file_path,
+ job_id)
+ # Stream the download in CHUNK_SIZE blocks so the whole archive
+ # never has to be held in memory at once.
+ with open(file_path, "wb") as target_file:
+ with closing(urllib.request.urlopen(download_url)) as request:
+ shutil.copyfileobj(request, target_file, CHUNK_SIZE)
+ except Exception:
+ logging.exception("Exception caught while running Job #%d.",
+ job_id)
+ raise
+
+
+def _extract_file(file_path, job_id):
+ try:
+ with zipfile.ZipFile(file_path, 'r') as zip_ref:
+ zip_ref.extractall(os.path.dirname(file_path))
+ except Exception:
+ logging.exception("Exception caught while extracting %s during Job #%d.",
+ file_path,
+ job_id)
+ raise
+ finally:
+ # The zip archive is no longer needed once extraction has been
+ # attempted, whether or not it succeeded.
+ os.remove(file_path)
+
+
@shared_task
def download_array_express(job_id):
+ logger.debug("Starting job with id: %s.", job_id)
try:
job = DownloaderJob.objects.get(id=job_id)
except ObjectDoesNotExist:
@@ -23,44 +75,34 @@ def download_array_express(job_id):
success = True
utils.start_job(job)
- try:
- batch = Batch.objects.get(id=job.batch_id)
- except ObjectDoesNotExist:
- logger.error("Cannot find batch record with ID %d.", job.batch_id)
- utils.end_job(job, None, False)
- return
+ batch_relations = DownloaderJobsToBatches.objects.filter(downloader_job_id=job_id)
+ batches = [br.batch for br in batch_relations]
- target_file_path = utils.prepare_destination(batch)
+ if len(batches) > 0:
+ target_file_path = utils.prepare_destination(batches[0])
+ download_url = batches[0].download_url
+ else:
+ logger.error("No batches found for job #%d.",
+ job_id)
+ success = False
- logger.info("Downloading file from %s to %s. (Batch #%d, Job #%d)",
- batch.download_url,
- target_file_path,
- batch.id,
- job_id)
+ if success:
+ try:
+ _verify_batch_grouping(batches, job_id)
- try:
- target_file = open(target_file_path, "wb")
- request = requests.get(batch.download_url, stream=True)
-
- for chunk in request.iter_content(CHUNK_SIZE):
- if chunk:
- target_file.write(chunk)
- target_file.flush()
- except Exception as e:
- success = False
- logger.error("Exception caught while running Job #%d for Batch #%d "
- + "with message: %s",
- job_id,
- batch.id,
- e)
- finally:
- target_file.close()
- request.close()
+ # The files for all of the batches in the grouping are
+ # contained within the same zip file, so it only needs to
+ # be downloaded and extracted once.
+ _download_file(download_url, target_file_path, job_id)
+ _extract_file(target_file_path, job_id)
+ except Exception:
+ # Exceptions are already logged and handled.
+ # Just need to mark the job as failed.
+ success = False
if success:
- logger.info("File %s (Batch #%d) downloaded successfully in Job #%d.",
- batch.download_url,
- batch.id,
- job_id)
+ logger.debug("File %s downloaded and extracted successfully in Job #%d.",
+ download_url,
+ job_id)
- utils.end_job(job, batch, success)
+ utils.end_job(job, batches, success)
diff --git a/workers/data_refinery_workers/downloaders/management/commands/queue_downloader.py b/workers/data_refinery_workers/downloaders/management/commands/queue_downloader.py
index 856956197..61c20cac0 100644
--- a/workers/data_refinery_workers/downloaders/management/commands/queue_downloader.py
+++ b/workers/data_refinery_workers/downloaders/management/commands/queue_downloader.py
@@ -8,7 +8,8 @@
SurveyJob,
Batch,
BatchStatuses,
- DownloaderJob
+ DownloaderJob,
+ DownloaderJobsToBatches
)
from data_refinery_workers.downloaders.array_express \
import download_array_express
@@ -33,18 +34,27 @@ def handle(self, *args, **options):
survey_job=survey_job,
source_type="ARRAY_EXPRESS",
size_in_bytes=0,
- download_url="http://www.ebi.ac.uk/arrayexpress/files/E-MTAB-3050/E-MTAB-3050.raw.1.zip", # noqa
- raw_format="MICRO_ARRAY",
+ download_url="ftp://ftp.ebi.ac.uk/pub/databases/microarray/data/experiment/GEOD/E-GEOD-59071/E-GEOD-59071.raw.3.zip", # noqa
+ raw_format="CEL",
processed_format="PCL",
- pipeline_required="MICRO_ARRAY_TO_PCL",
- accession_code="A-AFFY-1",
- internal_location="A-AFFY-1/MICRO_ARRAY_TO_PCL/",
- organism=1,
+ pipeline_required="AFFY_TO_PCL",
+ platform_accession_code="A-AFFY-141",
+ experiment_accession_code="E-GEOD-59071",
+ experiment_title="It doesn't really matter.",
+ name="GSM1426072_CD_colon_active_2.CEL",
+ internal_location="A-AFFY-141/AFFY_TO_PCL/",
+ organism_id=9606,
+ organism_name="HOMO SAPIENS",
+ release_date="2017-05-05",
+ last_uploaded_date="2017-05-05",
status=BatchStatuses.NEW.value
)
batch.save()
- downloader_job = DownloaderJob(batch=batch)
+ downloader_job = DownloaderJob()
downloader_job.save()
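+ # Batches are now associated with downloader jobs through the
+ # DownloaderJobsToBatches join table rather than a foreign key
+ # on the job itself.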
+ downloader_job_to_batch = DownloaderJobsToBatches(batch=batch,
+ downloader_job=downloader_job)
+ downloader_job_to_batch.save()
logger.info("Queuing a task.")
download_array_express.delay(downloader_job.id)
diff --git a/workers/data_refinery_workers/downloaders/management/commands/queue_task.py b/workers/data_refinery_workers/downloaders/management/commands/queue_task.py
deleted file mode 100644
index aedb718ac..000000000
--- a/workers/data_refinery_workers/downloaders/management/commands/queue_task.py
+++ /dev/null
@@ -1,47 +0,0 @@
-from django.core.management.base import BaseCommand
-from data_refinery_models.models import (
- SurveyJob,
- Batch,
- BatchStatuses,
- DownloaderJob
-)
-from data_refinery_workers.downloaders.array_express \
- import download_array_express
-
-
-# Import and set logger
-import logging
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-
-# Just a temporary way to queue a celery task
-# without running the surveyor.
-class Command(BaseCommand):
- def handle(self, *args, **options):
- # Create all the dummy data that would have been created
- # before a downloader job could have been generated.
- survey_job = SurveyJob(
- source_type="ARRAY_EXPRESS"
- )
- survey_job.save()
-
- batch = Batch(
- survey_job=survey_job,
- source_type="ARRAY_EXPRESS",
- size_in_bytes=0,
- download_url="http://www.ebi.ac.uk/arrayexpress/files/E-MTAB-3050/E-MTAB-3050.raw.1.zip", # noqa
- raw_format="MICRO_ARRAY",
- processed_format="PCL",
- pipeline_required="MICRO_ARRAY_TO_PCL",
- accession_code="A-AFFY-1",
- internal_location="expression_data/array_express/A-AFFY-1/",
- organism=1,
- status=BatchStatuses.NEW.value
- )
- batch.save()
-
- downloader_job = DownloaderJob(batch=batch)
- downloader_job.save()
- logger.info("Queuing a task.")
- download_array_express.delay(downloader_job.id)
diff --git a/workers/data_refinery_workers/downloaders/management/commands/queue_test_task.py b/workers/data_refinery_workers/downloaders/management/commands/queue_test_task.py
deleted file mode 100644
index 9befd38b3..000000000
--- a/workers/data_refinery_workers/downloaders/management/commands/queue_test_task.py
+++ /dev/null
@@ -1,48 +0,0 @@
-from django.core.management.base import BaseCommand
-from data_refinery_models.models import (
- SurveyJob,
- Batch,
- BatchStatuses,
- DownloaderJob
-)
-from data_refinery_workers.downloaders.array_express \
- import download_array_express
-
-
-# Import and set logger
-import logging
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-
-class Command(BaseCommand):
- """Just a temporary way to queue a celery task without running
- the surveyor."""
-
- def handle(self, *args, **options):
- # Create all the dummy data that would have been created
- # before a downloader job could have been generated.
- survey_job = SurveyJob(
- source_type="ARRAY_EXPRESS"
- )
- survey_job.save()
-
- batch = Batch(
- survey_job=survey_job,
- source_type="ARRAY_EXPRESS",
- size_in_bytes=0,
- download_url="http://www.ebi.ac.uk/arrayexpress/files/E-MTAB-3050/E-MTAB-3050.raw.1.zip", # noqa
- raw_format="MICRO_ARRAY",
- processed_format="PCL",
- pipeline_required="MICRO_ARRAY_TO_PCL",
- accession_code="A-AFFY-1",
- internal_location="expression_data/array_express/A-AFFY-1/",
- organism=1,
- status=BatchStatuses.NEW.value
- )
- batch.save()
-
- downloader_job = DownloaderJob(batch=batch)
- downloader_job.save()
- logger.info("Queuing a test task.")
- download_array_express.delay(downloader_job.id)
diff --git a/workers/data_refinery_workers/downloaders/test_array_express.py b/workers/data_refinery_workers/downloaders/test_array_express.py
new file mode 100644
index 000000000..e2e825469
--- /dev/null
+++ b/workers/data_refinery_workers/downloaders/test_array_express.py
@@ -0,0 +1,117 @@
+import copy
+from unittest.mock import patch, MagicMock
+from django.test import TestCase
+from data_refinery_models.models import (
+ SurveyJob,
+ Batch,
+ BatchStatuses,
+ DownloaderJob,
+ DownloaderJobsToBatches,
+ ProcessorJob,
+ ProcessorJobsToBatches
+)
+from data_refinery_workers.downloaders import array_express
+
+
+class DownloadArrayExpressTestCase(TestCase):
+ def test_good_batch_grouping(self):
+ """Returns true if all batches have the same download_url."""
+ batches = [Batch(download_url="https://example.com"),
+ Batch(download_url="https://example.com"),
+ Batch(download_url="https://example.com")]
+ job_id = 1
+ self.assertIsNone(array_express._verify_batch_grouping(batches, job_id))
+
+ def test_bad_batch_grouping(self):
+ """Raises exception if all batches don't have the same download_url."""
+ batches = [Batch(download_url="https://example.com"),
+ Batch(download_url="https://example.com"),
+ Batch(download_url="https://wompwomp.com")]
+ job_id = 1
+ with self.assertRaises(ValueError):
+ array_express._verify_batch_grouping(batches, job_id)
+
+ @patch("data_refinery_workers.downloaders.array_express.utils.processor_pipeline_registry")
+ @patch("data_refinery_workers.downloaders.array_express._verify_batch_grouping")
+ @patch("data_refinery_workers.downloaders.array_express._download_file")
+ @patch("data_refinery_workers.downloaders.array_express._extract_file")
+ @patch("data_refinery_workers.downloaders.array_express.utils.prepare_destination")
+ def test_download(self,
+ prepare_destination,
+ _extract_file,
+ _download_file,
+ _verify_batch_grouping,
+ pipeline_registry):
+ # Set up mocks:
+ mock_processor_task = MagicMock()
+ mock_processor_task.delay = MagicMock()
+ mock_processor_task.delay.return_value = None
+ pipeline_registry.__getitem__ = MagicMock()
+ pipeline_registry.__getitem__.return_value = mock_processor_task
+ target_file_path = "target_file_path"
+ prepare_destination.return_value = "target_file_path"
+
+ # Set up database records:
+ survey_job = SurveyJob(
+ source_type="ARRAY_EXPRESS"
+ )
+ survey_job.save()
+
+ download_url = "ftp://ftp.ebi.ac.uk/pub/databases/microarray/data/experiment/GEOD/E-GEOD-59071/E-GEOD-59071.raw.3.zip/GSM1426072_CD_colon_active_2.CEL" # noqa
+ batch = Batch(
+ survey_job=survey_job,
+ source_type="ARRAY_EXPRESS",
+ size_in_bytes=0,
+ download_url=download_url,
+ raw_format="CEL",
+ processed_format="PCL",
+ pipeline_required="AFFY_TO_PCL",
+ platform_accession_code="A-AFFY-1",
+ experiment_accession_code="E-MTAB-3050",
+ experiment_title="It doesn't really matter.",
+ name="CE1234",
+ internal_location="A-AFFY-1/MICRO_ARRAY_TO_PCL/",
+ organism_id=9606,
+ organism_name="HOMO SAPIENS",
+ release_date="2017-05-05",
+ last_uploaded_date="2017-05-05",
+ status=BatchStatuses.NEW.value
+ )
+ batch2 = copy.deepcopy(batch)
+ batch2.name = "CE2345"
+ batch.save()
+ batch2.save()
+
+ downloader_job = DownloaderJob()
+ downloader_job.save()
+ downloader_job_to_batch = DownloaderJobsToBatches(batch=batch,
+ downloader_job=downloader_job)
+ downloader_job_to_batch.save()
+ downloader_job_to_batch2 = DownloaderJobsToBatches(batch=batch2,
+ downloader_job=downloader_job)
+ downloader_job_to_batch2.save()
+
+ # Call the task we're testing:
+ array_express.download_array_express.apply(args=(downloader_job.id,)).get()
+
+ # Verify that all expected functionality is run:
+ prepare_destination.assert_called_once()
+ _verify_batch_grouping.assert_called_once()
+ _download_file.assert_called_with(download_url, target_file_path, downloader_job.id)
+ _extract_file.assert_called_with(target_file_path, downloader_job.id)
+
+ mock_processor_task.delay.assert_called()
+
+ # Verify that the database has been updated correctly:
+ batches = Batch.objects.all()
+ for batch in batches:
+ self.assertEqual(batch.status, BatchStatuses.DOWNLOADED.value)
+
+ downloader_job = DownloaderJob.objects.get()
+ self.assertTrue(downloader_job.success)
+ self.assertIsNotNone(downloader_job.end_time)
+
+ processor_jobs = ProcessorJob.objects.all()
+ self.assertEqual(len(processor_jobs), 2)
+ processor_jobs_to_batches = ProcessorJobsToBatches.objects.all()
+ self.assertEqual(len(processor_jobs_to_batches), 2)
diff --git a/workers/data_refinery_workers/downloaders/utils.py b/workers/data_refinery_workers/downloaders/utils.py
index baa054e4d..4a70d4073 100644
--- a/workers/data_refinery_workers/downloaders/utils.py
+++ b/workers/data_refinery_workers/downloaders/utils.py
@@ -2,11 +2,13 @@
import urllib
from retrying import retry
from django.utils import timezone
+from django.db import transaction
from data_refinery_models.models import (
Batch,
BatchStatuses,
DownloaderJob,
- ProcessorJob
+ ProcessorJob,
+ ProcessorJobsToBatches
)
from data_refinery_workers.processors.processor_registry \
import processor_pipeline_registry
@@ -27,21 +29,23 @@ def start_job(job: DownloaderJob):
job.save()
-def end_job(job: DownloaderJob, batch: Batch, success):
- """Record in the database that this job has completed,
- create a processor job, and queue a processor task."""
- job.success = success
- job.end_time = timezone.now()
- job.save()
+def end_job(job: DownloaderJob, batches: Batch, success):
+ """Record in the database that this job has completed.
+ Create a processor job and queue a processor task for each batch
+ if the job was successful.
+ """
@retry(stop_max_attempt_number=3)
- def save_batch_create_job():
+ def save_batch_create_job(batch):
batch.status = BatchStatuses.DOWNLOADED.value
batch.save()
- logger.info("Creating processor job for batch #%d.", batch.id)
- processor_job = ProcessorJob(batch=batch)
+ logger.debug("Creating processor job for batch #%d.", batch.id)
+ processor_job = ProcessorJob()
processor_job.save()
+ processor_job_to_batch = ProcessorJobsToBatches(batch=batch,
+ processor_job=processor_job)
+ processor_job_to_batch.save()
return processor_job
@retry(stop_max_attempt_number=3)
@@ -49,14 +53,23 @@ def queue_task(processor_job):
processor_task = processor_pipeline_registry[batch.pipeline_required]
processor_task.delay(processor_job.id)
- if batch is not None:
- processor_job = save_batch_create_job()
- queue_task(processor_job)
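+ # Each batch gets its own processor job. The atomic block keeps the
+ # batch's status update and its new ProcessorJob record consistent.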
+ if success:
+ for batch in batches:
+ with transaction.atomic():
+ processor_job = save_batch_create_job(batch)
+ queue_task(processor_job)
+
+ job.success = success
+ job.end_time = timezone.now()
+ job.save()
def prepare_destination(batch: Batch):
- """Prepare the destination directory and return the full
- path the Batch's file should be downloaded to."""
+ """Prepare the destination directory for the batch.
+
+ Also returns the full path the Batch's file should be downloaded
+ to.
+ """
target_directory = os.path.join(ROOT_URI, batch.internal_location)
os.makedirs(target_directory, exist_ok=True)
diff --git a/workers/data_refinery_workers/processors/array_express.py b/workers/data_refinery_workers/processors/array_express.py
index 492eebd8b..0836864e9 100644
--- a/workers/data_refinery_workers/processors/array_express.py
+++ b/workers/data_refinery_workers/processors/array_express.py
@@ -1,6 +1,15 @@
+"""This processor is currently out of date. It is designed to process
+multiple CEL files at a time, but that is not how we are going to
+process Array Express files. I have rewritten the Array Express
+surveyor/downloader to support this, but we don't have the new
+processor yet. This will run, which is good enough for testing the
+system; however, since it will change so much, the processor itself
+is not yet tested.
+"""
+
+
from __future__ import absolute_import, unicode_literals
import os
-import zipfile
from typing import Dict
import rpy2.robjects as ro
from celery import shared_task
@@ -11,33 +20,18 @@
def cel_to_pcl(kwargs: Dict):
- batch = kwargs["batch"]
+ # Array Express processor jobs have one batch per job.
+ batch = kwargs["batches"][0]
- temp_directory = utils.ROOT_URI + "temp/" + batch.internal_location
+ from_directory = utils.ROOT_URI + "raw/" + batch.internal_location
target_directory = utils.ROOT_URI + "processed/" + batch.internal_location
- os.makedirs(temp_directory, exist_ok=True)
os.makedirs(target_directory, exist_ok=True)
-
- raw_file_name = batch.download_url.split('/')[-1]
- zip_location = (utils.ROOT_URI + "raw/" + batch.internal_location
- + raw_file_name)
-
- if os.path.isfile(zip_location):
- zip_ref = zipfile.ZipFile(zip_location, 'r')
- zip_ref.extractall(temp_directory)
- zip_ref.close()
- else:
- logger.error("Missing file: %s", zip_location)
- return {"success": False}
-
- # Experiment code should be added to the batches data model
- experiment_code = raw_file_name.split('/')[0]
- new_name = experiment_code + ".pcl"
+ new_name = batch.name + "." + batch.processed_format
ro.r('source("/home/user/r_processors/process_cel_to_pcl.R")')
ro.r['ProcessCelFiles'](
- temp_directory,
- "Hs", # temporary until organism discovery is working
+ from_directory,
+ "Hs", # temporary until organism handling is more defined
target_directory + new_name)
return kwargs
@@ -48,5 +42,4 @@ def affy_to_pcl(job_id):
utils.run_pipeline({"job_id": job_id},
[utils.start_job,
cel_to_pcl,
- utils.cleanup_temp_data,
utils.end_job])
diff --git a/workers/data_refinery_workers/processors/management/commands/queue_processor.py b/workers/data_refinery_workers/processors/management/commands/queue_processor.py
index eae9bbade..a75f3c3bb 100644
--- a/workers/data_refinery_workers/processors/management/commands/queue_processor.py
+++ b/workers/data_refinery_workers/processors/management/commands/queue_processor.py
@@ -1,20 +1,21 @@
"""This command is intended for development purposes.
It creates the database records necessary for a processor job to
run and queues one. It assumes that the file
-/home/user/data_store/raw/A-AFFY-1/MICRO_ARRAY_TO_PCL/E-MTAB-3050.raw.1.zip
+/home/user/data_store/raw/A-AFFY-141/AFFY_TO_PCL/GSM1426072_CD_colon_active_2.CEL
exists.
The easiest way to run this is with the tester.sh script.
-(Changing queue_downloader to queue_processor.)"""
+(Changing queue_downloader to queue_processor.)
+"""
from django.core.management.base import BaseCommand
from data_refinery_models.models import (
SurveyJob,
Batch,
BatchStatuses,
- ProcessorJob
+ ProcessorJob,
+ ProcessorJobsToBatches
)
-from data_refinery_workers.processors.array_express \
- import process_array_express
+from data_refinery_workers.processors.array_express import affy_to_pcl
# Import and set logger
@@ -36,18 +37,27 @@ def handle(self, *args, **options):
survey_job=survey_job,
source_type="ARRAY_EXPRESS",
size_in_bytes=0,
- download_url="http://www.ebi.ac.uk/arrayexpress/files/E-MTAB-3050/E-MTAB-3050.raw.1.zip", # noqa
- raw_format="MICRO_ARRAY",
+ download_url="ftp://ftp.ebi.ac.uk/pub/databases/microarray/data/experiment/GEOD/E-GEOD-59071/E-GEOD-59071.raw.3.zip", # noqa
+ raw_format="CEL",
processed_format="PCL",
- pipeline_required="MICRO_ARRAY_TO_PCL",
- accession_code="A-AFFY-1",
- internal_location="A-AFFY-1/MICRO_ARRAY_TO_PCL/",
- organism=1,
+ pipeline_required="AFFY_TO_PCL",
+ platform_accession_code="A-AFFY-141",
+ experiment_accession_code="E-GEOD-59071",
+ experiment_title="It doesn't really matter.",
+ name="GSM1426072_CD_colon_active_2.CEL",
+ internal_location="A-AFFY-141/AFFY_TO_PCL/",
+ organism_id=9606,
+ organism_name="HOMO SAPIENS",
+ release_date="2017-05-05",
+ last_uploaded_date="2017-05-05",
status=BatchStatuses.NEW.value
)
batch.save()
- processor_job = ProcessorJob(batch=batch)
+ processor_job = ProcessorJob()
processor_job.save()
+ processor_job_to_batch = ProcessorJobsToBatches(batch=batch,
+ processor_job=processor_job)
+ processor_job_to_batch.save()
logger.info("Queuing a processor job.")
- process_array_express.delay(processor_job.id)
+ affy_to_pcl.delay(processor_job.id)
diff --git a/workers/data_refinery_workers/processors/test_utils.py b/workers/data_refinery_workers/processors/test_utils.py
new file mode 100644
index 000000000..c88fe3400
--- /dev/null
+++ b/workers/data_refinery_workers/processors/test_utils.py
@@ -0,0 +1,180 @@
+import copy
+from unittest.mock import patch, MagicMock
+from django.test import TestCase
+from data_refinery_models.models import (
+ SurveyJob,
+ Batch,
+ BatchStatuses,
+ DownloaderJob,
+ DownloaderJobsToBatches,
+ ProcessorJob,
+ ProcessorJobsToBatches
+)
+from data_refinery_workers.processors import utils
+
+
+def init_batch():
+ survey_job = SurveyJob(
+ source_type="ARRAY_EXPRESS"
+ )
+ survey_job.save()
+
+ return Batch(
+ survey_job=survey_job,
+ source_type="ARRAY_EXPRESS",
+ size_in_bytes=0,
+ download_url="ftp://ftp.ebi.ac.uk/pub/databases/microarray/data/experiment/GEOD/E-GEOD-59071/E-GEOD-59071.raw.3.zip/GSM1426072_CD_colon_active_2.CEL", # noqa
+ raw_format="CEL",
+ processed_format="PCL",
+ pipeline_required="AFFY_TO_PCL",
+ platform_accession_code="A-AFFY-1",
+ experiment_accession_code="E-MTAB-3050",
+ experiment_title="It doesn't really matter.",
+ name="CE1234",
+ internal_location="A-AFFY-1/MICRO_ARRAY_TO_PCL/",
+ organism_id=9606,
+ organism_name="HOMO SAPIENS",
+ release_date="2017-05-05",
+ last_uploaded_date="2017-05-05",
+ status=BatchStatuses.DOWNLOADED.value
+ )
+
+
+class StartJobTestCase(TestCase):
+ def test_success(self):
+ batch = init_batch()
+ batch2 = copy.deepcopy(batch)
+ batch2.name = "CE2345"
+ batch.save()
+ batch2.save()
+
+ processor_job = ProcessorJob()
+ processor_job.save()
+ processor_job_to_batch = ProcessorJobsToBatches(batch=batch,
+ processor_job=processor_job)
+ processor_job_to_batch.save()
+ processor_job_to_batch2 = ProcessorJobsToBatches(batch=batch2,
+ processor_job=processor_job)
+ processor_job_to_batch2.save()
+
+ kwargs = utils.start_job({"job": processor_job})
+ # start_job preserves the "job" key
+ self.assertEqual(kwargs["job"], processor_job)
+
+ # start_job finds the batches and returns them
+ self.assertEqual(len(kwargs["batches"]), 2)
+
+ def test_failure(self):
+ """Fails because there are no batches for the job."""
+ processor_job = ProcessorJob()
+ processor_job.save()
+
+ kwargs = utils.start_job({"job": processor_job})
+ self.assertFalse(kwargs["success"])
+
+
+class EndJobTestCase(TestCase):
+ def test_success(self):
+ batch = init_batch()
+ batch2 = copy.deepcopy(batch)
+ batch2.name = "CE2345"
+ batch.save()
+ batch2.save()
+
+ processor_job = ProcessorJob()
+ processor_job.save()
+
+ utils.end_job({"job": processor_job,
+ "batches": [batch, batch2]})
+
+ processor_job.refresh_from_db()
+ self.assertTrue(processor_job.success)
+ self.assertIsNotNone(processor_job.end_time)
+
+ batches = Batch.objects.all()
+ for batch in batches:
+ self.assertEqual(batch.status, BatchStatuses.PROCESSED.value)
+
+ def test_failure(self):
+ batch = init_batch()
+ batch2 = copy.deepcopy(batch)
+ batch2.name = "CE2345"
+ batch.save()
+ batch2.save()
+
+ processor_job = ProcessorJob()
+ processor_job.save()
+
+ utils.end_job({"success": False,
+ "job": processor_job,
+ "batches": [batch, batch2]})
+
+ processor_job.refresh_from_db()
+ self.assertFalse(processor_job.success)
+ self.assertIsNotNone(processor_job.end_time)
+
+ batches = Batch.objects.all()
+ for batch in batches:
+ self.assertEqual(batch.status, BatchStatuses.DOWNLOADED.value)
+
+
+class RunPipelineTestCase(TestCase):
+ def test_no_job(self):
+ mock_processor = MagicMock()
+ utils.run_pipeline({"job_id": 100}, [mock_processor])
+ mock_processor.assert_not_called()
+
+ def test_processor_failure(self):
+ processor_job = ProcessorJob()
+ processor_job.save()
+ job_dict = {"job_id": processor_job.id,
+ "job": processor_job}
+
+ mock_processor = MagicMock()
+ mock_processor.__name__ = "Fake processor."
+ return_dict = copy.copy(job_dict)
+ return_dict["success"] = False
+ mock_processor.return_value = return_dict
+
+ utils.run_pipeline(job_dict, [mock_processor])
+ mock_processor.assert_called_once()
+ processor_job.refresh_from_db()
+ self.assertFalse(processor_job.success)
+ self.assertIsNotNone(processor_job.end_time)
+
+ def test_value_passing(self):
+ """The keys added to kwargs and returned by processors will be
+ passed through to other processors.
+ """
+ batch = init_batch()
+ batch.save()
+ processor_job = ProcessorJob()
+ processor_job.save()
+ processor_jobs_to_batches = ProcessorJobsToBatches(batch=batch,
+ processor_job=processor_job)
+ processor_jobs_to_batches.save()
+
+ mock_processor = MagicMock()
+ mock_dict = {"something_to_pass_along": True,
+ "job": processor_job,
+ "batches": [batch]}
+ mock_processor.return_value = mock_dict
+
+ def processor_function(kwargs):
+ self.assertTrue(kwargs["something_to_pass_along"])
+ return kwargs
+
+ test_processor = MagicMock(side_effect=processor_function)
+
+ utils.run_pipeline({"job_id": processor_job.id},
+ [utils.start_job,
+ mock_processor,
+ test_processor,
+ utils.end_job])
+
+ processor_job.refresh_from_db()
+ self.assertTrue(processor_job.success)
+ self.assertIsNotNone(processor_job.end_time)
+
+ batch.refresh_from_db()
+ self.assertEqual(batch.status, BatchStatuses.PROCESSED.value)
diff --git a/workers/data_refinery_workers/processors/utils.py b/workers/data_refinery_workers/processors/utils.py
index be402223f..b803056f8 100644
--- a/workers/data_refinery_workers/processors/utils.py
+++ b/workers/data_refinery_workers/processors/utils.py
@@ -1,9 +1,6 @@
-import os
-import urllib
-import shutil
from django.utils import timezone
from typing import List, Dict, Callable
-from data_refinery_models.models import Batch, BatchStatuses, ProcessorJob
+from data_refinery_models.models import BatchStatuses, ProcessorJob, ProcessorJobsToBatches
# Import and set logger
import logging
@@ -15,27 +12,34 @@
def start_job(kwargs: Dict):
- """Record in the database that this job is being started and
- retrieves the job's batch from the database and
- adds it to the dictionary passed in with the key 'batch'."""
+ """A processor function to start jobs.
+
+ Record in the database that this job is being started and
+ retrieves the job's batches from the database and adds them to the
+ dictionary passed in with the key 'batches'.
+ """
job = kwargs["job"]
job.worker_id = "For now there's only one. For now..."
job.start_time = timezone.now()
job.save()
- try:
- batch = Batch.objects.get(id=job.batch_id)
- except Batch.DoesNotExist:
- logger.error("Cannot find batch record with ID %d.", job.batch_id)
+ batch_relations = ProcessorJobsToBatches.objects.filter(processor_job_id=job.id)
+ batches = [br.batch for br in batch_relations]
+
+ if len(batches) == 0:
+ logger.error("No batches found for job #%d.", job.id)
return {"success": False}
- kwargs["batch"] = batch
+ kwargs["batches"] = batches
return kwargs
def end_job(kwargs: Dict):
- """Record in the database that this job has completed and that
- the batch has been processed if successful."""
+ """A processor function to end jobs.
+
+ Record in the database that this job has completed and that
+ the batch has been processed if successful.
+ """
job = kwargs["job"]
if "success" in kwargs:
@@ -48,51 +52,34 @@ def end_job(kwargs: Dict):
job.save()
if job.success:
- batch = kwargs["batch"]
- batch.status = BatchStatuses.PROCESSED.value
- batch.save()
+ batches = kwargs["batches"]
+ for batch in batches:
+ batch.status = BatchStatuses.PROCESSED.value
+ batch.save()
# Every processor returns a dict, however end_job is always called
# last so it doesn't need to contain anything.
return {}
-def cleanup_temp_data(kwargs: Dict):
- """Removes data from raw/ and temp/ directories related to the batch."""
- batch = kwargs["batch"]
-
- path = urllib.parse.urlparse(batch.download_url).path
- raw_file_name = os.path.basename(path)
- raw_file_location = os.path.join(ROOT_URI,
- "raw",
- batch.internal_location,
- raw_file_name)
- temp_directory = os.path.join(ROOT_URI, "temp", batch.internal_location)
- os.remove(raw_file_location)
- shutil.rmtree(temp_directory)
-
- return kwargs
-
-
def run_pipeline(start_value: Dict, pipeline: List[Callable]):
"""Runs a pipeline of processor functions.
- start_value must contain a key 'job_id' which is a valid id
- for a ProcessorJob record.
+ start_value must contain a key 'job_id' which is a valid id for a
+ ProcessorJob record.
Each processor fuction must accept a dictionary and return a
dictionary.
- Any processor function which returns a dictionary
- containing a key of 'success' with a value of False will cause
- the pipeline to terminate with a call to utils.end_job.
+ Any processor function which returns a dictionary containing a key
+ of 'success' with a value of False will cause the pipeline to
+ terminate with a call to utils.end_job.
- The key 'job' is reserved for the ProcessorJob currently being run.
- The key 'batch' is reserved for the Batch that is currently being
- processed.
- It is required that the dictionary returned by each processor
- function preserve the mappings for 'job' and 'batch' that were
- passed into it.
+ The key 'job' is reserved for the ProcessorJob currently being
+ run. The key 'batches' is reserved for the Batches that are
+ currently being processed. It is required that the dictionary
+ returned by each processor function preserve the mappings for
+ 'job' and 'batches' that were passed into it.
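+
+ For example, the AFFY_TO_PCL pipeline runs as:
+ run_pipeline({"job_id": job_id},
+ [utils.start_job, cel_to_pcl, utils.end_job])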
"""
job_id = start_value["job_id"]
@@ -102,6 +89,10 @@ def run_pipeline(start_value: Dict, pipeline: List[Callable]):
logger.error("Cannot find processor job record with ID %d.", job_id)
return
+ if len(pipeline) == 0:
+ logger.error("Empty pipeline specified for job #%d.",
+ job_id)
+
last_result = start_value
last_result["job"] = job
for processor in pipeline:
diff --git a/workers/run_tests.sh b/workers/run_tests.sh
new file mode 100755
index 000000000..6691e1beb
--- /dev/null
+++ b/workers/run_tests.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+# Script for executing Django PyUnit tests within a Docker container.
+
+# This script should always run as if it were being called from
+# the directory it lives in.
+script_directory=`dirname "${BASH_SOURCE[0]}" | xargs realpath`
+cd $script_directory
+
+# However in order to give Docker access to all the code we have to
+# move up a level
+cd ..
+
+docker build -t dr_worker -f workers/Dockerfile.tests .
+
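+# Find the host's outbound IP address so that the "database" hostname
+# inside the container resolves back to the host machine.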
+HOST_IP=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
+
+docker run \
+ --add-host=database:$HOST_IP \
+ --env-file workers/environments/test \
+ -i dr_worker test --no-input "$@"
diff --git a/workers/tester.sh b/workers/tester.sh
index 69af5cfb4..f2f78abef 100755
--- a/workers/tester.sh
+++ b/workers/tester.sh
@@ -16,7 +16,7 @@ docker build -t test_master -f workers/Dockerfile .
HOST_IP=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
docker run \
- --link some-rabbit:rabbit \
+ --link message-queue:rabbit \
--add-host=database:$HOST_IP \
--env-file workers/environments/dev \
--entrypoint ./manage.py \
diff --git a/workers/worker.sh b/workers/worker.sh
index b628ff007..ee5e49937 100755
--- a/workers/worker.sh
+++ b/workers/worker.sh
@@ -23,7 +23,7 @@ docker build -t dr_worker -f workers/Dockerfile .
HOST_IP=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
docker run \
- --link some-rabbit:rabbit \
+ --link message-queue:rabbit \
--name worker1 \
--add-host=database:$HOST_IP \
--env-file workers/environments/dev \