From e72acee2c42fdc311c06fb0828d0e4b238062eac Mon Sep 17 00:00:00 2001 From: Janez Kokosar Date: Fri, 5 Apr 2024 16:00:02 +0200 Subject: [PATCH] Rewrite Expression Aggregator process to Python --- docs/CHANGELOG.rst | 2 + .../expression/expression_aggregator.py | 292 ++++++++++++++++++ .../expression/expression_aggregator.yml | 90 ------ resolwe_bio/tests/files/exp_matrix.json.gz | Bin 93 -> 93 bytes .../processes/test_expression_aggregator.py | 32 +- resolwe_bio/tools/expression_aggregator.py | 203 ------------ resolwe_bio/utils/test.py | 17 +- 7 files changed, 329 insertions(+), 307 deletions(-) create mode 100644 resolwe_bio/processes/expression/expression_aggregator.py delete mode 100644 resolwe_bio/processes/expression/expression_aggregator.yml delete mode 100755 resolwe_bio/tools/expression_aggregator.py diff --git a/docs/CHANGELOG.rst b/docs/CHANGELOG.rst index b3c0a1334..2106467b8 100644 --- a/docs/CHANGELOG.rst +++ b/docs/CHANGELOG.rst @@ -16,6 +16,8 @@ Added Changed ------- +- Rewrite ``expression-aggregator`` process to Python and make it + compatible with the new annotation model Fixed ----- diff --git a/resolwe_bio/processes/expression/expression_aggregator.py b/resolwe_bio/processes/expression/expression_aggregator.py new file mode 100644 index 000000000..1a440a848 --- /dev/null +++ b/resolwe_bio/processes/expression/expression_aggregator.py @@ -0,0 +1,292 @@ +"""Collect expression data from samples grouped by sample annotation field.""" + +import csv +import gzip +import json +import math + +import numpy as np + +from resolwe.process import ( + DataField, + FileField, + JsonField, + ListField, + Process, + SchedulingClass, + StringField, +) + + +def load_expression(fn=None, sep="\t"): + """Read expressions from file.""" + with gzip.open(fn, "rt") as f: + reader = csv.DictReader(f, delimiter=sep) + return {row["Gene"]: float(row["Expression"]) for row in reader} + + +def get_indices(descriptors, descriptor): + """Return positions of the descriptor in the list.""" + return {i for i, x in enumerate(descriptors) if x == descriptor} + + +def get_values(expressions, descriptors, gene, descriptor): + """Return expressions of a gene with matching descriptor.""" + indices = get_indices(descriptors, descriptor) + return [ + expression[gene] + for i, expression in enumerate(expressions) + if i in indices and gene in expression + ] + + +def load_expressions(aggregator, expression_fns=[], sep="\t", descriptors=[]): + """Read expressions from files.""" + raw_expressions = [ + load_expression(expression_fn, sep) for expression_fn in expression_fns + ] + if aggregator: + raw_expressions.extend(aggregator["raw_expressions"]) + descriptors.extend(aggregator["descriptors"]) + genes = {key for raw_expression in raw_expressions for key in raw_expression.keys()} + grouped_expressions = { + gene: { + descriptor: get_values(raw_expressions, descriptors, gene, descriptor) + for descriptor in sorted(set(descriptors)) + if get_values(raw_expressions, descriptors, gene, descriptor) + } + for gene in genes + } + return raw_expressions, descriptors, grouped_expressions + + +def get_log_expressions(grouped_expressions): + """Get log(expression + 1) for all expressions.""" + log_expressions = { + gene: { + descriptor: [ + math.log(expression + 1.0, 2.0) + for expression in grouped_expressions[gene][descriptor] + ] + for descriptor in grouped_expressions[gene] + } + for gene in grouped_expressions + } + return log_expressions + + +def generate_statistic(expression, gene, attribute, expression_type): + """Get box plot statistic for expressions of a single gene and attribute.""" + min_val = min(expression) + max_val = max(expression) + median = np.percentile(expression, 50.0) + q1 = np.percentile(expression, 25.0) + q3 = np.percentile(expression, 75.0) + iqr = q3 - q1 + lowerwhisker = max(min_val, q1 - 1.5 * iqr) + upperwhisker = min(max_val, q3 + 1.5 * iqr) + data_count = len(expression) + return { + "attribute": attribute, + "gene": gene, + "exp_types": [expression_type], + "min": min_val, + "max": max_val, + "median": median, + "q1": q1, + "q3": q3, + "lowerwhisker": lowerwhisker, + "upperwhisker": upperwhisker, + "data_count": data_count, + } + + +def get_statistics(expressions, expression_type): + """Get box plot statistics for expressions of all genes and attributes.""" + return { + gene: [ + generate_statistic(exp, gene, descriptor, expression_type) + for descriptor, exp in expression.items() + ] + for gene, expression in expressions.items() + } + + +def output_json(statistics, fname=None, compressed=False): + """Write json to file.""" + open_file = gzip.open if compressed else open + with open_file(fname, "wt") as f: + json.dump(statistics, f) + + +def load_json(fname): + """Read json from file.""" + with gzip.open(fname, "rt") as f: + return json.load(f) + + +def check_aggregator(aggregator, source, expression_type, group_by): + """Check aggregator fields.""" + if aggregator["source"] != source: + raise ValueError( + "All expressions must be annotated by the same genome database (NCBI, UCSC, ENSEMBLE,...)." + ) + if aggregator["expression_type"] != expression_type: + raise ValueError("All expressions must be of the same type.") + if aggregator["group_by"] != group_by: + raise ValueError("Group by field must be the same.") + + +def get_expressions_out( + raw_expressions, descriptors, source, expression_type, group_by +): + """Return expressions output.""" + return { + "raw_expressions": raw_expressions, + "descriptors": descriptors, + "source": source, + "expression_type": expression_type, + "group_by": group_by, + } + + +class ExpressionAggregator(Process): + """Aggregate expression data based on sample metadata fields. + + The Expression aggregator process should not be run in Batch Mode, as this will create + redundant outputs. Rather, select multiple samples below for which you wish to aggregate the + expression matrix. + """ + + slug = "expression-aggregator" + name = "Expression aggregator" + process_type = "data:aggregator:expression" + version = "1.0.0" + category = "Quantify" + scheduling_class = SchedulingClass.BATCH + requirements = { + "expression-engine": "jinja", + "executor": { + "docker": {"image": "public.ecr.aws/genialis/resolwebio/rnaseq:6.0.0"} + }, + "resources": { + "cores": 2, + "memory": 16384, + "storage": 100, + }, + } + data_name = "Expression aggregator" + + class Input: + """Input fields to process ExpressionAggregator.""" + + exps = ListField( + DataField("expression"), + label="Expression data", + description="Select expression data to aggregate.", + ) + group_by = StringField( + label="Sample annotation field", + description="Select sample annotation field to group by.", + ) + expr_aggregator = DataField( + "aggregator:expression", label="Expression aggregator", required=False + ) + + class Output: + """Output fields to process ExpressionAggregator.""" + + exp_matrix = FileField(label="Expression aggregator") + box_plot = JsonField(label="Box plot") + log_box_plot = JsonField(label="Log box plot") + source = StringField(label="Gene ID source") + species = StringField(label="Species") + exp_type = StringField(label="Expression type") + + def run(self, inputs, outputs): + """Run the analysis.""" + + expression_type = inputs.exps[0].output.exp_type + species = inputs.exps[0].output.species + source = inputs.exps[0].output.source + + exp_matrix_output = "exp_matrix.json.gz" + box_plot_output = "box_plot.json" + log_box_plot_output = "log_box_plot.json" + + # sanity checks + for exp in inputs.exps: + if exp.output.source != source: + self.error( + "Input samples are of different Gene ID databases: " + f"{exp.output.source} and {source}." + ) + if exp.output.species != species: + self.error( + "Input samples are of different Species: " + f"{exp.output.species} and {species}." + ) + if exp.output.exp_type != inputs.exps[0].output.exp_type: + self.error( + "Input samples are of different expression type: " + f"{exp.output.exp_type} and {expression_type}." + ) + # check if the expression aggregator is given/valid + aggregator = None + if inputs.expr_aggregator: + if inputs.expr_aggregator.output.species != inputs.exps[0].output.species: + self.error( + "Cannot append expression data to the existing Expression Aggregator object that is of different species." + f"{inputs.expr_aggregator.output.species} is not the same as {species}." + ) + + aggregator = load_json(inputs.expr_aggregator.output.exp_matrix) + + check_aggregator( + aggregator, + source, + expression_type, + inputs.group_by, + ) + + # prepare sample annotation data + try: + descriptor_data = [ + e.entity.annotations[inputs.group_by] for e in inputs.exps + ] + if not all(descriptor_data): + self.error( + f"All samples must be annotated by the selected {inputs.group_by} field." + ) + except KeyError: + self.error( + f"Annotation field {inputs.group_by} is missing from one or more samples." + ) + + expression_files = [e.output.exp.path for e in inputs.exps] + raw_expressions, descriptors, expressions = load_expressions( + aggregator, expression_files, "\t", descriptor_data + ) + log_expressions = get_log_expressions(expressions) + + statistics = get_statistics(expressions, expression_type) + output_json(statistics, box_plot_output) + + log_statistics = get_statistics(log_expressions, expression_type) + output_json(log_statistics, log_box_plot_output) + + expressions_out = get_expressions_out( + raw_expressions, + descriptors, + source, + expression_type, + inputs.group_by, + ) + output_json(expressions_out, exp_matrix_output, True) + + outputs.exp_matrix = exp_matrix_output + outputs.box_plot = box_plot_output + outputs.log_box_plot = log_box_plot_output + outputs.exp_type = expression_type + outputs.source = source + outputs.species = species diff --git a/resolwe_bio/processes/expression/expression_aggregator.yml b/resolwe_bio/processes/expression/expression_aggregator.yml deleted file mode 100644 index df530eb2e..000000000 --- a/resolwe_bio/processes/expression/expression_aggregator.yml +++ /dev/null @@ -1,90 +0,0 @@ -- slug: expression-aggregator - name: Expression aggregator - requirements: - expression-engine: jinja - executor: - docker: - image: public.ecr.aws/genialis/resolwebio/rnaseq:6.0.0 - data_name: "Expression aggregator" - version: 0.5.1 - type: data:aggregator:expression - category: Quantify - persistence: CACHED - description: | - Collect expression data from samples grouped by sample descriptor field. - The Expression aggregator process should not be run in Batch Mode, as this will create - redundant outputs. Rather, select multiple samples below for which you wish to aggregate the - expression matrix. - input: - - name: exps - label: Expressions - type: list:data:expression - - name: group_by - label: Sample descriptor field - type: basic:string - - name: expr_aggregator - label: Expression aggregator - type: data:aggregator:expression - required: false - output: - - name: exp_matrix - label: Expression matrix - type: basic:file - - name: box_plot - label: Box plot - type: basic:json - - name: log_box_plot - label: Log box plot - type: basic:json - - name: source - label: Gene ID database - type: basic:string - - name: species - label: Species - type: basic:string - - name: exp_type - label: Expression type - type: basic:string - run: - runtime: polyglot - language: bash - program: | - {% for e in exps %} - {% if e.source != (exps|first).source %} - re-warning "Genes in all expression data must be annotated by the same genome database." - re-error "Expression {{ exps|first|name }} has {{ (exps|first).source }} gene IDs, while expression {{ e|name }} has {{ e.source }} gene IDs." - {% endif %} - {% if e.species != (exps|first).species %} - re-warning "All samples must be derived from the same species." - re-error "Sample {{ e|sample_name }} is {{ e.species }}, while {{ (exps|first)|name }} is a(n) {{ (exps|first).species }}." - {% endif %} - {% if e.exp_type != (exps|first).exp_type %} - re-warning "All expressions must be of the same type (TPM, FPKM,...)." - re-error "Sample {{ e|sample_name }} is of {{ e.exp_type }} expresssion type, while {{ (exps|first)|name }} is {{ (exps|first).exp_type }}." - {% endif %} - {% endfor %} - - {% if expr_aggregator %} - {% if expr_aggregator.species != (exps|first).species %} - re-warning "Cannot append expression data to the existing Expression Aggregator object that is of different species." - re-error "{{ expr_aggregator|name }} is {{ expr_aggregator.species }}, while {{ (exps|first)|name }} is a(n) {{ (exps|first).species }}." - {% endif %} - {% endif %} - - expression_aggregator.py \ - --expressions {% for e in exps %} {{e.exp.file}} {% endfor %} \ - --descriptors {% for e in exps %} {{e | sample | descriptor(group_by)}} {% endfor %} \ - --source {{(exps|first).source}} \ - --expression-type {{exps.0.exp_type}} \ - --group-by {{group_by}} \ - {% if expr_aggregator %} --aggregator {{expr_aggregator.exp_matrix.file}} {% endif %} \ - --box-plot-output box_plot.json \ - --log-box-plot-output log_box_plot.json \ - --expressions-output exp_matrix.json.gz - re-checkrc "Expression aggregator failed." - re-save-file exp_matrix exp_matrix.json.gz - re-save box_plot box_plot.json - re-save log_box_plot log_box_plot.json - re-save source {{(exps|first).source}} - re-save exp_type {{(exps|first).exp_type}} - re-save species {{(exps|first).species}} diff --git a/resolwe_bio/tests/files/exp_matrix.json.gz b/resolwe_bio/tests/files/exp_matrix.json.gz index 801ecbefb445eb24e5f915981f5ec6386d86a27a..f634c6cab9afda6c2855e05a6962637e056d9808 100644 GIT binary patch delta 43 wcma!z<&f{@;Lv0eNSnxEDHFwDsrpm3)AA$7!YBqH`2U}oL0eknv>lKQ0Pu?n`~Uy| delta 43 wcma!z<&f{@;Aj!L7Cn)}Qf3B|rRq=BPRoxR3!@l-;QxPShC@#q57+_O04xd)B>(^b diff --git a/resolwe_bio/tests/processes/test_expression_aggregator.py b/resolwe_bio/tests/processes/test_expression_aggregator.py index 03d875d67..19d32b3a1 100644 --- a/resolwe_bio/tests/processes/test_expression_aggregator.py +++ b/resolwe_bio/tests/processes/test_expression_aggregator.py @@ -1,6 +1,12 @@ from resolwe.flow.models import Data +from resolwe.flow.models.annotations import ( + AnnotationField, + AnnotationGroup, + AnnotationValue, +) from resolwe.test import tag_process, with_resolwe_host +from resolwe_bio.models import Sample from resolwe_bio.utils.test import KBBioProcessTestCase @@ -9,48 +15,60 @@ class ExpressionAggregatorTestCase(KBBioProcessTestCase): @tag_process("expression-aggregator") def test_expression_aggregator(self): with self.preparation_stage(): - descriptor_artery = {"general": {"organ": "artery"}} + ann_group = AnnotationGroup.objects.get(name="biospecimen_information") + ann_field = AnnotationField.objects.get(group=ann_group, name="organ") expression1 = self.prepare_expression( f_rc="exp_1_rc.tab.gz", f_exp="exp_1_tpm.tab.gz", f_type="TPM", - descriptor=descriptor_artery, species="Homo sapiens", build="ens_90", ) + entity = Sample.objects.get(data=expression1) + AnnotationValue.objects.create( + entity=entity, field=ann_field, value="artery" + ) - descriptor_blood = {"general": {"organ": "blood"}} expression2 = self.prepare_expression( f_rc="exp_2_rc.tab.gz", f_exp="exp_2_tpm.tab.gz", f_type="TPM", - descriptor=descriptor_blood, species="Homo sapiens", build="ens_90", ) + entity = Sample.objects.get(data=expression2) + AnnotationValue.objects.create( + entity=entity, field=ann_field, value="blood" + ) expression3 = self.prepare_expression( f_rc="exp_1_rc.tab.gz", f_exp="exp_1_tpm.tab.gz", f_type="TPM", - descriptor=descriptor_artery, species="Mus musculus", build="ens_90", ) + entity = Sample.objects.get(data=expression3) + AnnotationValue.objects.create( + entity=entity, field=ann_field, value="artery" + ) expression4 = self.prepare_expression( f_rc="exp_2_rc.tab.gz", f_exp="exp_2_tpm.tab.gz", f_type="TPM", - descriptor=descriptor_blood, species="Mus musculus", build="ens_90", ) + entity = Sample.objects.get(data=expression4) + AnnotationValue.objects.create( + entity=entity, field=ann_field, value="blood" + ) # Expect the process to fail if expression data from multiple species is used inputs = { "exps": [expression1.id, expression2.id, expression3.id], - "group_by": "general.organ", + "group_by": "biospecimen_information.organ", } expression_aggregator = self.run_process( "expression-aggregator", inputs, Data.STATUS_ERROR diff --git a/resolwe_bio/tools/expression_aggregator.py b/resolwe_bio/tools/expression_aggregator.py deleted file mode 100755 index 9c13a1d71..000000000 --- a/resolwe_bio/tools/expression_aggregator.py +++ /dev/null @@ -1,203 +0,0 @@ -#!/usr/bin/env python3 -"""Expression aggregator.""" - -import argparse -import csv -import gzip -import json -import math - -import numpy as np - - -def get_args(): - """Parse command-line arguments.""" - parser = argparse.ArgumentParser(description="Expression aggregator") - parser.add_argument( - "-e", "--expressions", nargs="+", help="Expressions", required=True - ) - parser.add_argument( - "-d", "--descriptors", nargs="+", help="Descriptors", required=True - ) - parser.add_argument("-s", "--source", help="Source", required=True) - parser.add_argument( - "-t", "--expression-type", help="Expression type", required=True - ) - parser.add_argument("-g", "--group-by", help="Group by", required=True) - parser.add_argument("-a", "--aggregator", help="Aggregator") - parser.add_argument("-b", "--box-plot-output", help="Box plot output file name") - parser.add_argument( - "-l", "--log-box-plot-output", help="Log box plot output file name" - ) - parser.add_argument( - "-x", "--expressions-output", help="Expressions output file name" - ) - return parser.parse_args() - - -def load_expression(fn=None, sep="\t"): - """Read expressions from file.""" - with gzip.open(fn, "rt") as f: - reader = csv.DictReader(f, delimiter=sep) - return {row["Gene"]: float(row["Expression"]) for row in reader} - - -def get_indices(descriptors, descriptor): - """Return positions of my_element in my_list.""" - return {i for i, x in enumerate(descriptors) if x == descriptor} - - -def get_values(expressions, descriptors, gene, descriptor): - """Return expressions of a gene with matching descriptor.""" - indices = get_indices(descriptors, descriptor) - return [ - expression[gene] - for i, expression in enumerate(expressions) - if i in indices and gene in expression - ] - - -def load_expressions(aggregator, expression_fns=[], sep="\t", descriptors=[]): - """Read expressions from files.""" - raw_expressions = [ - load_expression(expression_fn, sep) for expression_fn in expression_fns - ] - if aggregator: - raw_expressions.extend(aggregator["raw_expressions"]) - descriptors.extend(aggregator["descriptors"]) - genes = {key for raw_expression in raw_expressions for key in raw_expression.keys()} - grouped_expressions = { - gene: { - descriptor: get_values(raw_expressions, descriptors, gene, descriptor) - for descriptor in sorted(set(descriptors)) - if get_values(raw_expressions, descriptors, gene, descriptor) - } - for gene in genes - } - return raw_expressions, descriptors, grouped_expressions - - -def get_log_expressions(grouped_expressions): - """Get log(expression + 1) for all expressions.""" - log_expressions = { - gene: { - descriptor: [ - math.log(expression + 1.0, 2.0) - for expression in grouped_expressions[gene][descriptor] - ] - for descriptor in grouped_expressions[gene] - } - for gene in grouped_expressions - } - return log_expressions - - -def generate_statistic(expression, gene, attribute, expression_type): - """Get box plot statistic for expressions of a single gene and attribute.""" - min_val = min(expression) - max_val = max(expression) - median = np.percentile(expression, 50.0) - q1 = np.percentile(expression, 25.0) - q3 = np.percentile(expression, 75.0) - iqr = q3 - q1 - lowerwhisker = max(min_val, q1 - 1.5 * iqr) - upperwhisker = min(max_val, q3 + 1.5 * iqr) - data_count = len(expression) - return { - "attribute": attribute, - "gene": gene, - "exp_types": [expression_type], - "min": min_val, - "max": max_val, - "median": median, - "q1": q1, - "q3": q3, - "lowerwhisker": lowerwhisker, - "upperwhisker": upperwhisker, - "data_count": data_count, - } - - -def get_statistics(expressions, expression_type): - """Get box plot statistics for expressions of all genes and attributes.""" - return { - gene: [ - generate_statistic(exp, gene, descriptor, expression_type) - for descriptor, exp in expression.items() - ] - for gene, expression in expressions.items() - } - - -def output_json(statistics, fname=None, compressed=False): - """Write json to file.""" - if not compressed: - with open(fname, "w") as f: - json.dump(statistics, f) - else: - with gzip.open(fname, "wt") as f: - json.dump(statistics, f) - - -def load_json(fname): - """Read json from file.""" - with gzip.open(fname, "rt") as f: - return json.load(f) - - -def check_aggregator(aggregator, source, expression_type, group_by): - """Check aggregator fields.""" - if aggregator["source"] != source: - raise ValueError( - "All expressions must be annotated by the same genome database (NCBI, UCSC, ENSEMBLE,...)." - ) - if aggregator["expression_type"] != expression_type: - raise ValueError("All expressions must be of the same type.") - if aggregator["group_by"] != group_by: - raise ValueError("Group by field must be the same.") - - -def get_expressions_out( - raw_expressions, descriptors, source, expression_type, group_by -): - """Return expressions output.""" - return { - "raw_expressions": raw_expressions, - "descriptors": descriptors, - "source": source, - "expression_type": expression_type, - "group_by": group_by, - } - - -def main(): - """Compute expression statistics.""" - args = get_args() - aggregator = None - if args.aggregator: - aggregator = load_json(args.aggregator) - check_aggregator(aggregator, args.source, args.expression_type, args.group_by) - raw_expressions, descriptors, expressions = load_expressions( - aggregator, args.expressions, "\t", args.descriptors - ) - log_expressions = get_log_expressions(expressions) - - if args.box_plot_output: - statistics = get_statistics(expressions, args.expression_type) - output_json(statistics, args.box_plot_output) - if args.log_box_plot_output: - log_statistics = get_statistics(log_expressions, args.expression_type) - output_json(log_statistics, args.log_box_plot_output) - if args.expressions_output: - expressions_out = get_expressions_out( - raw_expressions, - descriptors, - args.source, - args.expression_type, - args.group_by, - ) - output_json(expressions_out, args.expressions_output, True) - - -if __name__ == "__main__": - main() diff --git a/resolwe_bio/utils/test.py b/resolwe_bio/utils/test.py index 242b07139..eff1317d6 100644 --- a/resolwe_bio/utils/test.py +++ b/resolwe_bio/utils/test.py @@ -15,8 +15,6 @@ ) from resolwe.test import ProcessTestCase -from resolwe_bio.models import Sample - TEST_FILES_DIR = os.path.abspath( os.path.join(os.path.dirname(__file__), "..", "tests", "files") ) @@ -93,6 +91,16 @@ def setUp(self): }, ) + biospecimen_group = AnnotationGroup.objects.create( + name="biospecimen_information", sort_order=2 + ) + AnnotationField.objects.create( + name="organ", + sort_order=1, + group=biospecimen_group, + type=AnnotationType.STRING.value, + ) + def prepare_reads(self, fn=["reads.fastq.gz"]): """Prepare NGS reads FASTQ.""" inputs = {"src": fn} @@ -154,7 +162,6 @@ def prepare_expression( f_type="TPM", name="Expression", source="DICTYBASE", - descriptor=None, feature_type="gene", species="Dictyostelium discoideum", build="dd-05-2009", @@ -171,10 +178,6 @@ def prepare_expression( "feature_type": feature_type, } expression = self.run_process("upload-expression", inputs) - if descriptor: - sample = Sample.objects.get(data=expression) - sample.descriptor = descriptor - sample.save() return expression