diff --git a/graphkb/constants.py b/graphkb/constants.py index 55f7e26..9f443bc 100644 --- a/graphkb/constants.py +++ b/graphkb/constants.py @@ -11,102 +11,102 @@ PREFERRED_GENE_SOURCE = "#39:5" # HGNC -BASE_RETURN_PROPERTIES = ['@rid', '@class'] +BASE_RETURN_PROPERTIES = ["@rid", "@class"] GENERIC_RETURN_PROPERTIES = [ - 'name', - 'sourceId', - 'sourceIdVersion', - 'source.name', - 'source.@rid', - 'displayName', - 'deprecated', + "name", + "sourceId", + "sourceIdVersion", + "source.name", + "source.@rid", + "displayName", + "deprecated", ] + BASE_RETURN_PROPERTIES -GENE_RETURN_PROPERTIES = ['biotype'] + GENERIC_RETURN_PROPERTIES +GENE_RETURN_PROPERTIES = ["biotype"] + GENERIC_RETURN_PROPERTIES VARIANT_RETURN_PROPERTIES = ( BASE_RETURN_PROPERTIES - + [f'type.{p}' for p in GENERIC_RETURN_PROPERTIES] - + [f'reference1.{p}' for p in GENE_RETURN_PROPERTIES] - + [f'reference2.{p}' for p in GENE_RETURN_PROPERTIES] - + ['zygosity', 'germline', 'displayName'] + + [f"type.{p}" for p in GENERIC_RETURN_PROPERTIES] + + [f"reference1.{p}" for p in GENE_RETURN_PROPERTIES] + + [f"reference2.{p}" for p in GENE_RETURN_PROPERTIES] + + ["zygosity", "germline", "displayName"] ) POS_VARIANT_RETURN_PROPERTIES = VARIANT_RETURN_PROPERTIES + [ - 'break1Start', - 'break1End', - 'break2Start', - 'break2End', - 'break1Repr', - 'break2Repr', - 'refSeq', - 'untemplatedSeq', - 'untemplatedSeqSize', - 'truncation', - 'assembly', + "break1Start", + "break1End", + "break2Start", + "break2End", + "break1Repr", + "break2Repr", + "refSeq", + "untemplatedSeq", + "untemplatedSeqSize", + "truncation", + "assembly", ] STATEMENT_RETURN_PROPERTIES = ( BASE_RETURN_PROPERTIES - + ['displayNameTemplate', 'sourceId', 'source.name', 'source.displayName'] - + [f'conditions.{p}' for p in GENERIC_RETURN_PROPERTIES] - + [f'subject.{p}' for p in GENERIC_RETURN_PROPERTIES] - + [f'evidence.{p}' for p in GENERIC_RETURN_PROPERTIES] - + [f'relevance.{p}' for p in GENERIC_RETURN_PROPERTIES] - + [f'evidenceLevel.{p}' for p 
in GENERIC_RETURN_PROPERTIES] - + ['reviewStatus'] + + ["displayNameTemplate", "sourceId", "source.name", "source.displayName"] + + [f"conditions.{p}" for p in GENERIC_RETURN_PROPERTIES] + + [f"subject.{p}" for p in GENERIC_RETURN_PROPERTIES] + + [f"evidence.{p}" for p in GENERIC_RETURN_PROPERTIES] + + [f"relevance.{p}" for p in GENERIC_RETURN_PROPERTIES] + + [f"evidenceLevel.{p}" for p in GENERIC_RETURN_PROPERTIES] + + ["reviewStatus"] ) -ONCOKB_SOURCE_NAME = 'oncokb' -ONCOGENE = 'oncogenic' -TUMOUR_SUPPRESSIVE = 'tumour suppressive' -FUSION_NAMES = ['structural variant', 'fusion'] +ONCOKB_SOURCE_NAME = "oncokb" +ONCOGENE = "oncogenic" +TUMOUR_SUPPRESSIVE = "tumour suppressive" +FUSION_NAMES = ["structural variant", "fusion"] PHARMACOGENOMIC_SOURCE_EXCLUDE_LIST = ["cancer genome interpreter", "civic"] -BASE_THERAPEUTIC_TERMS = ['therapeutic efficacy', 'eligibility'] +BASE_THERAPEUTIC_TERMS = ["therapeutic efficacy", "eligibility"] # the order here is the order these are applied, the first category matched is returned RELEVANCE_BASE_TERMS: CategoryBaseTermMapping = [ - ('therapeutic', BASE_THERAPEUTIC_TERMS), - ('diagnostic', ['diagnostic indicator']), - ('prognostic', ['prognostic indicator']), - ('pharmacogenomic', ['metabolism', 'toxicity', 'dosage']), - ('cancer predisposition', ['pathogenic']), - ('biological', ['functional effect', 'tumourigenesis', 'predisposing']), + ("therapeutic", BASE_THERAPEUTIC_TERMS), + ("diagnostic", ["diagnostic indicator"]), + ("prognostic", ["prognostic indicator"]), + ("pharmacogenomic", ["metabolism", "toxicity", "dosage"]), + ("cancer predisposition", ["pathogenic"]), + ("biological", ["functional effect", "tumourigenesis", "predisposing"]), ] -FAILED_REVIEW_STATUS = 'failed' +FAILED_REVIEW_STATUS = "failed" -CHROMOSOMES_HG38 = [f"chr{i}" for i in range(1, 23)] + ['chrX', 'chrY', 'chrM'] -CHROMOSOMES_HG19 = [str(i) for i in range(1, 23)] + ['x', 'y', 'mt'] +CHROMOSOMES_HG38 = [f"chr{i}" for i in range(1, 23)] + ["chrX", "chrY", 
"chrM"] +CHROMOSOMES_HG19 = [str(i) for i in range(1, 23)] + ["x", "y", "mt"] CHROMOSOMES = CHROMOSOMES_HG38 + CHROMOSOMES_HG19 -AMBIGUOUS_AA = ['x', '?', 'X'] +AMBIGUOUS_AA = ["x", "?", "X"] AA_3to1_MAPPING = { - 'Ala': 'A', - 'Arg': 'R', - 'Asn': 'N', - 'Asp': 'D', - 'Asx': 'B', - 'Cys': 'C', - 'Glu': 'E', - 'Gln': 'Q', - 'Glx': 'Z', - 'Gly': 'G', - 'His': 'H', - 'Ile': 'I', - 'Leu': 'L', - 'Lys': 'K', - 'Met': 'M', - 'Phe': 'F', - 'Pro': 'P', - 'Ser': 'S', - 'Thr': 'T', - 'Trp': 'W', - 'Tyr': 'Y', - 'Val': 'V', - 'Ter': '*', + "Ala": "A", + "Arg": "R", + "Asn": "N", + "Asp": "D", + "Asx": "B", + "Cys": "C", + "Glu": "E", + "Gln": "Q", + "Glx": "Z", + "Gly": "G", + "His": "H", + "Ile": "I", + "Leu": "L", + "Lys": "K", + "Met": "M", + "Phe": "F", + "Pro": "P", + "Ser": "S", + "Thr": "T", + "Trp": "W", + "Tyr": "Y", + "Val": "V", + "Ter": "*", } @@ -128,47 +128,89 @@ def __getitem__(self, key): INPUT_COPY_CATEGORIES = IterableNamespace( - AMP='amplification', - ANY_GAIN='copy gain', - ANY_LOSS='copy loss', - DEEP='deep deletion', - GAIN='low level copy gain', - LOSS='shallow deletion', + AMP="amplification", + ANY_GAIN="copy gain", + ANY_LOSS="copy loss", + DEEP="deep deletion", + GAIN="low level copy gain", + LOSS="shallow deletion", ) INPUT_EXPRESSION_CATEGORIES = IterableNamespace( - UP='increased expression', DOWN='reduced expression' + UP="increased expression", DOWN="reduced expression" ) # From: https://github.com/bcgsc/pori_graphkb_parser/blob/ae3738842a4c208ab30f58c08ae987594d632504/src/constants.ts#L33-L80 TYPES_TO_NOTATION: Dict[str, str] = { - 'acetylation': 'ac', - 'copy gain': 'copygain', - 'copy loss': 'copyloss', - 'deletion': 'del', - 'duplication': 'dup', - 'extension': 'ext', - 'frameshift': 'fs', - 'fusion': 'fusion', - 'indel': 'delins', - 'insertion': 'ins', - 'inversion': 'inv', - 'inverted translocation': 'itrans', - 'methylation': 'me', - 'missense mutation': 'mis', - 'mutation': 'mut', - 'nonsense mutation': '>', - 'phosphorylation': 
# Constants used by match.type_screening() [KBDEV-1056]

# Type reported instead of a structural type when the variation is judged
# too small to be structural.
DEFAULT_NON_STRUCTURAL_VARIANT_TYPE = "mutation"

# Minimum length (in bp) a variation must reach for its structural type to be
# reported as-is.
STRUCTURAL_VARIANT_SIZE_THRESHOLD = 48  # bp

