Skip to content

Commit

Permalink
Add initial implementation of task distribution reimandlab#122
Browse files Browse the repository at this point in the history
  • Loading branch information
krassowski committed Sep 2, 2017
1 parent 7bbfae0 commit 2df65cf
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 10 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,29 @@ cd website/static/js_templates
```
And you are done. When `DEBUG = False`, precompiled templates will be loaded automatically.

### Task distribution


#### Cyclic, maintenance tasks

For cyclic tasks a CRON-like package [Advanced Python Scheduler](https://apscheduler.readthedocs.io/en/latest/) is used;
it is fully integrated with application code and no additional setup is required.

The job functions are defined in the `jobs.py` file and the scheduling information is stored in `config.py`, in the `JOBS` variable.

#### User jobs

To manage and execute user-provided mutation searches, [Celery Distributed Task Queue](http://docs.celeryproject.org/en/latest/index.html) is used,
with RabbitMQ as both the broker and the backend.
Both RabbitMQ and Celery need to be run as services and set up properly, as described in the [Celery documentation](http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq).
On Debian-based machines RabbitMQ may be installed as a service directly from the repositories.

To run a Celery worker, use the following command:

```
celery -A celery_worker.celery worker
```

### Serving

#### Serving with Werkzeug
Expand Down
9 changes: 9 additions & 0 deletions setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,12 @@ sudo npm install -g autoprefixer postcss-cli nunjucks
# to be replaced with 'clean-css clean-css-cli' after a new release of webassets:
# currently integration fails for new versions but the fix seems to be already implemented on master branch
sudo npm install -g [email protected]

# install broker for celery (note: command is Debian/Ubuntu specific)
sudo apt-get install rabbitmq-server

# generate keys (for testing only!)
mkdir -p celery
cd celery
ssh-keygen -t rsa -f worker.key -N 'Password for testing only'
cd ..
9 changes: 9 additions & 0 deletions website/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
from database import bdb_refseq
from assets import bundles
from assets import DependencyManager
from flask_celery import Celery


login_manager = LoginManager()
mail = Mail()
recaptcha = ReCaptcha()
limiter = Limiter(key_func=get_remote_address)
scheduler = APScheduler()
celery = Celery()


def create_app(config_filename='config.py', config_override={}):
Expand Down Expand Up @@ -52,9 +54,16 @@ def create_app(config_filename='config.py', config_override={}):
limiter.init_app(app)

# Scheduler
if scheduler.running:
scheduler.shutdown()
scheduler.init_app(app)
scheduler.start()

# Celery
celery.init_app(app)
from celery.security import setup_security
setup_security()

#
# Error logging
#
Expand Down
5 changes: 5 additions & 0 deletions website/celery_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from app import celery
from app import create_app

# Build the Flask application. In website/app.py, create_app() calls
# celery.init_app(app), so this also configures the imported `celery` instance.
app = create_app()
# Bare reference: a no-op at runtime. Presumably kept so the `celery` import is
# visibly used; the worker is started with `celery -A celery_worker.celery worker`,
# which looks up this module-level name.
celery
13 changes: 13 additions & 0 deletions website/example_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,16 @@
'hours': 6
}
]


# - Celery (turned off by default as it requires manual configuration of Celery)
USE_CELERY = False
# Broker and result backend both point at a local AMQP server using the guest account.
CELERY_BROKER_URL = 'amqp://guest@localhost//'
CELERY_RESULT_BACKEND = 'amqp://guest@localhost//'
# NOTE(review): pickle deserialization can execute arbitrary code; accepting it is
# presumably only safe when the broker is trusted - confirm before using in production.
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'application/json']
CELERY_IGNORE_RESULT = False
# These paths match the key pair that setup.sh generates with ssh-keygen
# (`celery/worker.key` and `celery/worker.key.pub`), which is marked "for testing only".
CELERY_SECURITY_KEY = 'celery/worker.key'
CELERY_SECURITY_CERTIFICATE = 'celery/worker.key.pub'
CELERY_SECURITY_CERT_STORE = 'celery/*.pub'
31 changes: 31 additions & 0 deletions website/flask_celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Adapted from flask.pocoo.org/docs/0.12/patterns/celery/ Flask documentation,
therefore Flask documentation, BSD licence conditions may apply."""
import celery


class Celery(celery.Celery):
    """Celery application that can be bound to a Flask app after creation.

    An instance may be created without an app and later bound with
    `init_app`, which re-runs `__init__` with the given Flask application.
    """

    def __init__(self, app=None):
        """Initialise the Celery app, optionally from a Flask application.

        Without `app` the base class is initialised with its defaults.
        With `app`, the name, result backend and broker are taken from the
        Flask app and its config, the whole Flask config is merged into the
        Celery configuration, and `self.Task` is replaced by a subclass which
        executes every task call inside `app.app_context()`.
        """
        if not app:
            super().__init__()
            return

        flask_config = app.config
        super().__init__(
            app.import_name,
            backend=flask_config['CELERY_RESULT_BACKEND'],
            broker=flask_config['CELERY_BROKER_URL']
        )
        self.conf.update(flask_config)

        base_task = self.Task

        class ContextTask(base_task):
            abstract = True

            def __call__(self, *args, **kwargs):
                # run the task body with the Flask application context pushed
                with app.app_context():
                    return base_task.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        """Bind this instance to `app` by re-running `__init__` with it."""
        self.__init__(app)
4 changes: 3 additions & 1 deletion website/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ jinja2_pluralize
yuicompressor
flask-recaptcha
flask-limiter
flask_apscheduler
flask_apscheduler
celery
amqp
25 changes: 25 additions & 0 deletions website/templates/search/progress.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{% extends 'base.html' %}


{% block head %}
{% if status in ['PENDING', 'PROGRESS'] %}
<meta http-equiv="refresh" content="5">
{% endif %}
{% endblock %}


{% block content %}
{# `status` is the raw Celery task state passed by the view; Celery reports failed tasks as 'FAILURE'. #}
{% if status == 'PENDING' %}
<p>Your search is queued. Please wait. The page will refresh itself every 5 seconds.</p>
{% elif status == 'PROGRESS' %}
<p>Your search is running: <span id="progress-percent">{{ progress }}</span>%</p>
{% elif status == 'FAILURE' %}
<p>The search failed</p>
{% endif %}

<div class="progress">
<div class="progress-bar progress-bar-striped active" id="progress-bar" role="progressbar"
aria-valuenow="{{ progress }}" aria-valuemin="0" aria-valuemax="100" style="width:{{ progress }}%">
</div>
</div>
{% endblock %}
1 change: 1 addition & 0 deletions website/tests/views/view_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class ViewTest(DatabaseTest):

RECAPTCHA_ENABLED = False
RATELIMIT_ENABLED = False
USE_CELERY = False

def view_module(self):
"""Required to be defined in subclasses in order to use flash assertions"""
Expand Down
72 changes: 63 additions & 9 deletions website/views/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
from flask_login import current_user
from Levenshtein import distance
from sqlalchemy.orm.exc import NoResultFound
from werkzeug.datastructures import FileStorage

from app import celery
from helpers.bioinf import complement
from models import Protein, Pathway, Cancer, GeneList, MC3Mutation, Disease, InheritedMutation, ClinicalData
from models import Gene
Expand Down Expand Up @@ -148,6 +150,15 @@ def __init__(self, vcf_file=None, text_query=None, filter_manager=None):
self.without_mutations = []
self.badly_formatted = []
self.hidden_results_cnt = 0
self._progress = 0
self._total = 0
if vcf_file:
if type(vcf_file) is FileStorage:
# as bad as it can be, but usually the vcf file will be a list of lines already
vcf_file = vcf_file.readlines()
self._total += len(vcf_file)
if text_query:
self._total += sum(1 for _ in text_query.splitlines())

if filter_manager:
def data_filter(elements):
Expand All @@ -172,8 +183,18 @@ def data_filter(elements):
# like filter_manager so any instance of this class can be pickled.
self.data_filter = None

def progress(self):
    """Count one processed query line and publish the progress to Celery.

    Increments the counter of processed lines. When executed inside a Celery
    task, the task state is set to 'PROGRESS' and the fraction of processed
    lines (`_progress / _total`) is stored under the 'progress' key of the
    state metadata. Outside of a Celery task only the counter is updated.
    """
    self._progress += 1

    task = celery.current_task
    if not task:
        return

    # _total stays at zero when the search was created without any input
    # lines; report no progress instead of raising ZeroDivisionError.
    fraction = self._progress / self._total if self._total else 0

    task.update_state(
        state='PROGRESS',
        meta={'progress': fraction}
    )

def add_mutation_items(self, items, query_line):

self.progress()

if not items:
self.without_mutations.append(query_line)
return False
Expand Down Expand Up @@ -292,6 +313,12 @@ def make_widgets(filter_manager):
}


@celery.task
def search_task(vcf_file, textarea_query, filter_manager):
    """Celery task: build a MutationSearch for the given input and return it.

    The arguments are forwarded unchanged to the `MutationSearch` constructor;
    the constructed search object is the task result.
    """
    return MutationSearch(vcf_file, textarea_query, filter_manager)


class SearchView(FlaskView):
"""Enables searching in any of registered database models."""

Expand Down Expand Up @@ -337,7 +364,6 @@ def user_mutations(self, uri):

response = make_response(template(
'search/dataset.html',
target='mutations',
results=dataset.data.results,
widgets=make_widgets(filter_manager),
without_mutations=dataset.data.without_mutations,
Expand Down Expand Up @@ -368,24 +394,45 @@ def remove_user_mutations(self, uri):
flash('Successfully removed <b>%s</b> dataset.' % name, category='success')
return redirect(url_for('ContentManagementSystem:my_datasets'))

def progress(self, task_id):
    """Render the progress page of a search task, or redirect when it is done.

    Looks up the Celery task with the given `task_id`. A task in the 'SUCCESS'
    state redirects to the mutations view (with `task_id` passed along);
    otherwise 'search/progress.html' is rendered with the task, its status and
    the progress as an integer percentage (0 unless the state is 'PROGRESS').
    """
    task = celery.AsyncResult(task_id)
    state = task.status

    if state == 'SUCCESS':
        return redirect(url_for('SearchView:mutations', task_id=task_id))

    # only the 'PROGRESS' state carries a metadata dict with the fraction done
    if state == 'PROGRESS':
        fraction = task.result.get('progress', 0)
    else:
        fraction = 0

    page = template(
        'search/progress.html',
        task=task,
        progress=int(fraction * 100),
        status=state
    )
    return make_response(page)

@route('/mutations', methods=['POST', 'GET'])
def mutations(self):
"""Render search form and results (if any) for proteins or mutations"""

task_id = request.args.get('task_id', None)
use_celery = current_app.config.get('USE_CELERY', False)
filter_manager = SearchViewFilters()

if request.method == 'POST':
textarea_query = request.form.get('mutations', False)
vcf_file = request.files.get('vcf-file', False)

mutation_search = MutationSearch(
vcf_file,
textarea_query,
filter_manager
)

store_on_server = request.form.get('store_on_server', False)

if use_celery:
mutation_search = search_task.delay(
# vcf_file is not serializable but list of lines is
vcf_file.readlines() if vcf_file else None,
textarea_query,
filter_manager
)
else:
mutation_search = MutationSearch(vcf_file, textarea_query, filter_manager)

if store_on_server:
name = request.form.get('dataset_name', None)
if not name:
Expand Down Expand Up @@ -422,6 +469,13 @@ def mutations(self):
'<a href="' + url + '">' + url + '</a></p>',
'success'
)

if use_celery:
return redirect(url_for('SearchView:progress', task_id=mutation_search.task_id))

elif task_id:
celery_task = celery.AsyncResult(task_id)
mutation_search = celery_task.result
else:
mutation_search = MutationSearch()

Expand Down

0 comments on commit 2df65cf

Please sign in to comment.