# Hard-coded list of variant type names treated as structural; used unless the
# caller asks for the list to be refreshed from the API.
STRUCTURAL_VARIANT_TYPES = [
    "structural variant",
    "insertion",
    "in-frame insertion",
    "deletion",
    "deletion polymorphism",
    "in-frame deletion",
    "translocation",
    "inverted translocation",
    "inversion",
    "indel",
    "fusion",
    "out-of-frame fusion",
    "oncogenic fusion",
    "in-frame fusion",
    "disruptive fusion",
    "duplication",
    "internal duplication",
    "tandem duplication",
    "internal tandem duplication",
    "itd",
    "domain duplication",
    "kinase domain duplication",
    "copy variant",
    "copy number variation",
    "copy number variant",
    "copy loss",
    "copy number loss",
    "shallow deletion",
    "deep deletion",
    "gene deletion",
    "copy gain",
    "copy number gain",
    "low level copy gain",
    "amplification",
    "focal amplification",
    "rearrangement",
]
'returnProperties': ['reviewStatus'] - + [f'conditions.{prop}' for prop in GENE_RETURN_PROPERTIES] + "target": "Statement", + "filters": {"relevance": sorted(list(therapeutic_relevance))}, + "returnProperties": ["reviewStatus"] + + [f"conditions.{prop}" for prop in GENE_RETURN_PROPERTIES] + [ - f'conditions.reference{ref}.{prop}' + f"conditions.reference{ref}.{prop}" for prop in GENE_RETURN_PROPERTIES - for ref in ('1', '2') + for ref in ("1", "2") ], } ) genes: List[Ontology] = [] for statement in statements: - if statement['reviewStatus'] == 'failed': + if statement["reviewStatus"] == "failed": continue - for condition in statement['conditions']: - if condition['@class'] == 'Feature': + for condition in statement["conditions"]: + if condition["@class"] == "Feature": genes.append(condition) - elif condition['@class'].endswith('Variant'): + elif condition["@class"].endswith("Variant"): cond = cast(Variant, condition) - if cond['reference1'] and cond['reference1']['@class'] == 'Feature': - genes.append(cond['reference1']) - if cond['reference2'] and cond['reference2']['@class'] == 'Feature': - genes.append(cond['reference2']) + if cond["reference1"] and cond["reference1"]["@class"] == "Feature": + genes.append(cond["reference1"]) + if cond["reference2"] and cond["reference2"]["@class"] == "Feature": + genes.append(cond["reference2"]) unique_genes: List[Ontology] = [] for gene in genes: - if not gene.get('deprecated', False): - if gene['@rid'] not in [g['@rid'] for g in unique_genes]: + if not gene.get("deprecated", False): + if gene["@rid"] not in [g["@rid"] for g in unique_genes]: unique_genes.append(gene) return unique_genes @@ -129,16 +129,16 @@ def get_genes_from_variant_types( filters: List[Dict[str, Any]] = [] if types: filters.append( - {'type': {'target': 'Vocabulary', 'filters': {'name': types, 'operator': 'IN'}}} + {"type": {"target": "Vocabulary", "filters": {"name": types, "operator": "IN"}}} ) variants = cast( List[Variant], conn.query( { - 'target': 
'Variant', - 'filters': filters, - 'returnProperties': ['reference1', 'reference2'], + "target": "Variant", + "filters": filters, + "returnProperties": ["reference1", "reference2"], }, ignore_cache=ignore_cache, ), @@ -146,20 +146,20 @@ def get_genes_from_variant_types( genes = set() for variant in variants: - genes.add(variant['reference1']) - if variant['reference2']: - genes.add(variant['reference2']) + genes.add(variant["reference1"]) + if variant["reference2"]: + genes.add(variant["reference2"]) if not genes: return [] - filters: List[Dict[str, Any]] = [{'biotype': 'gene'}] + filters: List[Dict[str, Any]] = [{"biotype": "gene"}] if source_record_ids: - filters.append({'source': source_record_ids, 'operator': 'IN'}) + filters.append({"source": source_record_ids, "operator": "IN"}) result = cast( List[Ontology], conn.query( - {'target': list(genes), 'returnProperties': GENE_RETURN_PROPERTIES, 'filters': filters}, + {"target": list(genes), "returnProperties": GENE_RETURN_PROPERTIES, "filters": filters}, ignore_cache=ignore_cache, ), ) @@ -184,20 +184,20 @@ def get_preferred_gene_name( """ if gene_name in CHROMOSOMES: logger.error(f"{gene_name} assumed to be a chromosome, not gene") - return '' + return "" eq = get_equivalent_features(conn=conn, gene_name=gene_name) - genes = [m for m in eq if m.get('biotype') == 'gene' and not m.get('deprecated')] + genes = [m for m in eq if m.get("biotype") == "gene" and not m.get("deprecated")] if not genes: logger.error(f"No genes found for: {gene_name}") - return '' + return "" if source: - source_filtered_genes = [m for m in genes if m.get('source') == source] + source_filtered_genes = [m for m in genes if m.get("source") == source] if not source_filtered_genes: logger.error(f"No data from source {source} for {gene_name}") else: genes = source_filtered_genes - gene_names = [g['displayName'] for g in genes if g] + gene_names = [g["displayName"] for g in genes if g] if len(gene_names) > 1: logger.error( f"Multiple gene names 
def convert_to_rid_set(records: Sequence[Dict]) -> Set[str]:
    """Collect the ``@rid`` value of every record into a set.

    Args:
        records: records that each carry an ``@rid`` key

    Returns:
        the distinct ``@rid`` values found in ``records``

    Raises:
        KeyError: if a record has no ``@rid`` key
    """
    rids: Set[str] = set()
    for record in records:
        rids.add(record["@rid"])
    return rids
statements if s.get('reviewStatus') != FAILED_REVIEW_STATUS] + statements = [s for s in statements if s.get("reviewStatus") != FAILED_REVIEW_STATUS] gene_flags: Dict[str, Set[str]] = { - 'cancerRelated': set(), - 'knownFusionPartner': set(), - 'knownSmallMutation': set(), + "cancerRelated": set(), + "knownFusionPartner": set(), + "knownSmallMutation": set(), } for statement in statements: - for condition in statement['conditions']: - if not condition.get('reference1'): + for condition in statement["conditions"]: + if not condition.get("reference1"): continue - gene_flags['cancerRelated'].add(condition['reference1']) - if condition['reference2']: - gene_flags['cancerRelated'].add(condition['reference2']) - gene_flags['knownFusionPartner'].add(condition['reference1']) - gene_flags['knownFusionPartner'].add(condition['reference2']) - elif condition['@class'] == 'PositionalVariant': - gene_flags['knownSmallMutation'].add(condition['reference1']) - - logger.info('fetching oncogenes list') - gene_flags['oncogene'] = convert_to_rid_set(get_oncokb_oncogenes(graphkb_conn)) - logger.info('fetching tumour supressors list') - gene_flags['tumourSuppressor'] = convert_to_rid_set(get_oncokb_tumour_supressors(graphkb_conn)) - - logger.info('fetching therapeutic associated genes lists') - gene_flags['therapeuticAssociated'] = convert_to_rid_set( + gene_flags["cancerRelated"].add(condition["reference1"]) + if condition["reference2"]: + gene_flags["cancerRelated"].add(condition["reference2"]) + gene_flags["knownFusionPartner"].add(condition["reference1"]) + gene_flags["knownFusionPartner"].add(condition["reference2"]) + elif condition["@class"] == "PositionalVariant": + gene_flags["knownSmallMutation"].add(condition["reference1"]) + + logger.info("fetching oncogenes list") + gene_flags["oncogene"] = convert_to_rid_set(get_oncokb_oncogenes(graphkb_conn)) + logger.info("fetching tumour supressors list") + gene_flags["tumourSuppressor"] = 
convert_to_rid_set(get_oncokb_tumour_supressors(graphkb_conn)) + + logger.info("fetching therapeutic associated genes lists") + gene_flags["therapeuticAssociated"] = convert_to_rid_set( get_therapeutic_associated_genes(graphkb_conn) ) @@ -429,7 +434,7 @@ def get_gene_information( result = [] for gene_name in gene_names: equivalent = convert_to_rid_set(get_equivalent_features(graphkb_conn, gene_name)) - row = {'name': gene_name} + row = {"name": gene_name} flagged = False for flag in gene_flags: # make smaller JSON to upload since all default to false already diff --git a/graphkb/match.py b/graphkb/match.py index 829f75a..3e4e0ab 100644 --- a/graphkb/match.py +++ b/graphkb/match.py @@ -6,9 +6,12 @@ from . import GraphKBConnection from .constants import ( AMBIGUOUS_AA, + DEFAULT_NON_STRUCTURAL_VARIANT_TYPE, INPUT_COPY_CATEGORIES, INPUT_EXPRESSION_CATEGORIES, POS_VARIANT_RETURN_PROPERTIES, + STRUCTURAL_VARIANT_SIZE_THRESHOLD, + STRUCTURAL_VARIANT_TYPES, VARIANT_RETURN_PROPERTIES, ) from .types import BasicPosition, Ontology, ParsedVariant, PositionalVariant, Record, Variant @@ -19,7 +22,7 @@ looks_like_rid, stringifyVariant, ) -from .vocab import get_equivalent_terms, get_term_tree +from .vocab import get_equivalent_terms, get_terms_set, get_term_tree FEATURES_CACHE: Set[str] = set() @@ -29,8 +32,8 @@ def get_equivalent_features( gene_name: str, ignore_cache: bool = False, is_source_id: bool = False, - source: str = '', - source_id_version: str = '', + source: str = "", + source_id_version: str = "", ) -> List[Ontology]: """Match an equivalent list of features given some input feature name (or ID). 
@@ -59,36 +62,36 @@ def get_equivalent_features( return cast( List[Ontology], conn.query( - {'target': [gene_name], 'queryType': 'similarTo'}, ignore_cache=ignore_cache + {"target": [gene_name], "queryType": "similarTo"}, ignore_cache=ignore_cache ), ) filters: List[Dict] = [] if source: - filters.append({'source': {'target': 'Source', 'filters': {'name': source}}}) + filters.append({"source": {"target": "Source", "filters": {"name": source}}}) - if gene_name.count('.') == 1 and gene_name.split('.')[-1].isnumeric(): + if gene_name.count(".") == 1 and gene_name.split(".")[-1].isnumeric(): # eg. ENSG00000133703.11 or NM_033360.4 logger.debug( f"Assuming {gene_name} has a .version_format - ignoring the version for equivalent features" ) - gene_name = gene_name.split('.')[0] + gene_name = gene_name.split(".")[0] if is_source_id or source_id_version: - filters.append({'sourceId': gene_name}) + filters.append({"sourceId": gene_name}) if source_id_version: filters.append( - {'OR': [{'sourceIdVersion': source_id_version}, {'sourceIdVersion': None}]} + {"OR": [{"sourceIdVersion": source_id_version}, {"sourceIdVersion": None}]} ) elif FEATURES_CACHE and gene_name.lower() not in FEATURES_CACHE and not ignore_cache: return [] else: - filters.append({'OR': [{'sourceId': gene_name}, {'name': gene_name}]}) + filters.append({"OR": [{"sourceId": gene_name}, {"name": gene_name}]}) return cast( List[Ontology], conn.query( - {'target': {'target': 'Feature', 'filters': filters}, 'queryType': 'similarTo'}, + {"target": {"target": "Feature", "filters": filters}, "queryType": "similarTo"}, ignore_cache=ignore_cache, ), ) @@ -101,21 +104,21 @@ def cache_missing_features(conn: GraphKBConnection) -> None: """ genes = cast( List[Ontology], - conn.query({'target': 'Feature', 'returnProperties': ['name', 'sourceId'], 'neighbors': 0}), + conn.query({"target": "Feature", "returnProperties": ["name", "sourceId"], "neighbors": 0}), ) for gene in genes: - if gene['name']: - 
FEATURES_CACHE.add(gene['name'].lower()) - if gene['sourceId']: - FEATURES_CACHE.add(gene['sourceId'].lower()) + if gene["name"]: + FEATURES_CACHE.add(gene["name"].lower()) + if gene["sourceId"]: + FEATURES_CACHE.add(gene["sourceId"].lower()) def match_category_variant( conn: GraphKBConnection, gene_name: str, category: str, - root_exclude_term: str = '', - gene_source: str = '', + root_exclude_term: str = "", + gene_source: str = "", gene_is_source_id: bool = False, ignore_cache: bool = False, ) -> List[Variant]: @@ -147,7 +150,7 @@ def match_category_variant( if not features: raise FeatureNotFoundError( - f'unable to find the gene ({gene_name}) or any equivalent representations' + f"unable to find the gene ({gene_name}) or any equivalent representations" ) # get the list of terms that we should match @@ -156,24 +159,24 @@ def match_category_variant( ) if not terms: - raise ValueError(f'unable to find the term/category ({category}) or any equivalent') + raise ValueError(f"unable to find the term/category ({category}) or any equivalent") # find the variant list return cast( List[Variant], conn.query( { - 'target': { - 'target': 'CategoryVariant', - 'filters': [ - {'reference1': features, 'operator': 'IN'}, - {'type': terms, 'operator': 'IN'}, + "target": { + "target": "CategoryVariant", + "filters": [ + {"reference1": features, "operator": "IN"}, + {"type": terms, "operator": "IN"}, ], }, - 'queryType': 'similarTo', - 'edges': ['AliasOf', 'DeprecatedBy', 'CrossReferenceOf', 'GeneralizationOf'], - 'treeEdges': ['Infers'], - 'returnProperties': VARIANT_RETURN_PROPERTIES, + "queryType": "similarTo", + "edges": ["AliasOf", "DeprecatedBy", "CrossReferenceOf", "GeneralizationOf"], + "treeEdges": ["Infers"], + "returnProperties": VARIANT_RETURN_PROPERTIES, }, ignore_cache=ignore_cache, ), @@ -199,14 +202,14 @@ def match_copy_variant( List of variant records from GraphKB which match the input """ if category not in INPUT_COPY_CATEGORIES.values(): - raise ValueError(f'not a 
valid copy variant input category ({category})') + raise ValueError(f"not a valid copy variant input category ({category})") result = match_category_variant( - conn, gene_name, category, root_exclude_term='structural variant', **kwargs + conn, gene_name, category, root_exclude_term="structural variant", **kwargs ) if drop_homozygous: - return [row for row in result if row['zygosity'] != 'homozygous'] + return [row for row in result if row["zygosity"] != "homozygous"] return result @@ -214,10 +217,10 @@ def match_expression_variant( conn: GraphKBConnection, gene_name: str, category: str, **kwargs ) -> List[Variant]: if category not in INPUT_EXPRESSION_CATEGORIES.values(): - raise ValueError(f'not a valid expression variant input category ({category})') + raise ValueError(f"not a valid expression variant input category ({category})") return match_category_variant( - conn, gene_name, category, root_exclude_term='biological', **kwargs + conn, gene_name, category, root_exclude_term="biological", **kwargs ) @@ -241,19 +244,19 @@ def positions_overlap( Returns: bool: True if the positions overlap """ - if pos_record.get('@class', '') == 'CytobandPosition': + if pos_record.get("@class", "") == "CytobandPosition": raise NotImplementedError( - 'Position comparison for cytoband coordinates is not yet implemented' + "Position comparison for cytoband coordinates is not yet implemented" ) - pos = pos_record.get('pos', None) + pos = pos_record.get("pos", None) if pos is None: return True - start = range_start.get('pos', None) + start = range_start.get("pos", None) if range_end: - end = range_end.get('pos', None) + end = range_end.get("pos", None) if start is not None and pos < start: return False @@ -298,81 +301,172 @@ def compare_positional_variants( # For break1, check if positions are overlaping between the variant and the reference. # Continue only if True. 
if not positions_overlap( - cast(BasicPosition, variant['break1Start']), - cast(BasicPosition, reference_variant['break1Start']), + cast(BasicPosition, variant["break1Start"]), + cast(BasicPosition, reference_variant["break1Start"]), None - if 'break1End' not in reference_variant - else cast(BasicPosition, reference_variant['break1End']), + if "break1End" not in reference_variant + else cast(BasicPosition, reference_variant["break1End"]), ): return False # For break2, check if positions are overlaping between the variant and the reference. # Continue only if True or no break2. # TODO: check for variant without break2 but reference_variant with one. - if variant.get('break2Start'): - if not reference_variant.get('break2Start'): + if variant.get("break2Start"): + if not reference_variant.get("break2Start"): return False if not positions_overlap( - cast(BasicPosition, variant['break2Start']), - cast(BasicPosition, reference_variant['break2Start']), + cast(BasicPosition, variant["break2Start"]), + cast(BasicPosition, reference_variant["break2Start"]), None - if 'break2End' not in reference_variant - else cast(BasicPosition, reference_variant['break2End']), + if "break2End" not in reference_variant + else cast(BasicPosition, reference_variant["break2End"]), ): return False # If both variants have untemplated sequence, # check for size and content. 
if ( - variant.get('untemplatedSeq', None) is not None - and reference_variant.get('untemplatedSeq', None) is not None + variant.get("untemplatedSeq", None) is not None + and reference_variant.get("untemplatedSeq", None) is not None ): if ( - variant.get('untemplatedSeqSize', None) is not None - and reference_variant.get('untemplatedSeqSize', None) is not None + variant.get("untemplatedSeqSize", None) is not None + and reference_variant.get("untemplatedSeqSize", None) is not None ): - if variant['untemplatedSeqSize'] != reference_variant['untemplatedSeqSize']: + if variant["untemplatedSeqSize"] != reference_variant["untemplatedSeqSize"]: return False if ( - reference_variant['untemplatedSeq'] is not None - and variant['untemplatedSeq'] is not None + reference_variant["untemplatedSeq"] is not None + and variant["untemplatedSeq"] is not None ): if ( - reference_variant['untemplatedSeq'] not in AMBIGUOUS_AA - and variant['untemplatedSeq'] not in AMBIGUOUS_AA + reference_variant["untemplatedSeq"] not in AMBIGUOUS_AA + and variant["untemplatedSeq"] not in AMBIGUOUS_AA ): - if reference_variant['untemplatedSeq'].lower() != variant['untemplatedSeq'].lower(): + if reference_variant["untemplatedSeq"].lower() != variant["untemplatedSeq"].lower(): return False - elif len(variant['untemplatedSeq']) != len(reference_variant['untemplatedSeq']): + elif len(variant["untemplatedSeq"]) != len(reference_variant["untemplatedSeq"]): return False # If both variants have a reference sequence, # check if they are the same. 
def type_screening(
    conn: GraphKBConnection,
    parsed: ParsedVariant,
    updateStructuralTypes: bool = False,
) -> str:
    """
    [KBDEV-1056]
    Screen the type of a parsed variant notation so that a structural type
    (e.g. duplication, deletion, insertion, indel, copy number, inversion, etc.)
    is only returned when the length of the variation meets
    STRUCTURAL_VARIANT_SIZE_THRESHOLD; otherwise
    DEFAULT_NON_STRUCTURAL_VARIANT_TYPE ('mutation') is returned.

    Args:
        conn (GraphKBConnection): the graphkb connection object; only queried
            when updateStructuralTypes is True
        parsed (ParsedVariant): the variant notation parsed as a dictionary by the API
        updateStructuralTypes (boolean): if True the API is queried for an updated list
            of structural terms, otherwise the hard-coded STRUCTURAL_VARIANT_TYPES is used

    Returns:
        A string describing the variation type

    Example:
        # structural variant type returned as 'mutation' IF length < threshold (48)
        type_screening(conn, {
            'type': 'deletion',
            'break1Start': {'pos': 1},
            'break2Start': {'pos': 5},
        }) -> 'mutation'

    Example:
        # structural variant type returned as-is IF length >= threshold (48)
        type_screening(conn, {
            'type': 'deletion',
            'break1Start': {'pos': 1},
            'break2Start': {'pos': 50},
        }) -> 'deletion'

    Example:
        # fusion & translocation always returned as-is
        type_screening(conn, {'type': 'fusion'}) -> 'fusion'

    Example:
        # non structural always returned as-is
        type_screening(conn, {'type': 'substitution'}) -> 'substitution'
    """
    default_type = DEFAULT_NON_STRUCTURAL_VARIANT_TYPE
    structural_types = STRUCTURAL_VARIANT_TYPES
    threshold = STRUCTURAL_VARIANT_SIZE_THRESHOLD
    variant_type = parsed["type"]

    # Will use either the hard-coded type list or an updated list from the API
    if updateStructuralTypes:
        rids = list(get_terms_set(conn, ["structural variant"]))
        records = conn.get_records_by_id(rids)
        structural_types = [el["name"] for el in records]

    # Unambiguous non-structural variation type
    if variant_type not in structural_types:
        return variant_type

    # Unambiguous structural variation type
    if variant_type in ("fusion", "translocation"):
        return variant_type
    if parsed.get("reference2", None):
        return variant_type
    prefix = parsed.get("prefix", "g")
    if prefix == "y":
        # Assuming all variations using cytoband coordinates meet the size threshold
        return variant_type

    # When size cannot be determined: exonic and intronic coordinates
    # e.g. "MET:e.14del" meaning "Any deletion occurring at the 14th exon"
    if prefix in ("e", "i"):
        # Assuming they don't meet the size threshold
        return default_type

    # When size is given. The key may be present with a None value, in which
    # case it carries no size information and must not be compared.
    seq_size = parsed.get("untemplatedSeqSize")
    if seq_size is not None and seq_size >= threshold:
        return variant_type

    # When size needs to be computed from positions. A break (or its 'pos')
    # that is absent or None falls back to the same defaults as a missing key:
    # start at 1, end at the start position. Explicit None checks are used so
    # that a legitimate position of 0 is preserved.
    pos_start = (parsed.get("break1Start") or {}).get("pos")
    if pos_start is None:
        pos_start = 1
    pos_end = (parsed.get("break2Start") or {}).get("pos")
    if pos_end is None:
        pos_end = pos_start
    # Positions with the 'p' prefix are weighted by 3
    # (presumably 3 bases per amino acid - TODO confirm).
    pos_size = 3 if prefix == "p" else 1
    if ((pos_end - pos_start) + 1) * pos_size >= threshold:
        return variant_type

    # Default
    return default_type
ensembl) - gene_is_source_id: Indicates the gene name(s) input should be treated as sourceIds not names + gene_is_source_id: Indicates the gene name(s) input should be treated + as sourceIds not names + updateStructuralTypes: Whether or not updating the structural variant list + with an API call, or use the hard-coded one Raises: NotImplementedError: thrown for uncertain position input (ranges) @@ -414,21 +511,21 @@ def match_positional_variant( # parse the representation parsed = conn.parse(variant_string, not (reference1 or reference2)) - if 'break1End' in parsed or 'break2End' in parsed: # uncertain position + if "break1End" in parsed or "break2End" in parsed: # uncertain position raise NotImplementedError( - f'Matching does not support uncertain positions ({variant_string}) as input' + f"Matching does not support uncertain positions ({variant_string}) as input" ) if reference2 and not reference1: - raise ValueError('cannot specify reference2 without reference1') + raise ValueError("cannot specify reference2 without reference1") # disambiguate the gene name if reference1: gene1 = reference1 - if 'reference1' in parsed: + if "reference1" in parsed: raise ValueError( - 'Cannot specify reference1 explicitly as well as in the variant notation' + "Cannot specify reference1 explicitly as well as in the variant notation" ) else: - gene1 = parsed['reference1'] + gene1 = parsed["reference1"] gene1_features = get_equivalent_features( conn, gene1, source=gene_source, is_source_id=gene_is_source_id, ignore_cache=ignore_cache @@ -437,7 +534,7 @@ def match_positional_variant( if not features: raise FeatureNotFoundError( - f'unable to find the gene ({gene1}) or any equivalent representations' + f"unable to find the gene ({gene1}) or any equivalent representations" ) secondary_features = None @@ -445,20 +542,20 @@ def match_positional_variant( gene2: Optional[str] = None if reference2: gene2 = reference2 - if 'reference2' in parsed: + if "reference2" in parsed: raise 
ValueError( - 'Cannot specify reference2 explicitly as well as in the variant notation' + "Cannot specify reference2 explicitly as well as in the variant notation" ) - elif 'reference1' in parsed: + elif "reference1" in parsed: raise ValueError( - 'variant notation cannot contain features when explicit features are given' + "variant notation cannot contain features when explicit features are given" ) elif ( - 'reference2' in parsed - and parsed.get('reference2', '?') != '?' - and parsed['reference2'] is not None + "reference2" in parsed + and parsed.get("reference2", "?") != "?" + and parsed["reference2"] is not None ): - gene2 = parsed['reference2'] + gene2 = parsed["reference2"] if gene2: gene2_features = get_equivalent_features( @@ -471,14 +568,14 @@ def match_positional_variant( secondary_features = convert_to_rid_list(gene2_features) if not secondary_features: raise FeatureNotFoundError( - f'unable to find the gene ({gene2}) or any equivalent representations' + f"unable to find the gene ({gene2}) or any equivalent representations" ) # match the existing mutations (positional) query_filters = [ - {'reference1': features}, - {'reference2': secondary_features}, - {'break1Start.@class': parsed['break1Start']['@class']}, + {"reference1": features}, + {"reference2": secondary_features}, + {"break1Start.@class": parsed["break1Start"]["@class"]}, ] filtered_similarOnly: List[Record] = [] # For post filter match use @@ -487,7 +584,7 @@ def match_positional_variant( for row in cast( List[Record], conn.query( - {'target': 'PositionalVariant', 'filters': query_filters}, ignore_cache=ignore_cache + {"target": "PositionalVariant", "filters": query_filters}, ignore_cache=ignore_cache ), ): # TODO: Check if variant and reference_variant should be interchanged @@ -508,21 +605,24 @@ def match_positional_variant( matches.extend( conn.query( { - 'target': convert_to_rid_list(filtered_similarOnly), - 'queryType': 'similarTo', - 'edges': ['AliasOf', 'DeprecatedBy', 
'CrossReferenceOf', 'GeneralizationOf'], - 'treeEdges': ['Infers'], - 'returnProperties': POS_VARIANT_RETURN_PROPERTIES, + "target": convert_to_rid_list(filtered_similarOnly), + "queryType": "similarTo", + "edges": ["AliasOf", "DeprecatedBy", "CrossReferenceOf", "GeneralizationOf"], + "treeEdges": ["Infers"], + "returnProperties": POS_VARIANT_RETURN_PROPERTIES, }, ignore_cache=ignore_cache, ) ) + # screening type for discrepancies regarding structural variants + screened_type = type_screening(conn, parsed, updateStructuralTypes) + # disambiguate the variant type variant_types_details = get_equivalent_terms( conn, - parsed['type'], - root_exclude_term='mutation' if secondary_features else '', + screened_type, + root_exclude_term="mutation" if secondary_features else "", ignore_cache=ignore_cache, ) @@ -531,18 +631,18 @@ def match_positional_variant( matches.extend( conn.query( { - 'target': { - 'target': 'CategoryVariant', - 'filters': [ - {'reference1': features}, - {'type': types}, - {'reference2': secondary_features}, + "target": { + "target": "CategoryVariant", + "filters": [ + {"reference1": features}, + {"type": types}, + {"reference2": secondary_features}, ], }, - 'queryType': 'similarTo', - 'edges': ['AliasOf', 'DeprecatedBy', 'CrossReferenceOf'], - 'treeEdges': ['Infers'], - 'returnProperties': POS_VARIANT_RETURN_PROPERTIES, + "queryType": "similarTo", + "edges": ["AliasOf", "DeprecatedBy", "CrossReferenceOf"], + "treeEdges": ["Infers"], + "returnProperties": POS_VARIANT_RETURN_PROPERTIES, }, ignore_cache=ignore_cache, ) @@ -556,18 +656,18 @@ def cat_variant_query( matches.extend( conn.query( { - 'target': { - 'target': 'CategoryVariant', - 'filters': [ - {'reference1': cat_features}, - {'type': cat_types}, - {'reference2': cat_secondary_features}, + "target": { + "target": "CategoryVariant", + "filters": [ + {"reference1": cat_features}, + {"type": cat_types}, + {"reference2": cat_secondary_features}, ], }, - 'queryType': 'similarTo', - 'edges': 
['AliasOf', 'DeprecatedBy', 'CrossReferenceOf'], - 'treeEdges': [], - 'returnProperties': VARIANT_RETURN_PROPERTIES, + "queryType": "similarTo", + "edges": ["AliasOf", "DeprecatedBy", "CrossReferenceOf"], + "treeEdges": [], + "returnProperties": VARIANT_RETURN_PROPERTIES, }, ignore_cache=ignore_cache, ) @@ -585,10 +685,10 @@ def cat_variant_query( matches.extend( conn.query( { - 'target': convert_to_rid_list(filtered_similarAndGeneric), - 'queryType': 'descendants', - 'edges': [], - 'returnProperties': POS_VARIANT_RETURN_PROPERTIES, + "target": convert_to_rid_list(filtered_similarAndGeneric), + "queryType": "descendants", + "edges": [], + "returnProperties": POS_VARIANT_RETURN_PROPERTIES, }, ignore_cache=ignore_cache, ) @@ -596,6 +696,6 @@ def cat_variant_query( result: Dict[str, Variant] = {} for row in matches: - result[row['@rid']] = cast(Variant, row) + result[row["@rid"]] = cast(Variant, row) return list(result.values()) diff --git a/graphkb/statement.py b/graphkb/statement.py index 97f80ff..c969e8f 100644 --- a/graphkb/statement.py +++ b/graphkb/statement.py @@ -19,7 +19,7 @@ def categorize_relevance( term_set = get_terms_set(graphkb_conn, base_terms) if relevance_rid in term_set: return category - return '' + return "" def get_statements_from_variants( @@ -37,11 +37,11 @@ def get_statements_from_variants( """ statements = graphkb_conn.query( { - 'target': 'Statement', - 'filters': {'conditions': convert_to_rid_list(variants), 'operator': 'CONTAINSANY'}, - 'returnProperties': STATEMENT_RETURN_PROPERTIES, + "target": "Statement", + "filters": {"conditions": convert_to_rid_list(variants), "operator": "CONTAINSANY"}, + "returnProperties": STATEMENT_RETURN_PROPERTIES, } ) if not failed_review: - statements = [s for s in statements if s.get('reviewStatus') != FAILED_REVIEW_STATUS] + statements = [s for s in statements if s.get("reviewStatus") != FAILED_REVIEW_STATUS] return [cast(Statement, s) for s in statements] diff --git a/graphkb/util.py b/graphkb/util.py 
index f4327fd..d544970 100644 --- a/graphkb/util.py +++ b/graphkb/util.py @@ -18,7 +18,7 @@ # name the logger after the package to make it simple to disable for packages using this one as a dependency # https://stackoverflow.com/questions/11029717/how-do-i-disable-log-messages-from-the-requests-library -logger = logging.getLogger('graphkb') +logger = logging.getLogger("graphkb") def convert_to_rid_list(records: Iterable[Record]) -> List[str]: @@ -28,7 +28,7 @@ def convert_to_rid_list(records: Iterable[Record]) -> List[str]: if isinstance(record, str): result.append(record) # assume an @rid string else: - result.append(record['@rid']) + result.append(record["@rid"]) return result @@ -38,7 +38,7 @@ class FeatureNotFoundError(Exception): def looks_like_rid(rid: str) -> bool: """Check if an input string looks like a GraphKB ID.""" - if re.match(r'^#-?\d+:-?\d+$', rid): + if re.match(r"^#-?\d+:-?\d+$", rid): return True return False @@ -47,15 +47,15 @@ def convert_aa_3to1(three_letter_notation: str) -> str: """Convert an Input string from 3 letter AA notation to 1 letter AA notation.""" result = [] - if ':' in three_letter_notation: + if ":" in three_letter_notation: # do not include the feature/gene in replacements - pos = three_letter_notation.index(':') + pos = three_letter_notation.index(":") result.append(three_letter_notation[: pos + 1]) three_letter_notation = three_letter_notation[pos + 1 :] last_match_end = 0 # exclusive interval [ ) - for match in re.finditer(r'[A-Z][a-z][a-z]', three_letter_notation): + for match in re.finditer(r"[A-Z][a-z][a-z]", three_letter_notation): # add the in-between string result.append(three_letter_notation[last_match_end : match.start()]) text = three_letter_notation[match.start() : match.end()] @@ -63,7 +63,7 @@ def convert_aa_3to1(three_letter_notation: str) -> str: last_match_end = match.end() result.append(three_letter_notation[last_match_end:]) - return ''.join(result) + return "".join(result) def join_url(base_url: str, 
*parts) -> str: @@ -71,9 +71,9 @@ def join_url(base_url: str, *parts) -> str: if not parts: return base_url - url = [base_url.rstrip('/')] + [part.strip('/') for part in parts] + url = [base_url.rstrip("/")] + [part.strip("/") for part in parts] - return '/'.join(url) + return "/".join(url) def millis_interval(start: datetime, end: datetime) -> int: @@ -88,7 +88,7 @@ def millis_interval(start: datetime, end: datetime) -> int: def cache_key(request_body) -> str: """Create a cache key for a query request to GraphKB.""" body = json.dumps(request_body, sort_keys=True) - hash_code = hashlib.md5(f'/query{body}'.encode('utf-8')).hexdigest() + hash_code = hashlib.md5(f"/query{body}".encode("utf-8")).hexdigest() return hash_code @@ -96,8 +96,8 @@ class GraphKBConnection: def __init__( self, url: str = DEFAULT_URL, - username: str = '', - password: str = '', + username: str = "", + password: str = "", use_global_cache: bool = True, ): self.http = requests.Session() @@ -110,11 +110,11 @@ def __init__( ) self.http.mount("https://", HTTPAdapter(max_retries=retries)) - self.token = '' + self.token = "" self.url = url self.username = username self.password = password - self.headers = {'Accept': 'application/json', 'Content-Type': 'application/json'} + self.headers = {"Accept": "application/json", "Content-Type": "application/json"} self.cache: Dict[Any, Any] = {} if not use_global_cache else QUERY_CACHE self.request_count = 0 self.first_request: Optional[datetime] = None @@ -130,7 +130,7 @@ def load(self) -> Optional[float]: ) return None - def request(self, endpoint: str, method: str = 'GET', **kwargs) -> Dict: + def request(self, endpoint: str, method: str = "GET", **kwargs) -> Dict: """Request wrapper to handle adding common headers and logging. 
Args: @@ -148,7 +148,7 @@ def request(self, endpoint: str, method: str = 'GET', **kwargs) -> Dict: # don't want to use a read timeout if the request is not idempotent # otherwise you may wind up making unintended changes timeout = None - if endpoint in ['query', 'parse']: + if endpoint in ["query", "parse"]: timeout = (connect_timeout, read_timeout) start_time = datetime.now() @@ -172,21 +172,21 @@ def request(self, endpoint: str, method: str = 'GET', **kwargs) -> Dict: method, url, headers=self.headers, timeout=timeout, **kwargs ) if resp.status_code == 401 or resp.status_code == 403: - logger.debug(f'/{endpoint} - {resp.status_code} - retrying') + logger.debug(f"/{endpoint} - {resp.status_code} - retrying") # try to re-login if the token expired continue else: break except (requests.exceptions.ConnectionError, OSError) as err: if attempt < len(attempts) - 1: - logger.debug(f'/{endpoint} - {str(err)} - retrying') + logger.debug(f"/{endpoint} - {str(err)} - retrying") continue raise err except Exception as err2: raise err2 timing = millis_interval(start_time, datetime.now()) - logger.debug(f'/{endpoint} - {resp.status_code} - {timing} ms') # type: ignore + logger.debug(f"/{endpoint} - {resp.status_code} - {timing} ms") # type: ignore try: resp.raise_for_status() @@ -194,7 +194,7 @@ def request(self, endpoint: str, method: str = 'GET', **kwargs) -> Dict: # try to get more error details message = str(err) try: - message += ' ' + resp.json()['message'] + message += " " + resp.json()["message"] except Exception: pass @@ -204,7 +204,7 @@ def request(self, endpoint: str, method: str = 'GET', **kwargs) -> Dict: def post(self, uri: str, data: Dict = {}, **kwargs) -> Dict: """Convenience method for making post requests.""" - return self.request(uri, method='POST', data=json.dumps(data), **kwargs) + return self.request(uri, method="POST", data=json.dumps(data), **kwargs) def login(self, username: str, password: str) -> None: self.username = username @@ -220,24 +220,24 @@ def 
login(self, username: str, password: str) -> None: try: self.request_count += 1 resp = requests.request( - url=f'{self.url}/token', - method='POST', + url=f"{self.url}/token", + method="POST", headers=self.headers, timeout=(connect_timeout, read_timeout), - data=json.dumps({'username': username, 'password': password}), + data=json.dumps({"username": username, "password": password}), ) break except (requests.exceptions.ConnectionError, OSError) as err: if attempt < len(attempts) - 1: - logger.debug(f'/login - {str(err)} - retrying') + logger.debug(f"/login - {str(err)} - retrying") continue raise err except Exception as err2: raise err2 resp.raise_for_status() content = resp.json() - self.token = content['kbToken'] - self.headers['Authorization'] = self.token + self.token = content["kbToken"] + self.headers["Authorization"] = self.token def refresh_login(self) -> None: self.login(self.username, self.password) @@ -267,8 +267,8 @@ def query( return self.cache[hash_code] while True: - content = self.post('query', data={**request_body, 'limit': limit, 'skip': len(result)}) - records = content['result'] + content = self.post("query", data={**request_body, "limit": limit, "skip": len(result)}) + records = content["result"] result.extend(records) if len(records) < limit or not paginate: break @@ -279,17 +279,17 @@ def query( def parse(self, hgvs_string: str, requireFeatures: bool = False) -> ParsedVariant: content = self.post( - 'parse', data={'content': hgvs_string, 'requireFeatures': requireFeatures} + "parse", data={"content": hgvs_string, "requireFeatures": requireFeatures} ) - return cast(ParsedVariant, content['result']) + return cast(ParsedVariant, content["result"]) def get_records_by_id(self, record_ids: List[str]) -> List[Record]: if not record_ids: return [] - result = self.query({'target': record_ids}) + result = self.query({"target": record_ids}) if len(record_ids) != len(result): raise AssertionError( - f'The number of Ids given ({len(record_ids)}) does not 
match the number of records fetched ({len(result)})' + f"The number of Ids given ({len(record_ids)}) does not match the number of records fetched ({len(result)})" ) return result @@ -298,9 +298,9 @@ def get_record_by_id(self, record_id: str) -> Record: return result[0] def get_source(self, name: str) -> Record: - source = self.query({'target': 'Source', 'filters': {'name': name}}) + source = self.query({"target": "Source", "filters": {"name": name}}) if len(source) != 1: - raise AssertionError(f'Unable to unqiuely identify source with name {name}') + raise AssertionError(f"Unable to unqiuely identify source with name {name}") return source[0] @@ -330,13 +330,13 @@ def get_rid(conn: GraphKBConnection, target: str, name: str) -> str: def ontologyTermRepr(term: Union[OntologyTerm, str]) -> str: if type(term) is not str: - if getattr(term, 'displayName', None) and term.displayName != '': + if getattr(term, "displayName", None) and term.displayName != "": return term.displayName - if getattr(term, 'sourceId', None) and term.sourceId != '': + if getattr(term, "sourceId", None) and term.sourceId != "": return term.sourceId - if getattr(term, 'name', None) and term.name != '': + if getattr(term, "name", None) and term.name != "": return term.name - return '' + return "" return term @@ -368,7 +368,7 @@ def stripDisplayName(displayName: str, withRef: bool = True, withRefSeq: bool = match: object = re.search(r"^(.*\:)([a-z]\.)(.*)$", displayName) if match and not withRefSeq: - ref: str = match.group(1) if match.group(1) != ':' else '' + ref: str = match.group(1) if match.group(1) != ":" else "" prefix: str = match.group(2) rest: str = match.group(3) new_matches: Union[bool, object] = True @@ -407,18 +407,18 @@ def stringifyVariant( str: The string representation """ - displayName: str = variant.get('displayName', '') + displayName: str = variant.get("displayName", "") # If variant is a PositionalVariant (i.e. 
variant with a displayName) and # we already have the appropriate string representation, # then return it right away - if displayName != '' and (withRef and withRefSeq): + if displayName != "" and (withRef and withRefSeq): return displayName # If variant is a PositionalVariant (i.e. variant with a displayName) and # we DO NOT have the appropriate string representation, # then strip unwanted features, then return it right away - if displayName != '': + if displayName != "": return stripDisplayName(displayName, withRef, withRefSeq) # If variant is a ParsedVariant (i.e. variant without a displayName yet), @@ -429,28 +429,28 @@ def stringifyVariant( result: List[str] = [] # Extracting parsed values into individual variables - break1Repr: str = parsed.get('break1Repr', '') - break2Repr: str = parsed.get('break2Repr', '') - multiFeature: bool = parsed.get('multiFeature', False) - noFeatures: bool = parsed.get('noFeatures', False) - notationType: str = parsed.get('notationType', '') - reference1: str = parsed.get('reference1', '') - reference2: str = parsed.get('reference2', '') - refSeq: str = parsed.get('refSeq', '') - truncation: int = parsed.get('truncation', None) - type: str = parsed.get('type', '') - untemplatedSeq: str = parsed.get('untemplatedSeq', '') - untemplatedSeqSize: int = parsed.get('untemplatedSeqSize', None) + break1Repr: str = parsed.get("break1Repr", "") + break2Repr: str = parsed.get("break2Repr", "") + multiFeature: bool = parsed.get("multiFeature", False) + noFeatures: bool = parsed.get("noFeatures", False) + notationType: str = parsed.get("notationType", "") + reference1: str = parsed.get("reference1", "") + reference2: str = parsed.get("reference2", "") + refSeq: str = parsed.get("refSeq", "") + truncation: int = parsed.get("truncation", None) + type: str = parsed.get("type", "") + untemplatedSeq: str = parsed.get("untemplatedSeq", "") + untemplatedSeqSize: int = parsed.get("untemplatedSeqSize", None) # formating notationType - if notationType == 
'': + if notationType == "": variantType = ontologyTermRepr(type) - notationType = TYPES_TO_NOTATION.get(variantType, '') - if notationType == '': + notationType = TYPES_TO_NOTATION.get(variantType, "") + if notationType == "": notationType = re.sub(r"\s", "-", variantType) # If multiFeature - if multiFeature or (reference2 != '' and reference1 != reference2): + if multiFeature or (reference2 != "" and reference1 != reference2): if withRef and not noFeatures: result.append(f"({reference1}:{reference2})") result.append(notationType) @@ -464,11 +464,11 @@ def stringifyVariant( result.append( f"({break1Repr_noParentheses_noRefSeq},{break2Repr_noParentheses_noRefSeq})" ) - if untemplatedSeq != '': + if untemplatedSeq != "": result.append(untemplatedSeq) elif untemplatedSeqSize: result.append(str(untemplatedSeqSize)) - return ''.join(result) + return "".join(result) # Continuous notation... @@ -479,22 +479,22 @@ def stringifyVariant( # BreakRep if withRefSeq: result.append(break1Repr) - if break2Repr != '': + if break2Repr != "": result.append(f"_{break2Repr[2:]}") else: result.append(stripRefSeq(break1Repr)) - if break2Repr != '': + if break2Repr != "": result.append(f"_{stripRefSeq(break2Repr)[2:]}") # refSeq, truncation, notationType, untemplatedSeq, untemplatedSeqSize - if any(i in notationType for i in ['ext', 'fs']) or ( - notationType == '>' and break1Repr.startswith('p.') + if any(i in notationType for i in ["ext", "fs"]) or ( + notationType == ">" and break1Repr.startswith("p.") ): result.append(untemplatedSeq) - if notationType == 'mis' and break1Repr.startswith('p.'): + if notationType == "mis" and break1Repr.startswith("p."): result.append(untemplatedSeq) - elif notationType != '>': - if notationType == 'delins': + elif notationType != ">": + if notationType == "delins": if withRefSeq: result.append(f"del{refSeq}ins") else: @@ -506,22 +506,22 @@ def stringifyVariant( result.append(truncation) else: result.append(f"*{truncation}") - if any(i in notationType 
for i in ['dup', 'del', 'inv']): + if any(i in notationType for i in ["dup", "del", "inv"]): if withRefSeq: result.append(refSeq) - if any(i in notationType for i in ['ins', 'delins']): - if untemplatedSeq != '': + if any(i in notationType for i in ["ins", "delins"]): + if untemplatedSeq != "": result.append(untemplatedSeq) elif untemplatedSeqSize: result.append(str(untemplatedSeqSize)) - elif not break1Repr.startswith('p.'): + elif not break1Repr.startswith("p."): if withRefSeq: - refSeq = refSeq if refSeq != '' else '?' + refSeq = refSeq if refSeq != "" else "?" else: - refSeq = '' - untemplatedSeq = untemplatedSeq if untemplatedSeq != '' else '?' + refSeq = "" + untemplatedSeq = untemplatedSeq if untemplatedSeq != "" else "?" result.append(f"{refSeq}{notationType}{untemplatedSeq}") # TODO: Deal with more complexes cases like 'MED12:p.(?34_?68)mut' - return ''.join(result) + return "".join(result) diff --git a/graphkb/vocab.py b/graphkb/vocab.py index c12e690..51446db 100644 --- a/graphkb/vocab.py +++ b/graphkb/vocab.py @@ -6,14 +6,14 @@ def query_by_name(ontology_class: str, base_term_name: str) -> Dict: - return {'target': ontology_class, 'filters': {'name': base_term_name}} + return {"target": ontology_class, "filters": {"name": base_term_name}} def get_equivalent_terms( conn: GraphKBConnection, base_term_name: str, - root_exclude_term: str = '', - ontology_class: str = 'Vocabulary', + root_exclude_term: str = "", + ontology_class: str = "Vocabulary", ignore_cache: bool = False, build_base_query: Callable = query_by_name, ) -> List[Ontology]: @@ -31,10 +31,10 @@ def get_equivalent_terms( List[Ontology], conn.query( { - 'target': {'target': base_records, 'queryType': 'descendants'}, - 'queryType': 'similarTo', - 'treeEdges': [], - 'returnProperties': ['sourceId', 'sourceIdVersion', 'deprecated', 'name', '@rid'], + "target": {"target": base_records, "queryType": "descendants"}, + "queryType": "similarTo", + "treeEdges": [], + "returnProperties": ["sourceId", 
"sourceIdVersion", "deprecated", "name", "@rid"], }, ignore_cache=ignore_cache, ), @@ -50,30 +50,30 @@ def get_equivalent_terms( convert_to_rid_list( conn.query( { - 'target': {'target': root_records, 'queryType': 'descendants'}, - 'queryType': 'similarTo', - 'treeEdges': [], - 'returnProperties': [ - 'sourceId', - 'sourceIdVersion', - 'deprecated', - 'name', - '@rid', + "target": {"target": root_records, "queryType": "descendants"}, + "queryType": "similarTo", + "treeEdges": [], + "returnProperties": [ + "sourceId", + "sourceIdVersion", + "deprecated", + "name", + "@rid", ], }, ignore_cache=ignore_cache, ) ) ) - return [term for term in base_term_parents if term['@rid'] not in exclude] + return [term for term in base_term_parents if term["@rid"] not in exclude] return base_term_parents def get_term_tree( conn: GraphKBConnection, base_term_name: str, - root_exclude_term: str = '', - ontology_class: str = 'Vocabulary', + root_exclude_term: str = "", + ontology_class: str = "Vocabulary", include_superclasses: bool = True, ignore_cache: bool = False, build_base_query: Callable = query_by_name, @@ -101,10 +101,10 @@ def get_term_tree( List[Ontology], conn.query( { - 'target': {'target': base_records, 'queryType': 'ancestors'}, - 'queryType': 'similarTo', - 'treeEdges': [], - 'returnProperties': ['sourceId', 'sourceIdVersion', 'deprecated', 'name', '@rid'], + "target": {"target": base_records, "queryType": "ancestors"}, + "queryType": "similarTo", + "treeEdges": [], + "returnProperties": ["sourceId", "sourceIdVersion", "deprecated", "name", "@rid"], }, ignore_cache=ignore_cache, ), @@ -125,7 +125,7 @@ def get_term_tree( terms = {} # merge the two lists for term in child_terms + parent_terms: - terms[term['@rid']] = term + terms[term["@rid"]] = term return list(terms.values()) @@ -133,7 +133,7 @@ def get_term_tree( def get_term_by_name( conn: GraphKBConnection, name: str, - ontology_class: str = 'Vocabulary', + ontology_class: str = "Vocabulary", ignore_cache: bool = 
False, **kwargs, ) -> Ontology: @@ -155,15 +155,15 @@ def get_term_by_name( """ result = conn.query( { - 'target': ontology_class, - 'filters': {'name': name}, - 'returnProperties': [ - 'sourceId', - 'sourceIdVersion', - 'deprecated', - 'name', - '@rid', - '@class', + "target": ontology_class, + "filters": {"name": name}, + "returnProperties": [ + "sourceId", + "sourceIdVersion", + "deprecated", + "name", + "@rid", + "@class", ], }, ignore_cache=ignore_cache, @@ -171,7 +171,7 @@ def get_term_by_name( ) if len(result) != 1: - raise AssertionError(f'unable to find term ({name}) by name') + raise AssertionError(f"unable to find term ({name}) by name") return cast(Ontology, result[0]) diff --git a/setup.cfg b/setup.cfg index c623b52..c332460 100644 --- a/setup.cfg +++ b/setup.cfg @@ -10,7 +10,7 @@ include_trailing_comma = true [metadata] name = graphkb url = https://github.com/bcgsc/pori_graphkb_python -version = 1.11.0 +version = 1.12.0 author_email = graphkb@bcgsc.ca description = python adapter for interacting with the GraphKB API long_description = file: README.md @@ -39,7 +39,7 @@ dev = markdown_refdocs mkdocs-material mkdocs-redirects - black==19.10b0 + black flake8 flake8-annotations isort diff --git a/tests/data.py b/tests/data.py new file mode 100644 index 0000000..3c24fe0 --- /dev/null +++ b/tests/data.py @@ -0,0 +1,213 @@ +"""_summary_ + matches: + Array of variants (diplayName and type) that MUST be matching, but not restricted to + does_not_matches: + Array of variants (diplayName and type) that MUST NOT be matching, but not restricted to +""" + + +# Screening structural variant to rule out small events [KBDEV_1056] +structuralVariants = { + # Unambiguous structural variations + "(FGFR3,BRCA2):fusion(g.1234567,g.1234567)": { + "matches": { + "displayName": [ + "FGFR3 fusion", + "FGFR3 rearrangement", + ], + "type": [ + "fusion", + "rearrangement", + ], + }, + }, + # ambiguous structural variations -> structural + "FGFR3:c.1200_1300dup": { + "matches": { + 
"displayName": [ + "FGFR3 mutation", + "FGFR3 rearrangement", + ], + "type": [ + "mutation", + "rearrangement", + ], + }, + }, + "FGFR3:c.1200_1201insACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGT": { + "matches": { + "displayName": [ + "FGFR3 mutation", + "FGFR3 rearrangement", + ], + "type": [ + "mutation", + "rearrangement", + ], + }, + }, + "FGFR3:g.5000_5100del": { + "matches": { + "displayName": [ + "FGFR3 mutation", + "FGFR3 rearrangement", + ], + "type": [ + "mutation", + "rearrangement", + ], + }, + }, + "FGFR3:c.1200_1300delinsA": { + "matches": { + "displayName": [ + "FGFR3 mutation", + "FGFR3 rearrangement", + ], + "type": [ + "mutation", + "rearrangement", + ], + }, + }, + "FGFR3:c.1200delinsACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGT": { + "matches": { + "displayName": [ + "FGFR3 mutation", + "FGFR3 rearrangement", + ], + "type": [ + "mutation", + "rearrangement", + ], + }, + }, + # ambiguous structural variations -> non-structural + "FGFR3:c.1200dup": { + "matches": { + "displayName": [ + "FGFR3 mutation", + ], + "type": [ + "mutation", + ], + }, + "does_not_matches": { + "displayName": [ + "FGFR3 rearrangement", + ], + "type": [ + "rearrangement", + ], + }, + }, + "FGFR3:c.1200_1201insA": { + "matches": { + "displayName": [ + "FGFR3 mutation", + ], + "type": [ + "mutation", + ], + }, + "does_not_matches": { + "displayName": [ + "FGFR3 rearrangement", + ], + "type": [ + "rearrangement", + ], + }, + }, + "FGFR3:g.5000del": { + "matches": { + "displayName": [ + "FGFR3 mutation", + ], + "type": [ + "mutation", + ], + }, + "does_not_matches": { + "displayName": [ + "FGFR3 rearrangement", + ], + "type": [ + "rearrangement", + ], + }, + }, + "FGFR3:c.1200delinsA": { + "matches": { + "displayName": [ + "FGFR3 mutation", + ], + "type": [ + "mutation", + ], + }, + "does_not_matches": { + "displayName": [ + "FGFR3 rearrangement", + ], + "type": [ + "rearrangement", + ], + }, + }, + "STK11:e.1_100del": { + "matches": { + "displayName": [ + "STK11 
mutation", + ], + "type": [ + "mutation", + ], + }, + "does_not_matches": { + "displayName": [ + "STK11 deletion", + ], + "type": [ + "deletion", + ], + }, + }, + "STK11:i.1_100del": { + "matches": { + "displayName": [ + "STK11 mutation", + ], + "type": [ + "mutation", + ], + }, + "does_not_matches": { + "displayName": [ + "STK11 deletion", + ], + "type": [ + "deletion", + ], + }, + }, + # non-structural variations + "FGFR3:c.1200C>A": { + "matches": { + "displayName": [ + "FGFR3 mutation", + ], + "type": [ + "mutation", + ], + }, + "does_not_matches": { + "displayName": [ + "FGFR3 rearrangement", + ], + "type": [ + "rearrangement", + ], + }, + }, +} diff --git a/tests/test_genes.py b/tests/test_genes.py index 12e1e9d..ef88d14 100644 --- a/tests/test_genes.py +++ b/tests/test_genes.py @@ -8,8 +8,8 @@ from graphkb import GraphKBConnection from graphkb.genes import ( get_cancer_predisposition_info, - get_genes_from_variant_types, get_gene_information, + get_genes_from_variant_types, get_oncokb_oncogenes, get_oncokb_tumour_supressors, get_pharmacogenomic_info, @@ -18,96 +18,96 @@ ) from graphkb.util import get_rid -EXCLUDE_INTEGRATION_TESTS = os.environ.get('EXCLUDE_INTEGRATION_TESTS') == '1' +EXCLUDE_INTEGRATION_TESTS = os.environ.get("EXCLUDE_INTEGRATION_TESTS") == "1" -CANONICAL_ONCOGENES = ['kras', 'nras', 'alk'] -CANONICAL_TS = ['cdkn2a', 'tp53'] -CANONICAL_FUSION_GENES = ['alk', 'ewsr1', 'fli1'] -CANONICAL_STRUCTURAL_VARIANT_GENES = ['brca1', 'dpyd', 'pten'] -CANNONICAL_THERAPY_GENES = ['erbb2', 'brca2', 'egfr'] +CANONICAL_ONCOGENES = ["kras", "nras", "alk"] +CANONICAL_TS = ["cdkn2a", "tp53"] +CANONICAL_FUSION_GENES = ["alk", "ewsr1", "fli1"] +CANONICAL_STRUCTURAL_VARIANT_GENES = ["brca1", "dpyd", "pten"] +CANNONICAL_THERAPY_GENES = ["erbb2", "brca2", "egfr"] PHARMACOGENOMIC_INITIAL_GENES = [ - 'ACYP2', - 'CEP72', + "ACYP2", + "CEP72", # 'CYP26B1', # defined as hgvsGenomic chr2:g.233760235_233760235nc_000002.12:g.233760235ta[7]>ta[8] - 'DPYD', - 'NUDT15', - 
'RARG', - 'SLC28A3', - 'TPMT', - 'UGT1A6', + "DPYD", + "NUDT15", + "RARG", + "SLC28A3", + "TPMT", + "UGT1A6", ] CANCER_PREDISP_INITIAL_GENES = [ - 'AKT1', - 'APC', - 'ATM', - 'AXIN2', - 'BAP1', - 'BLM', - 'BMPR1A', - 'BRCA1', - 'BRCA2', - 'BRIP1', - 'CBL', - 'CDH1', - 'CDK4', - 'CDKN2A', - 'CHEK2', - 'DICER1', - 'EGFR', - 'EPCAM', - 'ETV6', - 'EZH2', - 'FH', - 'FLCN', - 'GATA2', - 'HRAS', - 'KIT', - 'MEN1', - 'MET', - 'MLH1', - 'MSH2', - 'MSH6', - 'MUTYH', - 'NBN', - 'NF1', - 'PALB2', - 'PDGFRA', - 'PMS2', - 'PTCH1', - 'PTEN', - 'PTPN11', - 'RAD51C', - 'RAD51D', - 'RB1', - 'RET', - 'RUNX1', - 'SDHA', - 'SDHB', - 'SDHC', - 'SDHD', - 'SMAD4', - 'SMARCA4', - 'STK11', - 'TP53', - 'TSC1', - 'TSC2', - 'VHL', - 'WT1', + "AKT1", + "APC", + "ATM", + "AXIN2", + "BAP1", + "BLM", + "BMPR1A", + "BRCA1", + "BRCA2", + "BRIP1", + "CBL", + "CDH1", + "CDK4", + "CDKN2A", + "CHEK2", + "DICER1", + "EGFR", + "EPCAM", + "ETV6", + "EZH2", + "FH", + "FLCN", + "GATA2", + "HRAS", + "KIT", + "MEN1", + "MET", + "MLH1", + "MSH2", + "MSH6", + "MUTYH", + "NBN", + "NF1", + "PALB2", + "PDGFRA", + "PMS2", + "PTCH1", + "PTEN", + "PTPN11", + "RAD51C", + "RAD51D", + "RB1", + "RET", + "RUNX1", + "SDHA", + "SDHB", + "SDHC", + "SDHD", + "SMAD4", + "SMARCA4", + "STK11", + "TP53", + "TSC1", + "TSC2", + "VHL", + "WT1", ] -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def conn(): conn = GraphKBConnection() - conn.login(os.environ['GRAPHKB_USER'], os.environ['GRAPHKB_PASS']) + conn.login(os.environ["GRAPHKB_USER"], os.environ["GRAPHKB_PASS"]) return conn def test_oncogene(conn): result = get_oncokb_oncogenes(conn) - names = {row['name'] for row in result} + names = {row["name"] for row in result} for gene in CANONICAL_ONCOGENES: assert gene in names for gene in CANONICAL_TS: @@ -116,7 +116,7 @@ def test_oncogene(conn): def test_tumour_supressors(conn): result = get_oncokb_tumour_supressors(conn) - names = {row['name'] for row in result} + names = {row["name"] for row in result} for gene in 
CANONICAL_TS: assert gene in names for gene in CANONICAL_ONCOGENES: @@ -142,28 +142,28 @@ def test_get_cancer_predisposition_info(conn): @pytest.mark.parametrize( - 'alt_rep', ('NM_033360.4', 'NM_033360', 'ENSG00000133703.11', 'ENSG00000133703') + "alt_rep", ("NM_033360.4", "NM_033360", "ENSG00000133703.11", "ENSG00000133703") ) def test_get_preferred_gene_name_kras(alt_rep, conn): gene_name = get_preferred_gene_name(conn, alt_rep) assert ( - 'KRAS' == gene_name + "KRAS" == gene_name ), f"Expected KRAS as preferred gene name for {alt_rep}, not '{gene_name}'" @pytest.mark.skipif(EXCLUDE_INTEGRATION_TESTS, reason="excluding long running integration tests") def test_find_genes_by_variant_type_structural_variant(conn): - result = get_genes_from_variant_types(conn, ['structural variant']) - names = {row['name'] for row in result} + result = get_genes_from_variant_types(conn, ["structural variant"]) + names = {row["name"] for row in result} for gene in CANONICAL_STRUCTURAL_VARIANT_GENES: assert gene in names, f"{gene} was not identified as a structural variant gene." 
@pytest.mark.skipif(EXCLUDE_INTEGRATION_TESTS, reason="excluding long running integration tests") def test_find_no_genes_by_variant_type_with_nonmatching_source_record_id(conn): - refseq_id = get_rid(conn, target='source', name='refseq') + refseq_id = get_rid(conn, target="source", name="refseq") result = get_genes_from_variant_types( - conn, ['structural variant'], source_record_ids=[refseq_id] + conn, ["structural variant"], source_record_ids=[refseq_id] ) assert not result @@ -171,11 +171,11 @@ def test_find_no_genes_by_variant_type_with_nonmatching_source_record_id(conn): @pytest.mark.skipif(EXCLUDE_INTEGRATION_TESTS, reason="excluding long running integration tests") def test_get_therapeutic_associated_genes(conn): gene_list = get_therapeutic_associated_genes(graphkb_conn=conn) - assert gene_list, 'No get_therapeutic_associated_genes found' + assert gene_list, "No get_therapeutic_associated_genes found" assert ( len(gene_list) > 500 - ), f'Expected over 500 get_therapeutic_associated_genes but found {len(gene_list)}' - names = {row['name'] for row in gene_list} + ), f"Expected over 500 get_therapeutic_associated_genes but found {len(gene_list)}" + names = {row["name"] for row in gene_list} for gene in CANNONICAL_THERAPY_GENES + CANONICAL_ONCOGENES + CANONICAL_TS: assert gene in names, f"{gene} not found by get_therapeutic_associated_genes" @@ -189,35 +189,35 @@ def test_get_gene_information(conn): + CANONICAL_FUSION_GENES + CANONICAL_STRUCTURAL_VARIANT_GENES + CANNONICAL_THERAPY_GENES - + ['notagenename'], + + ["notagenename"], ) assert gene_info - nongene_flagged = [g['name'] for g in gene_info if g['name'] == 'notagenename'] + nongene_flagged = [g["name"] for g in gene_info if g["name"] == "notagenename"] assert not nongene_flagged, f"Improper gene category: {nongene_flagged}" for gene in CANONICAL_ONCOGENES: assert gene in [ - g['name'] for g in gene_info if g.get('oncogene') + g["name"] for g in gene_info if g.get("oncogene") ], f"Missed oncogene {gene}" 
for gene in CANONICAL_TS: assert gene in [ - g['name'] for g in gene_info if g.get('tumourSuppressor') + g["name"] for g in gene_info if g.get("tumourSuppressor") ], f"Missed 'tumourSuppressor' {gene}" for gene in CANONICAL_FUSION_GENES: assert gene in [ - g['name'] for g in gene_info if g.get('knownFusionPartner') + g["name"] for g in gene_info if g.get("knownFusionPartner") ], f"Missed knownFusionPartner {gene}" for gene in CANONICAL_STRUCTURAL_VARIANT_GENES: assert gene in [ - g['name'] for g in gene_info if g.get('knownSmallMutation') + g["name"] for g in gene_info if g.get("knownSmallMutation") ], f"Missed knownSmallMutation {gene}" for gene in CANNONICAL_THERAPY_GENES: assert gene in [ - g['name'] for g in gene_info if g.get('therapeuticAssociated') + g["name"] for g in gene_info if g.get("therapeuticAssociated") ], f"Missed therapeuticAssociated {gene}" for gene in ( @@ -228,5 +228,5 @@ def test_get_gene_information(conn): + CANNONICAL_THERAPY_GENES ): assert gene in [ - g['name'] for g in gene_info if g.get('cancerRelated') + g["name"] for g in gene_info if g.get("cancerRelated") ], f"Missed cancerRelated {gene}" diff --git a/tests/test_graphkb.py b/tests/test_graphkb.py index b2d9c0a..88983f5 100644 --- a/tests/test_graphkb.py +++ b/tests/test_graphkb.py @@ -8,26 +8,26 @@ def test_login_ok(): conn = GraphKBConnection() - conn.login(os.environ['GRAPHKB_USER'], os.environ['GRAPHKB_PASS']) + conn.login(os.environ["GRAPHKB_USER"], os.environ["GRAPHKB_PASS"]) assert conn.token is not None -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def conn(): conn = GraphKBConnection() - conn.login(os.environ['GRAPHKB_USER'], os.environ['GRAPHKB_PASS']) + conn.login(os.environ["GRAPHKB_USER"], os.environ["GRAPHKB_PASS"]) return conn class TestPaginate: - @mock.patch('graphkb.GraphKBConnection.request') + @mock.patch("graphkb.GraphKBConnection.request") def test_does_not_paginate_when_false(self, graphkb_request, conn): - graphkb_request.side_effect = 
[{'result': [1, 2, 3]}, {'result': [4, 5]}] + graphkb_request.side_effect = [{"result": [1, 2, 3]}, {"result": [4, 5]}] result = conn.query({}, paginate=False, limit=3) assert result == [1, 2, 3] - @mock.patch('graphkb.GraphKBConnection.request') + @mock.patch("graphkb.GraphKBConnection.request") def test_paginates_by_default(self, graphkb_request, conn): - graphkb_request.side_effect = [{'result': [1, 2, 3]}, {'result': [4, 5]}] + graphkb_request.side_effect = [{"result": [1, 2, 3]}, {"result": [4, 5]}] result = conn.query({}, paginate=True, limit=3) assert result == [1, 2, 3, 4, 5] diff --git a/tests/test_match.py b/tests/test_match.py index 580f045..de736ff 100644 --- a/tests/test_match.py +++ b/tests/test_match.py @@ -5,111 +5,116 @@ import pytest +import graphkb from graphkb import GraphKBConnection, match +from graphkb.constants import DEFAULT_NON_STRUCTURAL_VARIANT_TYPE, STRUCTURAL_VARIANT_SIZE_THRESHOLD from graphkb.util import FeatureNotFoundError -EXCLUDE_INTEGRATION_TESTS = os.environ.get('EXCLUDE_INTEGRATION_TESTS') == '1' +# Test datasets +from .data import structuralVariants -INCREASE_PREFIXES = ['up', 'increase', 'over', 'gain', 'amp'] -DECREASE_PREFIXES = ['down', 'decrease', 'reduce', 'under', 'loss', 'delet'] -GENERAL_MUTATION = 'mutation' +EXCLUDE_INTEGRATION_TESTS = os.environ.get("EXCLUDE_INTEGRATION_TESTS") == "1" + +INCREASE_PREFIXES = ["up", "increase", "over", "gain", "amp"] +DECREASE_PREFIXES = ["down", "decrease", "reduce", "under", "loss", "delet"] +GENERAL_MUTATION = "mutation" def has_prefix(word: str, prefixes: List[str]) -> bool: for prefix in prefixes: - if re.search(r'\b' + prefix, word): + if re.search(r"\b" + prefix, word): return True return False -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def conn() -> GraphKBConnection: conn = GraphKBConnection() - conn.login(os.environ['GRAPHKB_USER'], os.environ['GRAPHKB_PASS']) + conn.login(os.environ["GRAPHKB_USER"], os.environ["GRAPHKB_PASS"]) return conn 
-@pytest.fixture(scope='class') +@pytest.fixture(scope="class") def kras(conn): - return [f['displayName'] for f in match.get_equivalent_features(conn, 'kras')] + return [f["displayName"] for f in match.get_equivalent_features(conn, "kras")] class TestGetEquivalentFeatures: def test_kras_has_self(self, kras): - assert 'KRAS' in kras + assert "KRAS" in kras def test_expands_aliases(self, kras): - assert 'KRAS2' in kras + assert "KRAS2" in kras def test_expands_elements(self, kras): - assert 'NM_033360' in kras - assert 'ENST00000311936' in kras + assert "NM_033360" in kras + assert "ENST00000311936" in kras def test_expands_generalizations(self, kras): - assert 'NM_033360.4' in kras - assert 'ENSG00000133703.11' in kras + assert "NM_033360.4" in kras + assert "ENSG00000133703.11" in kras def test_expands_generalizations_kras(self, kras): - assert 'NM_033360.4' in kras - assert 'NM_033360' in kras - assert 'ENSG00000133703.11' in kras - assert 'ENSG00000133703' in kras + assert "NM_033360.4" in kras + assert "NM_033360" in kras + assert "ENSG00000133703.11" in kras + assert "ENSG00000133703" in kras @pytest.mark.parametrize( - 'alt_rep', ('NM_033360.4', 'NM_033360', 'ENSG00000133703.11', 'ENSG00000133703') + "alt_rep", ("NM_033360.4", "NM_033360", "ENSG00000133703.11", "ENSG00000133703") ) def test_expands_generalizations_refseq(self, alt_rep, conn): - kras = [f['displayName'] for f in match.get_equivalent_features(conn, alt_rep)] - assert 'NM_033360.4' in kras - assert 'NM_033360' in kras - assert 'ENSG00000133703.11' in kras - assert 'ENSG00000133703' in kras + kras = [f["displayName"] for f in match.get_equivalent_features(conn, alt_rep)] + assert "NM_033360.4" in kras + assert "NM_033360" in kras + assert "ENSG00000133703.11" in kras + assert "ENSG00000133703" in kras def test_checks_by_source_id_kras(self, conn): kras = [ - f['displayName'] + f["displayName"] for f in match.get_equivalent_features( - conn, 'nm_033360', source='refseq', source_id_version='4', 
is_source_id=True + conn, "nm_033360", source="refseq", source_id_version="4", is_source_id=True ) ] - assert 'KRAS' in kras + assert "KRAS" in kras class TestMatchCopyVariant: def test_bad_category(self, conn): with pytest.raises(ValueError): - match.match_copy_variant(conn, 'kras', 'not a copy number') + match.match_copy_variant(conn, "kras", "not a copy number") def test_bad_gene_name(self, conn): with pytest.raises(FeatureNotFoundError): - match.match_copy_variant(conn, 'not a real gene name', match.INPUT_COPY_CATEGORIES.AMP) + match.match_copy_variant(conn, "not a real gene name", match.INPUT_COPY_CATEGORIES.AMP) def test_known_loss(self, conn): - matches = match.match_copy_variant(conn, 'CDKN2A', match.INPUT_COPY_CATEGORIES.ANY_LOSS) + matches = match.match_copy_variant(conn, "CDKN2A", match.INPUT_COPY_CATEGORIES.ANY_LOSS) assert matches - types_selected = {record['type']['name'] for record in matches} - zygositys = {record['zygosity'] for record in matches} + types_selected = {record["type"]["name"] for record in matches} + zygositys = {record["zygosity"] for record in matches} assert match.INPUT_COPY_CATEGORIES.ANY_LOSS in types_selected assert match.INPUT_COPY_CATEGORIES.AMP not in types_selected assert GENERAL_MUTATION not in types_selected - assert 'homozygous' in zygositys + assert "homozygous" in zygositys for variant_type in types_selected: assert not has_prefix(variant_type, INCREASE_PREFIXES) def test_known_loss_zygosity_filtered(self, conn): matches = match.match_copy_variant( - conn, 'CDKN2A', match.INPUT_COPY_CATEGORIES.ANY_LOSS, True + conn, "CDKN2A", match.INPUT_COPY_CATEGORIES.ANY_LOSS, True ) assert matches - types_selected = {record['type']['name'] for record in matches} - zygositys = {record['zygosity'] for record in matches} + types_selected = {record["type"]["name"] for record in matches} + zygositys = {record["zygosity"] for record in matches} - assert 'homozygous' not in zygositys + assert "homozygous" not in zygositys assert 
GENERAL_MUTATION not in types_selected assert match.INPUT_COPY_CATEGORIES.ANY_LOSS in types_selected @@ -119,10 +124,10 @@ def test_known_loss_zygosity_filtered(self, conn): assert not has_prefix(variant_type, INCREASE_PREFIXES) def test_known_gain(self, conn): - matches = match.match_copy_variant(conn, 'KRAS', 'copy gain') + matches = match.match_copy_variant(conn, "KRAS", "copy gain") assert matches - types_selected = {record['type']['name'] for record in matches} + types_selected = {record["type"]["name"] for record in matches} assert GENERAL_MUTATION not in types_selected assert match.INPUT_COPY_CATEGORIES.AMP in types_selected @@ -135,9 +140,9 @@ def test_known_gain(self, conn): EXCLUDE_INTEGRATION_TESTS, reason="excluding long running integration tests" ) def test_low_gain_excludes_amplification(self, conn): - matches = match.match_copy_variant(conn, 'KRAS', match.INPUT_COPY_CATEGORIES.GAIN) + matches = match.match_copy_variant(conn, "KRAS", match.INPUT_COPY_CATEGORIES.GAIN) - types_selected = {record['type']['name'] for record in matches} + types_selected = {record["type"]["name"] for record in matches} assert match.INPUT_COPY_CATEGORIES.AMP not in types_selected assert match.INPUT_COPY_CATEGORIES.LOSS not in types_selected @@ -147,34 +152,34 @@ def test_low_gain_excludes_amplification(self, conn): assert not has_prefix(variant_type, DECREASE_PREFIXES) -@pytest.mark.parametrize('pos1,pos2_start,pos2_end', [[3, 2, 5], [2, None, 5], [3, 2, None]]) +@pytest.mark.parametrize("pos1,pos2_start,pos2_end", [[3, 2, 5], [2, None, 5], [3, 2, None]]) def test_range_overlap(pos1, pos2_start, pos2_end): - assert match.positions_overlap({'pos': pos1}, {'pos': pos2_start}, {'pos': pos2_end}) + assert match.positions_overlap({"pos": pos1}, {"pos": pos2_start}, {"pos": pos2_end}) @pytest.mark.parametrize( - 'pos1,pos2_start,pos2_end', + "pos1,pos2_start,pos2_end", [[2, 4, 5], [5, 2, 3], [10, None, 9], [10, 11, None], [1, 2, 2], [2, 1, 1]], ) def test_range_not_overlap(pos1, 
pos2_start, pos2_end): - assert not match.positions_overlap({'pos': pos1}, {'pos': pos2_start}, {'pos': pos2_end}) + assert not match.positions_overlap({"pos": pos1}, {"pos": pos2_start}, {"pos": pos2_end}) -@pytest.mark.parametrize('pos1', [None, 1]) -@pytest.mark.parametrize('pos2', [None, 1]) +@pytest.mark.parametrize("pos1", [None, 1]) +@pytest.mark.parametrize("pos2", [None, 1]) def test_position_match(pos1, pos2): - assert match.positions_overlap({'pos': pos1}, {'pos': pos2}) + assert match.positions_overlap({"pos": pos1}, {"pos": pos2}) class TestMatchExpressionVariant: def test_bad_category(self, conn): with pytest.raises(ValueError): - match.match_expression_variant(conn, 'PTEN', 'not a expression category') + match.match_expression_variant(conn, "PTEN", "not a expression category") def test_bad_gene_name(self, conn): with pytest.raises(FeatureNotFoundError): match.match_expression_variant( - conn, 'not a real gene name', match.INPUT_EXPRESSION_CATEGORIES.UP + conn, "not a real gene name", match.INPUT_EXPRESSION_CATEGORIES.UP ) @pytest.mark.skipif( @@ -182,11 +187,11 @@ def test_bad_gene_name(self, conn): ) def test_known_reduced_expression(self, conn): matches = match.match_expression_variant( - conn, 'PTEN', match.INPUT_EXPRESSION_CATEGORIES.DOWN + conn, "PTEN", match.INPUT_EXPRESSION_CATEGORIES.DOWN ) assert matches - types_selected = {record['type']['name'] for record in matches} + types_selected = {record["type"]["name"] for record in matches} assert match.INPUT_EXPRESSION_CATEGORIES.UP not in types_selected assert GENERAL_MUTATION not in types_selected @@ -195,13 +200,13 @@ def test_known_reduced_expression(self, conn): assert not has_prefix(variant_type, INCREASE_PREFIXES) def test_known_reduced_expression_gene_id(self, conn): - gene_id = conn.query({'target': 'Feature', 'filters': [{'name': 'PTEN'}]})[0]['@rid'] + gene_id = conn.query({"target": "Feature", "filters": [{"name": "PTEN"}]})[0]["@rid"] matches = match.match_expression_variant( conn, 
gene_id, match.INPUT_EXPRESSION_CATEGORIES.DOWN ) assert matches - types_selected = {record['type']['name'] for record in matches} + types_selected = {record["type"]["name"] for record in matches} assert match.INPUT_EXPRESSION_CATEGORIES.UP not in types_selected assert GENERAL_MUTATION not in types_selected @@ -213,10 +218,10 @@ def test_known_reduced_expression_gene_id(self, conn): EXCLUDE_INTEGRATION_TESTS, reason="excluding long running integration tests" ) def test_known_increased_expression(self, conn): - matches = match.match_expression_variant(conn, 'CA9', match.INPUT_EXPRESSION_CATEGORIES.UP) + matches = match.match_expression_variant(conn, "CA9", match.INPUT_EXPRESSION_CATEGORIES.UP) assert matches - types_selected = {record['type']['name'] for record in matches} + types_selected = {record["type"]["name"] for record in matches} assert match.INPUT_EXPRESSION_CATEGORIES.UP not in types_selected assert GENERAL_MUTATION not in types_selected @@ -228,100 +233,100 @@ def test_known_increased_expression(self, conn): class TestComparePositionalVariants: def test_nonspecific_altseq(self): assert match.compare_positional_variants( - {'break1Start': {'pos': 1}}, {'break1Start': {'pos': 1}} + {"break1Start": {"pos": 1}}, {"break1Start": {"pos": 1}} ) # null matches anything assert match.compare_positional_variants( - {'break1Start': {'pos': 1}, 'untemplatedSeq': 'T'}, {'break1Start': {'pos': 1}} + {"break1Start": {"pos": 1}, "untemplatedSeq": "T"}, {"break1Start": {"pos": 1}} ) assert match.compare_positional_variants( - {'break1Start': {'pos': 1}}, {'break1Start': {'pos': 1}, 'untemplatedSeq': 'T'} + {"break1Start": {"pos": 1}}, {"break1Start": {"pos": 1}, "untemplatedSeq": "T"} ) - @pytest.mark.parametrize('seq1', ['T', 'X', '?']) - @pytest.mark.parametrize('seq2', ['T', 'X', '?']) + @pytest.mark.parametrize("seq1", ["T", "X", "?"]) + @pytest.mark.parametrize("seq2", ["T", "X", "?"]) def test_ambiguous_altseq(self, seq1, seq2): # ambiguous AA matches anything the 
same length assert match.compare_positional_variants( - {'break1Start': {'pos': 1}, 'untemplatedSeq': seq1}, - {'break1Start': {'pos': 1}, 'untemplatedSeq': seq2}, + {"break1Start": {"pos": 1}, "untemplatedSeq": seq1}, + {"break1Start": {"pos": 1}, "untemplatedSeq": seq2}, ) def test_altseq_length_mismatch(self): assert not match.compare_positional_variants( - {'break1Start': {'pos': 1}, 'untemplatedSeq': '??'}, - {'break1Start': {'pos': 1}, 'untemplatedSeq': 'T'}, + {"break1Start": {"pos": 1}, "untemplatedSeq": "??"}, + {"break1Start": {"pos": 1}, "untemplatedSeq": "T"}, ) assert not match.compare_positional_variants( - {'break1Start': {'pos': 1}, 'untemplatedSeq': '?'}, - {'break1Start': {'pos': 1}, 'untemplatedSeq': 'TT'}, + {"break1Start": {"pos": 1}, "untemplatedSeq": "?"}, + {"break1Start": {"pos": 1}, "untemplatedSeq": "TT"}, ) def test_nonspecific_refseq(self): # null matches anything assert match.compare_positional_variants( - {'break1Start': {'pos': 1}, 'refSeq': 'T'}, {'break1Start': {'pos': 1}} + {"break1Start": {"pos": 1}, "refSeq": "T"}, {"break1Start": {"pos": 1}} ) assert match.compare_positional_variants( - {'break1Start': {'pos': 1}}, {'break1Start': {'pos': 1}, 'refSeq': 'T'} + {"break1Start": {"pos": 1}}, {"break1Start": {"pos": 1}, "refSeq": "T"} ) - @pytest.mark.parametrize('seq1', ['T', 'X', '?']) - @pytest.mark.parametrize('seq2', ['T', 'X', '?']) + @pytest.mark.parametrize("seq1", ["T", "X", "?"]) + @pytest.mark.parametrize("seq2", ["T", "X", "?"]) def test_ambiguous_refseq(self, seq1, seq2): # ambiguous AA matches anything the same length assert match.compare_positional_variants( - {'break1Start': {'pos': 1}, 'refSeq': seq1}, {'break1Start': {'pos': 1}, 'refSeq': seq2} + {"break1Start": {"pos": 1}, "refSeq": seq1}, {"break1Start": {"pos": 1}, "refSeq": seq2} ) def test_refseq_length_mismatch(self): assert not match.compare_positional_variants( - {'break1Start': {'pos': 1}, 'refSeq': '??'}, {'break1Start': {'pos': 1}, 'refSeq': 'T'} + 
{"break1Start": {"pos": 1}, "refSeq": "??"}, {"break1Start": {"pos": 1}, "refSeq": "T"} ) assert not match.compare_positional_variants( - {'break1Start': {'pos': 1}, 'refSeq': '?'}, {'break1Start': {'pos': 1}, 'refSeq': 'TT'} + {"break1Start": {"pos": 1}, "refSeq": "?"}, {"break1Start": {"pos": 1}, "refSeq": "TT"} ) def test_diff_altseq(self): assert not match.compare_positional_variants( - {'break1Start': {'pos': 1}, 'untemplatedSeq': 'M'}, - {'break1Start': {'pos': 1}, 'untemplatedSeq': 'R'}, + {"break1Start": {"pos": 1}, "untemplatedSeq": "M"}, + {"break1Start": {"pos": 1}, "untemplatedSeq": "R"}, ) def test_same_altseq_matches(self): assert match.compare_positional_variants( - {'break1Start': {'pos': 1}, 'untemplatedSeq': 'R'}, - {'break1Start': {'pos': 1}, 'untemplatedSeq': 'R'}, + {"break1Start": {"pos": 1}, "untemplatedSeq": "R"}, + {"break1Start": {"pos": 1}, "untemplatedSeq": "R"}, ) def test_diff_refseq(self): assert not match.compare_positional_variants( - {'break1Start': {'pos': 1}, 'refSeq': 'M'}, {'break1Start': {'pos': 1}, 'refSeq': 'R'} + {"break1Start": {"pos": 1}, "refSeq": "M"}, {"break1Start": {"pos": 1}, "refSeq": "R"} ) def test_same_refseq_matches(self): assert match.compare_positional_variants( - {'break1Start': {'pos': 1}, 'refSeq': 'R'}, {'break1Start': {'pos': 1}, 'refSeq': 'R'} + {"break1Start": {"pos": 1}, "refSeq": "R"}, {"break1Start": {"pos": 1}, "refSeq": "R"} ) def test_range_vs_sub(self): sub = { - 'break1Repr': 'p.G776', - 'break1Start': {'@Class': 'ProteinPosition', 'pos': 776, 'refAA': 'G'}, - 'break2Repr': 'p.V777', - 'break2Start': {'@Class': 'ProteinPosition', 'pos': 777, 'refAA': 'V'}, - 'reference1': 'ERBB2', - 'type': 'insertion', - 'untemplatedSeq': 'YVMA', - 'untemplatedSeqSize': 4, + "break1Repr": "p.G776", + "break1Start": {"@Class": "ProteinPosition", "pos": 776, "refAA": "G"}, + "break2Repr": "p.V777", + "break2Start": {"@Class": "ProteinPosition", "pos": 777, "refAA": "V"}, + "reference1": "ERBB2", + "type": 
"insertion", + "untemplatedSeq": "YVMA", + "untemplatedSeqSize": 4, } range_variant = { - 'break1Repr': 'p.G776', - 'break1Start': {'@Class': 'ProteinPosition', 'pos': 776, 'refAA': 'G'}, - 'break2Repr': 'p.?776', - 'break2Start': None, - 'refSeq': 'G', - 'untemplatedSeq': 'VV', + "break1Repr": "p.G776", + "break1Start": {"@Class": "ProteinPosition", "pos": 776, "refAA": "G"}, + "break2Repr": "p.?776", + "break2Start": None, + "refSeq": "G", + "untemplatedSeq": "VV", } assert not match.compare_positional_variants(sub, range_variant) assert not match.compare_positional_variants(range_variant, sub) @@ -330,43 +335,43 @@ def test_range_vs_sub(self): class TestMatchPositionalVariant: def test_error_on_duplicate_reference1(self, conn): with pytest.raises(ValueError): - match.match_positional_variant(conn, 'KRAS:p.G12D', '#123:34') + match.match_positional_variant(conn, "KRAS:p.G12D", "#123:34") def test_error_on_bad_reference2(self, conn): with pytest.raises(ValueError): - match.match_positional_variant(conn, 'KRAS:p.G12D', reference2='#123:34') + match.match_positional_variant(conn, "KRAS:p.G12D", reference2="#123:34") def test_error_on_duplicate_reference2(self, conn): with pytest.raises(ValueError): match.match_positional_variant( - conn, '(BCR,ABL1):fusion(e.13,e.3)', reference2='#123:34' + conn, "(BCR,ABL1):fusion(e.13,e.3)", reference2="#123:34" ) def test_uncertain_position_not_supported(self, conn): with pytest.raises(NotImplementedError): - match.match_positional_variant(conn, '(BCR,ABL1):fusion(e.13_24,e.3)') + match.match_positional_variant(conn, "(BCR,ABL1):fusion(e.13_24,e.3)") def test_bad_gene_name(self, conn): with pytest.raises(FeatureNotFoundError): - match.match_positional_variant(conn, 'ME-AS-A-GENE:p.G12D') + match.match_positional_variant(conn, "ME-AS-A-GENE:p.G12D") def test_bad_gene2_name(self, conn): with pytest.raises(FeatureNotFoundError): - match.match_positional_variant(conn, '(BCR,ME-AS-A-GENE):fusion(e.13,e.3)') + 
match.match_positional_variant(conn, "(BCR,ME-AS-A-GENE):fusion(e.13,e.3)") def test_match_explicit_reference1(self, conn): - reference1 = conn.query({'target': 'Feature', 'filters': {'name': 'KRAS'}})[0]['@rid'] - matches = match.match_positional_variant(conn, 'p.G12D', reference1=reference1) + reference1 = conn.query({"target": "Feature", "filters": {"name": "KRAS"}})[0]["@rid"] + matches = match.match_positional_variant(conn, "p.G12D", reference1=reference1) assert matches @pytest.mark.skipif( EXCLUDE_INTEGRATION_TESTS, reason="excluding long running integration tests" ) def test_match_explicit_references(self, conn): - reference1 = conn.query({'target': 'Feature', 'filters': {'name': 'BCR'}})[0]['@rid'] - reference2 = conn.query({'target': 'Feature', 'filters': {'name': 'ABL1'}})[0]['@rid'] + reference1 = conn.query({"target": "Feature", "filters": {"name": "BCR"}})[0]["@rid"] + reference2 = conn.query({"target": "Feature", "filters": {"name": "ABL1"}})[0]["@rid"] matches = match.match_positional_variant( - conn, 'fusion(e.13,e.3)', reference1=reference1, reference2=reference2 + conn, "fusion(e.13,e.3)", reference1=reference1, reference2=reference2 ) assert matches @@ -374,17 +379,17 @@ def test_match_explicit_references(self, conn): EXCLUDE_INTEGRATION_TESTS, reason="excluding long running integration tests" ) @pytest.mark.parametrize( - 'known_variant,related_variants,unrelated_variants', + "known_variant,related_variants,unrelated_variants", [ - ['KRAS:p.G12D', ['KRAS:p.G12X', 'chr12:g.25398284C>T'], ['KRAS:p.G12V']], - ['KRAS:p.G13D', ['KRAS:p.?13mut'], []], - ['chr12:g.25398284C>T', ['KRAS:p.G12D'], ['KRAS:p.G12V']], - ['EGFR:p.E746_S752delinsI', ['EGFR mutation'], ['EGFR copy variant']], + ["KRAS:p.G12D", ["KRAS:p.G12X", "chr12:g.25398284C>T"], ["KRAS:p.G12V"]], + ["KRAS:p.G13D", ["KRAS:p.?13mut"], []], + ["chr12:g.25398284C>T", ["KRAS:p.G12D"], ["KRAS:p.G12V"]], + ["EGFR:p.E746_S752delinsI", ["EGFR mutation"], ["EGFR copy variant"]], ], ) def 
test_known_variants(self, conn, known_variant, related_variants, unrelated_variants): matches = match.match_positional_variant(conn, known_variant) - names = {m['displayName'] for m in matches} + names = {m["displayName"] for m in matches} assert matches assert known_variant in names for variant in related_variants: @@ -393,40 +398,40 @@ def test_known_variants(self, conn, known_variant, related_variants, unrelated_v assert variant not in names @pytest.mark.parametrize( - 'known_variant,related_variants', + "known_variant,related_variants", [ - ['(BCR,ABL1):fusion(e.13,e.3)', ['BCR and ABL1 fusion']], - ['(ATP1B1,NRG1):fusion(e.2,e.2)', ['NRG1 fusion', 'ATP1B1 and NRG1 fusion']], + ["(BCR,ABL1):fusion(e.13,e.3)", ["BCR and ABL1 fusion"]], + ["(ATP1B1,NRG1):fusion(e.2,e.2)", ["NRG1 fusion", "ATP1B1 and NRG1 fusion"]], ], ) def test_known_fusions(self, conn, known_variant, related_variants): matches = match.match_positional_variant(conn, known_variant) - types_selected = [m['type']['name'] for m in matches] + types_selected = [m["type"]["name"] for m in matches] assert GENERAL_MUTATION not in types_selected - names = {m['displayName'] for m in matches} + names = {m["displayName"] for m in matches} assert matches assert known_variant in names for variant in related_variants: assert variant in names def test_known_fusion_single_gene_no_match(self, conn): - known = '(TERT,?):fusion(e.1,e.?)' + known = "(TERT,?):fusion(e.1,e.?)" matches = match.match_positional_variant(conn, known) assert not matches def test_novel_specific_matches_general(self, conn): - novel_specific = 'CDKN2A:p.T18888888888888888888M' + novel_specific = "CDKN2A:p.T18888888888888888888M" matches = match.match_positional_variant(conn, novel_specific) - names = {m['displayName'] for m in matches} + names = {m["displayName"] for m in matches} assert matches assert novel_specific not in names - assert 'CDKN2A mutation' in names + assert "CDKN2A mutation" in names @pytest.mark.skipif( 
EXCLUDE_INTEGRATION_TESTS, reason="excluding long running integration tests" ) def test_genomic_coordinates(self, conn): - genomic = 'X:g.100611165A>T' + genomic = "X:g.100611165A>T" match.match_positional_variant(conn, genomic) # no assert b/c checking for no error rather than the result @@ -434,39 +439,194 @@ def test_genomic_coordinates(self, conn): EXCLUDE_INTEGRATION_TESTS, reason="excluding long running integration tests" ) def test_tert_promoter(self, conn): - assert match.match_positional_variant(conn, 'TERT:c.-124C>T') + assert match.match_positional_variant(conn, "TERT:c.-124C>T") @pytest.mark.skipif( True, reason="GERO-303 - technically incorrect notation for GSC backwards compatibility." ) def test_tert_promoter_leading_one_alt_notation(self, conn): # GERO-303 - technically this format is incorrect. - assert match.match_positional_variant(conn, 'TERT:c.1-124C>T') + assert match.match_positional_variant(conn, "TERT:c.1-124C>T") def test_missense_is_not_nonsense(self, conn): """GERO-299 - nonsense mutation creates a stop codon and is usually more severe.""" # equivalent TP53 notations - genomic = 'chr17:g.7674252C>T' - cds = 'ENST00000269305:c.711G>A' - protein = 'TP53:p.M237I' + genomic = "chr17:g.7674252C>T" + cds = "ENST00000269305:c.711G>A" + protein = "TP53:p.M237I" for mut in (protein, genomic, cds): matches = match.match_positional_variant(conn, mut) - nonsense = [m for m in matches if 'nonsense' in m['displayName']] + nonsense = [m for m in matches if "nonsense" in m["displayName"]] assert ( not nonsense ), f"Missense {mut} is not a nonsense variant: {((m['displayName'], m['@rid']) for m in nonsense)}" + def test_structural_variants(self, conn): + """KBDEV-1056""" + for variant_string, expected in structuralVariants.items(): + print(variant_string) + # Querying matches for variant_string + m = match.match_positional_variant(conn, variant_string) + MatchingDisplayNames = [el["displayName"] for el in m] + MatchingTypes = [el["type"]["name"] for el 
in m] + + # Match + for displayName in expected.get('matches', {}).get("displayName", []): + assert displayName in MatchingDisplayNames + for type in expected.get('matches', {}).get("type", []): + assert type in MatchingTypes + + # Does not match + for displayName in MatchingDisplayNames: + assert displayName not in expected.get('does_not_matches', {}).get( + "displayName", [] + ) + for type in MatchingTypes: + assert type not in expected.get('does_not_matches', {}).get("type", []) + class TestCacheMissingFeatures: def test_filling_cache(self): mock_conn = MagicMock( query=MagicMock( return_value=[ - {'name': 'bob', 'sourceId': 'alice'}, - {'name': 'KRAS', 'sourceId': '1234'}, + {"name": "bob", "sourceId": "alice"}, + {"name": "KRAS", "sourceId": "1234"}, ] ) ) match.cache_missing_features(mock_conn) - assert 'kras' in match.FEATURES_CACHE - assert 'alice' in match.FEATURES_CACHE + assert "kras" in match.FEATURES_CACHE + assert "alice" in match.FEATURES_CACHE + + +class TestTypeScreening: + # Types as class variables + default_type = DEFAULT_NON_STRUCTURAL_VARIANT_TYPE + threshold = STRUCTURAL_VARIANT_SIZE_THRESHOLD + unambiguous_structural = [ + "fusion", + "translocation", + ] + ambiguous_structural = [ + "duplication", + "deletion", + "insertion", + "indel", + ] + non_structural = [ + "substitution", + "missense", + "nonsense", + "frameshift", + "truncating", + ] + + def test_type_screening_update(self, conn, monkeypatch): + # Monkey-patching get_terms_set() + def mock_get_terms_set(graphkb_conn, base_terms): + nonlocal called + called = True + return set() + + monkeypatch.setattr("graphkb.match.get_terms_set", mock_get_terms_set) + + # Assert get_terms_set() has been called + called = False + graphkb.match.type_screening(conn, {"type": ""}, updateStructuralTypes=True) + assert called + + # Assert get_terms_set() has not been called (default behavior) + called = False + graphkb.match.type_screening(conn, {"type": ""}) + assert not called + + def 
test_type_screening_non_structural(self, conn): + for type in TestTypeScreening.non_structural: + # type substitution and alike + assert match.type_screening(conn, {"type": type}) == type + + def test_type_screening_structural(self, conn): + for type in TestTypeScreening.unambiguous_structural: + # type fusion and alike + assert match.type_screening(conn, {"type": type}) == type + for type in TestTypeScreening.ambiguous_structural: + # w/ reference2 + assert match.type_screening(conn, {"type": type, "reference2": "#123:45"}) == type + # w/ cytoband coordinates + assert match.type_screening(conn, {"type": type, "prefix": "y"}) == type + + def test_type_screening_structural_ambiguous_size(self, conn): + for type in TestTypeScreening.ambiguous_structural: + # coordinate system with ambiguous size + for prefix in ['e', 'i']: + assert ( + match.type_screening( + conn, + { + "type": type, + "break2Start": {"pos": TestTypeScreening.threshold}, + "prefix": prefix, + }, + ) + == TestTypeScreening.default_type + ) + + def test_type_screening_structural_untemplatedSeqSize(self, conn): + for type in TestTypeScreening.ambiguous_structural: + # Variation length too small (< threshold) + assert ( + match.type_screening( + conn, + { + "type": type, + "untemplatedSeqSize": TestTypeScreening.threshold - 1, + }, + ) + == TestTypeScreening.default_type + ) + # Variation length big enough (>= threshold) + assert ( + match.type_screening( + conn, + { + "type": type, + "untemplatedSeqSize": TestTypeScreening.threshold, + }, + ) + == type + ) + + def test_type_screening_structural_positions(self, conn): + for type in TestTypeScreening.ambiguous_structural: + # Variation length too small (< threshold) + for opt in [ + {"break2Start": {"pos": TestTypeScreening.threshold - 1}}, + {"break2Start": {"pos": TestTypeScreening.threshold - 1}, "prefix": "c"}, + {"break2Start": {"pos": TestTypeScreening.threshold - 1}, "prefix": "g"}, + {"break2Start": {"pos": TestTypeScreening.threshold - 1}, 
"prefix": "n"}, + {"break2Start": {"pos": TestTypeScreening.threshold - 1}, "prefix": "r"}, + {"break2Start": {"pos": int(TestTypeScreening.threshold / 3) - 1}, "prefix": "p"}, + { + "break1Start": {"pos": 1 + 99}, + "break2Start": {"pos": TestTypeScreening.threshold + 99 - 1}, + }, + ]: + assert ( + match.type_screening(conn, {"type": type, **opt}) + == TestTypeScreening.default_type + ) + # Variation length big enough (>= threshold) + for opt in [ + {"break2Start": {"pos": TestTypeScreening.threshold}}, + {"break2Start": {"pos": TestTypeScreening.threshold}, "prefix": "c"}, + {"break2Start": {"pos": TestTypeScreening.threshold}, "prefix": "g"}, + {"break2Start": {"pos": TestTypeScreening.threshold}, "prefix": "n"}, + {"break2Start": {"pos": TestTypeScreening.threshold}, "prefix": "r"}, + {"break2Start": {"pos": int(TestTypeScreening.threshold / 3) + 1}, "prefix": "p"}, + { + "break1Start": {"pos": 1 + 99}, + "break2Start": {"pos": TestTypeScreening.threshold + 99}, + }, + ]: + assert match.type_screening(conn, {"type": type, **opt}) == type diff --git a/tests/test_statement.py b/tests/test_statement.py index 2c7b8e7..aa032a6 100644 --- a/tests/test_statement.py +++ b/tests/test_statement.py @@ -7,28 +7,28 @@ from .test_match import conn -EXCLUDE_INTEGRATION_TESTS = os.environ.get('EXCLUDE_INTEGRATION_TESTS') == '1' +EXCLUDE_INTEGRATION_TESTS = os.environ.get("EXCLUDE_INTEGRATION_TESTS") == "1" @pytest.fixture() def graphkb_conn(): def make_rid_list(*values): - return [{'@rid': v} for v in values] + return [{"@rid": v} for v in values] def term_tree_calls(*final_values): # this function makes 2 calls to conn.query here - sets = [['fake'], final_values] + sets = [["fake"], final_values] return [make_rid_list(*s) for s in sets] return_values = [ - *term_tree_calls('1'), # therapeutic - *term_tree_calls('2'), # therapeutic (2nd base term) - *term_tree_calls('3'), # diagnostic - *term_tree_calls('4'), # prognostic - *term_tree_calls('5'), # pharmacogenomic 
['metabolism'] - *term_tree_calls('6'), # pharmacogenomic ['toxicity'] - *term_tree_calls('7'), # pharmacogenomic ['dosage'] - *term_tree_calls('8'), # cancer predisposition + *term_tree_calls("1"), # therapeutic + *term_tree_calls("2"), # therapeutic (2nd base term) + *term_tree_calls("3"), # diagnostic + *term_tree_calls("4"), # prognostic + *term_tree_calls("5"), # pharmacogenomic ['metabolism'] + *term_tree_calls("6"), # pharmacogenomic ['toxicity'] + *term_tree_calls("7"), # pharmacogenomic ['dosage'] + *term_tree_calls("8"), # cancer predisposition *term_tree_calls(), # biological *term_tree_calls(), # biological (2nd base term) *term_tree_calls(), # biological (3rd base term) @@ -41,52 +41,52 @@ def term_tree_calls(*final_values): class TestCategorizeRelevance: def test_default_categories(self, graphkb_conn): - category = statement.categorize_relevance(graphkb_conn, '1') - assert category == 'therapeutic' + category = statement.categorize_relevance(graphkb_conn, "1") + assert category == "therapeutic" def test_first_match_returns(self, graphkb_conn): - category = statement.categorize_relevance(graphkb_conn, '2') - assert category == 'therapeutic' + category = statement.categorize_relevance(graphkb_conn, "2") + assert category == "therapeutic" def test_second_category(self, graphkb_conn): - category = statement.categorize_relevance(graphkb_conn, '3') - assert category == 'diagnostic' + category = statement.categorize_relevance(graphkb_conn, "3") + assert category == "diagnostic" def test_third_category(self, graphkb_conn): - category = statement.categorize_relevance(graphkb_conn, '4') - assert category == 'prognostic' + category = statement.categorize_relevance(graphkb_conn, "4") + assert category == "prognostic" def test_fourth_category(self, graphkb_conn): - category = statement.categorize_relevance(graphkb_conn, '5') - assert category == 'pharmacogenomic' + category = statement.categorize_relevance(graphkb_conn, "5") + assert category == "pharmacogenomic" 
def test_fifth_category(self, graphkb_conn): - category = statement.categorize_relevance(graphkb_conn, '6') - assert category == 'pharmacogenomic' + category = statement.categorize_relevance(graphkb_conn, "6") + assert category == "pharmacogenomic" def test_predisposition_category(self, graphkb_conn): - category = statement.categorize_relevance(graphkb_conn, '8') - assert category == 'cancer predisposition' + category = statement.categorize_relevance(graphkb_conn, "8") + assert category == "cancer predisposition" def test_no_match(self, graphkb_conn): - category = statement.categorize_relevance(graphkb_conn, 'x') - assert category == '' + category = statement.categorize_relevance(graphkb_conn, "x") + assert category == "" def test_custom_categories(self, graphkb_conn): category = statement.categorize_relevance( - graphkb_conn, 'x', [('blargh', ['some', 'blargh'])] + graphkb_conn, "x", [("blargh", ["some", "blargh"])] ) - assert category == '' + assert category == "" category = statement.categorize_relevance( - graphkb_conn, '1', [('blargh', ['some', 'blargh'])] + graphkb_conn, "1", [("blargh", ["some", "blargh"])] ) - assert category == 'blargh' + assert category == "blargh" @pytest.mark.skipif(EXCLUDE_INTEGRATION_TESTS, reason="excluding long running integration tests") class TestStatementMatch: def test_truncating_categories(self, conn): - variant = {'@class': 'CategoryVariant', '@rid': '#161:429', 'displayName': 'RB1 truncating'} + variant = {"@class": "CategoryVariant", "@rid": "#161:429", "displayName": "RB1 truncating"} statements = statement.get_statements_from_variants(conn, [variant]) assert statements diff --git a/tests/test_util.py b/tests/test_util.py index e05388b..10ff445 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -12,34 +12,34 @@ def __init__(self, name, sourceId, displayName): self.displayName = displayName -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def conn() -> GraphKBConnection: conn = GraphKBConnection() - 
conn.login(os.environ['GRAPHKB_USER'], os.environ['GRAPHKB_PASS']) + conn.login(os.environ["GRAPHKB_USER"], os.environ["GRAPHKB_PASS"]) return conn class TestLooksLikeRid: - @pytest.mark.parametrize('rid', ['#3:4', '#50:04', '#-3:4', '#-3:-4', '#3:-4']) + @pytest.mark.parametrize("rid", ["#3:4", "#50:04", "#-3:4", "#-3:-4", "#3:-4"]) def test_valid(self, rid): assert util.looks_like_rid(rid) - @pytest.mark.parametrize('rid', ['-3:4', 'KRAS']) + @pytest.mark.parametrize("rid", ["-3:4", "KRAS"]) def test_invalid(self, rid): assert not util.looks_like_rid(rid) @pytest.mark.parametrize( - 'input,result', + "input,result", [ - ['GP5:p.Leu113His', 'GP5:p.L113H'], - ['GP5:p.Lys113His', 'GP5:p.K113H'], - ['CDK11A:p.Arg536Gln', 'CDK11A:p.R536Q'], - ['APC:p.Cys1405*', 'APC:p.C1405*'], - ['ApcTer:p.Cys1405*', 'ApcTer:p.C1405*'], - ['GP5:p.Leu113_His114insLys', 'GP5:p.L113_H114insK'], - ['NP_003997.1:p.Lys23_Val25del', 'NP_003997.1:p.K23_V25del'], - ['LRG_199p1:p.Val7del', 'LRG_199p1:p.V7del'], + ["GP5:p.Leu113His", "GP5:p.L113H"], + ["GP5:p.Lys113His", "GP5:p.K113H"], + ["CDK11A:p.Arg536Gln", "CDK11A:p.R536Q"], + ["APC:p.Cys1405*", "APC:p.C1405*"], + ["ApcTer:p.Cys1405*", "ApcTer:p.C1405*"], + ["GP5:p.Leu113_His114insLys", "GP5:p.L113_H114insK"], + ["NP_003997.1:p.Lys23_Val25del", "NP_003997.1:p.K23_V25del"], + ["LRG_199p1:p.Val7del", "LRG_199p1:p.V7del"], ], ) def test_convert_aa_3to1(input, result): @@ -48,18 +48,18 @@ def test_convert_aa_3to1(input, result): class TestOntologyTermRepr: @pytest.mark.parametrize( - 'termStr,termRepr', [['missense mutation', 'missense mutation'], ['', '']] + "termStr,termRepr", [["missense mutation", "missense mutation"], ["", ""]] ) def test_ontologyTermRepr_str(self, termStr, termRepr): assert util.ontologyTermRepr(termStr) == termRepr @pytest.mark.parametrize( - 'termObjOpt,termRepr', + "termObjOpt,termRepr", [ - [{"displayName": 'abc123', "name": '', "sourceId": ''}, 'abc123'], - [{"displayName": '', "name": '', "sourceId": 'abc123'}, 
'abc123'], - [{"displayName": '', "name": 'abc123', "sourceId": ''}, 'abc123'], - [{"displayName": '', "name": '', "sourceId": ''}, ''], + [{"displayName": "abc123", "name": "", "sourceId": ""}, "abc123"], + [{"displayName": "", "name": "", "sourceId": "abc123"}, "abc123"], + [{"displayName": "", "name": "abc123", "sourceId": ""}, "abc123"], + [{"displayName": "", "name": "", "sourceId": ""}, ""], ], ) def test_ontologyTermRepr_obj(self, termObjOpt, termRepr): @@ -69,12 +69,12 @@ def test_ontologyTermRepr_obj(self, termObjOpt, termRepr): class TestStripParentheses: @pytest.mark.parametrize( - 'breakRepr,StrippedBreakRepr', + "breakRepr,StrippedBreakRepr", [ - ['p.(E2015_Q2114)', 'p.E2015_Q2114'], - ['p.(?572_?630)', 'p.?572_?630'], - ['g.178916854', 'g.178916854'], - ['e.10', 'e.10'], + ["p.(E2015_Q2114)", "p.E2015_Q2114"], + ["p.(?572_?630)", "p.?572_?630"], + ["g.178916854", "g.178916854"], + ["e.10", "e.10"], ], ) def test_stripParentheses(self, breakRepr, StrippedBreakRepr): @@ -83,10 +83,10 @@ def test_stripParentheses(self, breakRepr, StrippedBreakRepr): class TestStripRefSeq: @pytest.mark.parametrize( - 'breakRepr,StrippedBreakRepr', + "breakRepr,StrippedBreakRepr", [ - ['p.L2209', 'p.2209'], - ['p.?891', 'p.891'], + ["p.L2209", "p.2209"], + ["p.?891", "p.891"], # TODO: ['p.?572_?630', 'p.572_630'], ], ) @@ -96,31 +96,31 @@ def test_stripRefSeq(self, breakRepr, StrippedBreakRepr): class TestStripDisplayName: @pytest.mark.parametrize( - 'opt,stripDisplayName', + "opt,stripDisplayName", [ - [{'displayName': 'ABL1:p.T315I', 'withRef': True, 'withRefSeq': True}, 'ABL1:p.T315I'], - [{'displayName': 'ABL1:p.T315I', 'withRef': False, 'withRefSeq': True}, 'p.T315I'], - [{'displayName': 'ABL1:p.T315I', 'withRef': True, 'withRefSeq': False}, 'ABL1:p.315I'], - [{'displayName': 'ABL1:p.T315I', 'withRef': False, 'withRefSeq': False}, 'p.315I'], + [{"displayName": "ABL1:p.T315I", "withRef": True, "withRefSeq": True}, "ABL1:p.T315I"], + [{"displayName": "ABL1:p.T315I", 
"withRef": False, "withRefSeq": True}, "p.T315I"], + [{"displayName": "ABL1:p.T315I", "withRef": True, "withRefSeq": False}, "ABL1:p.315I"], + [{"displayName": "ABL1:p.T315I", "withRef": False, "withRefSeq": False}, "p.315I"], [ - {'displayName': 'chr3:g.41266125C>T', 'withRef': False, 'withRefSeq': False}, - 'g.41266125>T', + {"displayName": "chr3:g.41266125C>T", "withRef": False, "withRefSeq": False}, + "g.41266125>T", ], [ { - 'displayName': 'chrX:g.99662504_99662505insG', - 'withRef': False, - 'withRefSeq': False, + "displayName": "chrX:g.99662504_99662505insG", + "withRef": False, + "withRefSeq": False, }, - 'g.99662504_99662505insG', + "g.99662504_99662505insG", ], [ { - 'displayName': 'chrX:g.99662504_99662505dup', - 'withRef': False, - 'withRefSeq': False, + "displayName": "chrX:g.99662504_99662505dup", + "withRef": False, + "withRefSeq": False, }, - 'g.99662504_99662505dup', + "g.99662504_99662505dup", ], # TODO: [{'displayName': 'VHL:c.330_331delCAinsTT', 'withRef': False, 'withRefSeq': False}, 'c.330_331delinsTT'], # TODO: [{'displayName': 'VHL:c.464-2G>A', 'withRef': False, 'withRefSeq': False}, 'c.464-2>A'], @@ -132,39 +132,39 @@ def test_stripDisplayName(self, opt, stripDisplayName): class TestStringifyVariant: @pytest.mark.parametrize( - 'hgvs_string,opt,stringifiedVariant', + "hgvs_string,opt,stringifiedVariant", [ - ['VHL:c.345C>G', {'withRef': True, 'withRefSeq': True}, 'VHL:c.345C>G'], - ['VHL:c.345C>G', {'withRef': False, 'withRefSeq': True}, 'c.345C>G'], - ['VHL:c.345C>G', {'withRef': True, 'withRefSeq': False}, 'VHL:c.345>G'], - ['VHL:c.345C>G', {'withRef': False, 'withRefSeq': False}, 'c.345>G'], + ["VHL:c.345C>G", {"withRef": True, "withRefSeq": True}, "VHL:c.345C>G"], + ["VHL:c.345C>G", {"withRef": False, "withRefSeq": True}, "c.345C>G"], + ["VHL:c.345C>G", {"withRef": True, "withRefSeq": False}, "VHL:c.345>G"], + ["VHL:c.345C>G", {"withRef": False, "withRefSeq": False}, "c.345>G"], [ - '(LMNA,NTRK1):fusion(e.10,e.12)', - {'withRef': False, 
'withRefSeq': False}, - 'fusion(e.10,e.12)', + "(LMNA,NTRK1):fusion(e.10,e.12)", + {"withRef": False, "withRefSeq": False}, + "fusion(e.10,e.12)", ], - ['ABCA12:p.N1671Ifs*4', {'withRef': False, 'withRefSeq': False}, 'p.1671Ifs*4'], - ['x:y.p22.33copyloss', {'withRef': False, 'withRefSeq': False}, 'y.p22.33copyloss'], + ["ABCA12:p.N1671Ifs*4", {"withRef": False, "withRefSeq": False}, "p.1671Ifs*4"], + ["x:y.p22.33copyloss", {"withRef": False, "withRefSeq": False}, "y.p22.33copyloss"], # TODO: ['MED12:p.(?34_?68)mut', {'withRef': False, 'withRefSeq': False}, 'p.(34_68)mut'], # TODO: ['FLT3:p.(?572_?630)_(?572_?630)ins', {'withRef': False, 'withRefSeq': False}, 'p.(572_630)_(572_630)ins'], ], ) def test_stringifyVariant_parsed(self, conn, hgvs_string, opt, stringifiedVariant): - opt['variant'] = conn.parse(hgvs_string) + opt["variant"] = conn.parse(hgvs_string) assert util.stringifyVariant(**opt) == stringifiedVariant # Based on the assumption that these variants are in the database. # createdAt date help avoiding errors if assumption tuns to be false @pytest.mark.parametrize( - 'rid,createdAt,stringifiedVariant', + "rid,createdAt,stringifiedVariant", [ - ['#157:0', 1565627324397, 'p.315I'], - ['#157:79', 1565627683602, 'p.776_777insVGC'], - ['#158:35317', 1652734056311, 'c.1>G'], + ["#157:0", 1565627324397, "p.315I"], + ["#157:79", 1565627683602, "p.776_777insVGC"], + ["#158:35317", 1652734056311, "c.1>G"], ], ) def test_stringifyVariant_positional(self, conn, rid, createdAt, stringifiedVariant): - opt = {'withRef': False, 'withRefSeq': False} + opt = {"withRef": False, "withRefSeq": False} variant = conn.get_record_by_id(rid) - if variant and variant.get('createdAt', None) == createdAt: + if variant and variant.get("createdAt", None) == createdAt: assert util.stringifyVariant(variant=variant, **opt) == stringifiedVariant diff --git a/tests/test_vocab.py b/tests/test_vocab.py index e96bc59..5e40e04 100644 --- a/tests/test_vocab.py +++ b/tests/test_vocab.py @@ -7,79 
+7,79 @@ from graphkb import GraphKBConnection, genes, vocab -BASE_EXPRESSION = 'expression variant' -BASE_INCREASED_EXPRESSION = 'increased expression' -BASE_REDUCED_EXPRESSION = 'reduced expression' +BASE_EXPRESSION = "expression variant" +BASE_INCREASED_EXPRESSION = "increased expression" +BASE_REDUCED_EXPRESSION = "reduced expression" -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def conn(): conn = GraphKBConnection() - conn.login(os.environ['GRAPHKB_USER'], os.environ['GRAPHKB_PASS']) + conn.login(os.environ["GRAPHKB_USER"], os.environ["GRAPHKB_PASS"]) return conn def test_expression_vocabulary(conn): result = vocab.get_term_tree(conn, BASE_EXPRESSION) - names = [row['name'] for row in result] + names = [row["name"] for row in result] assert BASE_EXPRESSION in names - assert 'increased rna expression' in names + assert "increased rna expression" in names def test_indel_vocabulary(conn): - result = vocab.get_term_tree(conn, 'indel') + result = vocab.get_term_tree(conn, "indel") - names = {row['name'] for row in result} - assert 'indel' in names - assert 'copy variant' not in names - assert 'copy number variant' not in names + names = {row["name"] for row in result} + assert "indel" in names + assert "copy variant" not in names + assert "copy number variant" not in names def test_expression_up(conn): result = vocab.get_term_tree(conn, BASE_INCREASED_EXPRESSION) - names = [row['name'] for row in result] + names = [row["name"] for row in result] assert BASE_EXPRESSION in names assert BASE_INCREASED_EXPRESSION in names - assert 'increased rna expression' in names - assert 'reduced rna expression' not in names + assert "increased rna expression" in names + assert "reduced rna expression" not in names assert BASE_REDUCED_EXPRESSION not in names def test_expression_down(conn): result = vocab.get_term_tree(conn, BASE_REDUCED_EXPRESSION) - names = [row['name'] for row in result] + names = [row["name"] for row in result] assert BASE_EXPRESSION in 
names assert BASE_REDUCED_EXPRESSION in names assert BASE_INCREASED_EXPRESSION not in names - assert 'increased rna expression' not in names - assert 'reduced rna expression' in names + assert "increased rna expression" not in names + assert "reduced rna expression" in names class TestGetEquivalentTerms: def test_gain_excludes_amplification(self, conn): - result = vocab.get_equivalent_terms(conn, 'copy gain') - names = {row['name'] for row in result} - assert 'copy gain' in names - assert 'amplification' not in names + result = vocab.get_equivalent_terms(conn, "copy gain") + names = {row["name"] for row in result} + assert "copy gain" in names + assert "amplification" not in names def test_amplification_includes_gain(self, conn): - result = vocab.get_equivalent_terms(conn, 'amplification') - names = {row['name'] for row in result} - assert 'copy gain' in names - assert 'amplification' in names + result = vocab.get_equivalent_terms(conn, "amplification") + names = {row["name"] for row in result} + assert "copy gain" in names + assert "amplification" in names def test_oncogenic(conn): result = vocab.get_term_by_name(conn, genes.ONCOGENE) - assert result['name'] == genes.ONCOGENE + assert result["name"] == genes.ONCOGENE def test_get_terms_set(conn): - terms = vocab.get_terms_set(conn, ['copy variant']) + terms = vocab.get_terms_set(conn, ["copy variant"]) assert terms - more_terms = vocab.get_terms_set(conn, ['copy variant', 'expression variant']) + more_terms = vocab.get_terms_set(conn, ["copy variant", "expression variant"]) assert more_terms assert len(more_terms) > len(terms)