diff --git a/bin/load.js b/bin/load.js index 838c03df..166ef615 100644 --- a/bin/load.js +++ b/bin/load.js @@ -1,6 +1,3 @@ -const fs = require('fs'); -const path = require('path'); - const { runLoader } = require('../src'); const { createOptionsMenu, fileExists } = require('../src/cli'); @@ -36,7 +33,12 @@ const cosmicResistance = require('../src/cosmic/resistance'); const cosmicFusions = require('../src/cosmic/fusions'); const API_MODULES = { - asco, dgidb, docm, fdaApprovals, moa, oncotree, + asco, + dgidb, + docm, + fdaApprovals, + moa, + oncotree, }; const FILE_MODULES = { @@ -102,6 +104,11 @@ civicParser.add_argument('--trustedCurators', { help: 'CIViC User IDs of curators whose statements should be imported even if they have not yet been reviewed (evidence is submitted but not accepted)', nargs: '+', }); +civicParser.add_argument('--noUpdate', { + action: 'store_true', + default: false, + help: 'Will not check for updating content of existing GraphKB Statements', +}); const clinicaltrialsgovParser = subparsers.add_parser('clinicaltrialsgov'); clinicaltrialsgovParser.add_argument('--days', { @@ -132,14 +139,12 @@ let loaderFunction; if (input) { loaderFunction = ALL_MODULES[moduleName || subparser_name].uploadFile; } else { - debugger; loaderFunction = ALL_MODULES[moduleName || subparser_name].upload; } const loaderOptions = { ...options }; if (input) { - debugger; loaderOptions.filename = input; } diff --git a/package-lock.json b/package-lock.json index 07f31ab2..5212d6a5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@bcgsc-pori/graphkb-loader", - "version": "8.0.1", + "version": "8.0.2", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@bcgsc-pori/graphkb-loader", - "version": "8.0.1", + "version": "8.0.2", "license": "GPL-3", "dependencies": { "@bcgsc-pori/graphkb-parser": "^1.1.1", diff --git a/package.json b/package.json index 42db1765..b4b59389 100644 --- a/package.json +++ b/package.json @@ -1,7 
+1,7 @@ { "name": "@bcgsc-pori/graphkb-loader", "main": "src/index.js", - "version": "8.0.1", + "version": "8.0.2", "repository": { "type": "git", "url": "https://github.com/bcgsc/pori_graphkb_loader.git" diff --git a/src/civic/disease.js b/src/civic/disease.js new file mode 100644 index 00000000..b1573e93 --- /dev/null +++ b/src/civic/disease.js @@ -0,0 +1,41 @@ +const { orderPreferredOntologyTerms } = require('../graphkb'); + +/** + * Given a CIViC EvidenceItem record with its disease property, + * returns the corresponding disease record from GraphKB + * + * @param {ApiConnection} conn graphkb API connector + * @param {object} param1 + * @param {object} param1.rawRecord the EvidenceItem from CIViC + * @returns {object} the disease record from GraphKB + */ +const getDisease = async (conn, { rawRecord }) => { + let disease; + + // Get corresponding GraphKB Disease by it's doid (disease ontology id) + if (rawRecord.disease) { + let diseaseQueryFilters = {}; + + if (rawRecord.disease.doid) { + diseaseQueryFilters = { + AND: [ + { sourceId: `doid:${rawRecord.disease.doid}` }, + { source: { filters: { name: 'disease ontology' }, target: 'Source' } }, + ], + }; + } else { + diseaseQueryFilters = { name: rawRecord.disease.name }; + } + + disease = await conn.getUniqueRecordBy({ + filters: diseaseQueryFilters, + sort: orderPreferredOntologyTerms, + target: 'Disease', + }); + } + return disease; +}; + +module.exports = { + getDisease, +}; diff --git a/src/civic/evidenceItem.js b/src/civic/evidenceItem.js new file mode 100644 index 00000000..f999f8db --- /dev/null +++ b/src/civic/evidenceItem.js @@ -0,0 +1,297 @@ +const fs = require('fs'); +const path = require('path'); + +const _ = require('lodash'); +const Ajv = require('ajv'); +const { error: { ErrorMixin } } = require('@bcgsc-pori/graphkb-parser'); + +const { checkSpec, request } = require('../util'); +const { logger } = require('../logging'); +const { civic: SOURCE_DEFN } = require('../sources'); +const { 
EvidenceItem: evidenceSpec } = require('./specs.json'); +const _entrezGene = require('../entrez/gene'); +const { processVariantRecord } = require('./variant'); +const { processMolecularProfile } = require('./profile'); +const { addOrFetchTherapy, resolveTherapies } = require('./therapy'); +const { rid } = require('../graphkb'); + + +class NotImplementedError extends ErrorMixin { } + +// Spec compiler +const ajv = new Ajv(); +const validateEvidenceSpec = ajv.compile(evidenceSpec); + +/** + * Requests evidence items from CIViC using their graphql API + * + * @param {string} url the query url + * @param {object} opt the query options + * @returns {object[]} an array of EvidenceItem records + */ +const requestEvidenceItems = async (url, opt) => { + const body = { ...opt }; + const allRecords = []; + let hasNextPage = true; + + while (hasNextPage) { + try { + const page = await request({ + body, + json: true, + method: 'POST', + uri: url, + }); + allRecords.push(...page.data.evidenceItems.nodes); + body.variables = { + ...body.variables, + after: page.data.evidenceItems.pageInfo.endCursor, + }; + hasNextPage = page.data.evidenceItems.pageInfo.hasNextPage; + } catch (err) { + logger.error(err); + throw (err); + } + } + return allRecords; +}; + +/** + * Fetch CIViC approved evidence entries + * as well as those submitted by trusted curators + * + * @param {string} url the url for the request + * @param {string[]} trustedCurators a list of curator IDs for submitted-only EvidenceItems + * @returns {object} an object with the validated records and the encountered errors + */ +const downloadEvidenceItems = async (url, trustedCurators) => { + const evidenceItems = []; + const query = fs.readFileSync(path.join(__dirname, 'evidenceItems.graphql')).toString(); + + // Get accepted evidenceItems + const accepted = await requestEvidenceItems(url, { + query, + variables: { + status: 'ACCEPTED', + }, + }); + logger.info(`${accepted.length} accepted entries from ${SOURCE_DEFN.name}`); 
+ evidenceItems.push(...accepted); + + // Get submitted evidenceItems from trusted curators + for (const curator of Array.from(new Set(trustedCurators))) { + if (!Number.isNaN(curator)) { + const submittedByATrustedCurator = await requestEvidenceItems(url, { + query, + variables: { + status: 'SUBMITTED', + userId: parseInt(curator, 10), + }, + }); + evidenceItems.push(...submittedByATrustedCurator); + logger.info(`${submittedByATrustedCurator.length} submitted entries by trusted curator ${curator} from ${SOURCE_DEFN.name}`); + } + } + + logger.info(`${evidenceItems.length} total records from ${SOURCE_DEFN.name}`); + + // Validation + const validatedRecords = [], + errors = []; + + for (const record of evidenceItems) { + try { + checkSpec(validateEvidenceSpec, record); + } catch (err) { + errors.push({ error: err, errorMessage: err.toString(), record }); + logger.error(err); + continue; + } + validatedRecords.push(record); + } + + logger.info(`${validatedRecords.length}/${evidenceItems.length} validated records`); + return { errors, records: validatedRecords }; +}; + +/** + * Format one combination from a CIViC EvidenceItem into an object + * ready to be compared with a corresponding GraphKB statement + * + * @param {ApiConnection} conn the API connection object for GraphKB + * @param {object} param1 + * @param {object} param1.record the unparsed record from CIViC + * @param {object} param1.sourceRid the souce rid for CIViC in GraphKB + * @returns {object} the formatted content from one combination + */ +const processCombination = async (conn, { + record: rawRecord, + sourceRid, +}) => { + /* + PROCESSING EVIDENCEITEM DATA SPECIFIC TO THAT COMBINATION/STATEMENT + */ + + // THERAPY + // Get corresponding GraphKB Therapies + let therapy; + + if (rawRecord.therapies) { + try { + therapy = await addOrFetchTherapy( + conn, + sourceRid, + rawRecord.therapies, // therapiesRecords + (rawRecord.therapyInteractionType || '').toLowerCase(), // combinationType + ); + } catch 
(err) { + throw new Error(`failed to fetch therapy: ${JSON.stringify(rawRecord.therapies)}\nerr:${err}`); + } + } + + // VARIANTS + // Note: the combination can have more than 1 variant + // if the Molecular profile was using AND operators + const { variants: civicVariants } = rawRecord; + const variants = []; + + for (const variant of civicVariants) { + // Variant's Feature + const { feature: { featureInstance } } = variant; + + // TODO: Deal with __typename === 'Factor'. No actual case as April 22nd, 2024 + if (featureInstance.__typename !== 'Gene') { + throw new NotImplementedError( + 'unable to process variant\'s feature of type other than Gene (e.g. Factor)', + ); + } + + let feature; + + try { + [feature] = await _entrezGene.fetchAndLoadByIds(conn, [featureInstance.entrezId]); + } catch (err) { + logger.error(`failed to fetch variant's feature: ${featureInstance.entrezId}`); + throw err; + } + + // Variant + try { + const processedVariants = await processVariantRecord(conn, variant, feature); + logger.verbose(`converted variant name (${variant.name}) to variants (${processedVariants.map(v => v.displayName).join(', and ')})`); + variants.push(...processedVariants); + } catch (err) { + logger.error(`unable to process the variant (id=${rawRecord.variant.id}, name=${rawRecord.variant.name})`); + throw err; + } + } + + /* + FORMATTING CONTENT FOR GRAPHKB STATEMENT + */ + + const { content } = rawRecord; + + // SUBJECT + // Adding Disease as subject + if (rawRecord.evidenceType === 'DIAGNOSTIC' || rawRecord.evidenceType === 'PREDISPOSING') { + if (!content.disease) { + throw new Error('unable to create a diagnostic or predisposing statement without a corresponding disease'); + } + content.subject = content.disease; + } + + // Adding Therapy as subject + if (rawRecord.evidenceType === 'PREDICTIVE' && therapy) { + content.subject = rid(therapy); + } + + // Adding 'patient' Vocabulary as subject + if (rawRecord.evidenceType === 'PROGNOSTIC') { + try { + 
content.subject = rid( + // get the patient vocabulary object + await conn.getVocabularyTerm('patient'), + ); + } catch (err) { + logger.error('unable to fetch Vocabulary record for term patient'); + throw err; + } + } + + // Adding feature (reference1) or Variant (1st variant as the default) as subject. + if (rawRecord.evidenceType === 'FUNCTIONAL') { + content.subject = rid(variants[0].reference1); + } + if (rawRecord.evidenceType === 'ONCOGENIC') { + content.subject = variants.length === 1 + ? rid(variants[0]) + : rid(variants[0].reference1); + } + + // Checking for Subject + if (!content.subject) { + throw Error('unable to determine statement subject'); + } + + // CONDITIONS + // Adding variants as conditions + content.conditions = [...variants.map(v => rid(v))]; + + // Adding Disease as condition + if (content.disease) { + content.conditions.push(content.disease); + } + delete content.disease; // Removing unwanted properties no longer needed + + // Adding content's subject as condition if not already + if (content.subject && !content.conditions.includes(content.subject)) { + content.conditions.push(content.subject); + } + // Sorting conditions for downstream object comparison + content.conditions.sort(); + + return content; +}; + +/** + * Process an EvidenceItem from CIViC into an array of one or more combinations + * + * @param {object} evidenceItem the CIViC EvidenceItem + * @returns {object[]} an array of combinations + */ +const processEvidenceItem = async (evidenceItem) => { + let record = JSON.parse(JSON.stringify(evidenceItem)); // Deep copy + logger.debug(`processing EvidenceItem ${record.id}`); + + // Resolve therapy combinations if any + // Updates record.therapies and record.therapyInteractionType properties + record = resolveTherapies(record); + + // Molecular Profile (conditions w/ variants) + record.conditions = processMolecularProfile(record.molecularProfile).conditions; + + // PROCESSING EVIDENCEITEM INTO AN ARRAY OF COMBINATIONS + const 
combinations = []; + + for (const condition of record.conditions) { + for (const therapies of record.therapies) { + const content = JSON.parse(JSON.stringify(record.content)); // Deep copy + combinations.push({ + ..._.omit(record, ['conditions']), + content, + therapies, + variants: [...condition], + }); + } + } + + return combinations; +}; + +module.exports = { + downloadEvidenceItems, + processCombination, + processEvidenceItem, + requestEvidenceItems, +}; diff --git a/src/civic/evidenceLevel.js b/src/civic/evidenceLevel.js new file mode 100644 index 00000000..172be882 --- /dev/null +++ b/src/civic/evidenceLevel.js @@ -0,0 +1,59 @@ +/** + * 1-5 : https://docs.civicdb.org/en/latest/model/evidence/evidence_rating.html + * A-E : https://docs.civicdb.org/en/latest/model/evidence/level.html +*/ +const VOCAB = { + 1: 'Claim is not supported well by experimental evidence. Results are not reproducible, or have very small sample size. No follow-up is done to validate novel claims.', + 2: 'Evidence is not well supported by experimental data, and little follow-up data is available. Publication is from a journal with low academic impact. Experiments may lack proper controls, have small sample size, or are not statistically convincing.', + 3: 'Evidence is convincing, but not supported by a breadth of experiments. May be smaller scale projects, or novel results without many follow-up experiments. Discrepancies from expected results are explained and not concerning.', + 4: 'Strong, well supported evidence. Experiments are well controlled, and results are convincing. Any discrepancies from expected results are well-explained and not concerning.', + 5: 'Strong, well supported evidence from a lab or journal with respected academic standing. Experiments are well controlled, and results are clean and reproducible across multiple replicates. Evidence confirmed using independent methods. 
The study is statistically well powered.', + A: 'Proven/consensus association in human medicine.', + B: 'Clinical trial or other primary patient data supports association.', + C: 'Individual case reports from clinical journals.', + D: 'In vivo or in vitro models support association.', + E: 'Indirect evidence.', + url: 'https://docs.civicdb.org/en/latest/model/evidence.html', +}; + +const EVIDENCE_LEVEL_CACHE = {}; + +/** + * Fetch an evidence level, and add it if there is not an existing record + * + * @param {ApiConnection} conn graphkb API connector + * @param {object} param1 + * @param {object} param1.rawRecord an EvidenceItem record from CIViC + * @param {object} param1.source the CIViC source rid in GraphKB + * @returns {object} an EvidenceLevel recors from GraphKB + */ +const getEvidenceLevel = async (conn, { rawRecord, source, sourceDisplayName }) => { + // get the evidenceLevel + let level = `${rawRecord.evidenceLevel}${rawRecord.evidenceRating || ''}`.toLowerCase(); + + if (EVIDENCE_LEVEL_CACHE[level] === undefined) { + level = await conn.addRecord({ + content: { + description: `${VOCAB[rawRecord.evidenceLevel]} ${VOCAB[rawRecord.evidenceRating] || ''}`, + displayName: `${sourceDisplayName} ${level.toUpperCase()}`, + name: level, + source, + sourceId: level, + url: VOCAB.url, + }, + existsOk: true, + fetchConditions: { + AND: + [{ sourceId: level }, { name: level }, { source }], + }, + target: 'EvidenceLevel', + + }); + EVIDENCE_LEVEL_CACHE[level.sourceId] = level; + } else { + level = EVIDENCE_LEVEL_CACHE[level]; + } + return level; +}; + +module.exports = { getEvidenceLevel }; diff --git a/src/civic/index.js b/src/civic/index.js index 26151043..2f0aee38 100644 --- a/src/civic/index.js +++ b/src/civic/index.js @@ -1,735 +1,475 @@ /** * @module importer/civic */ -const _ = require('lodash'); -const Ajv = require('ajv'); const fs = require('fs'); -const path = require('path'); -const { error: { ErrorMixin } } = require('@bcgsc-pori/graphkb-parser'); - 
-const { checkSpec, request } = require('../util'); -const { - orderPreferredOntologyTerms, - rid, - shouldUpdate, -} = require('../graphkb'); +const { rid } = require('../graphkb'); const { logger } = require('../logging'); -const _pubmed = require('../entrez/pubmed'); -const _entrezGene = require('../entrez/gene'); -const { civic: SOURCE_DEFN, ncit: NCIT_SOURCE_DEFN } = require('../sources'); -const { processVariantRecord } = require('./variant'); +const { civic: SOURCE_DEFN } = require('../sources'); +const { getDisease } = require('./disease'); const { getRelevance } = require('./relevance'); -const { getPublication } = require('./publication'); -const { MolecularProfile } = require('./profile'); -const { EvidenceItem: evidenceSpec } = require('./specs.json'); - -class NotImplementedError extends ErrorMixin { } +const { getEvidenceLevel } = require('./evidenceLevel'); +const { getPublication, loadPubmedCache } = require('./publication'); +const { + downloadEvidenceItems, + processCombination, + processEvidenceItem, +} = require('./evidenceItem'); +const { + contentMatching, + createStatement, + deleteStatements, + needsUpdate, + updateStatement, +} = require('./statement'); const BASE_URL = 'https://civicdb.org/api/graphql'; -/** - * 1-5 : https://docs.civicdb.org/en/latest/model/evidence/evidence_rating.html - * A-E : https://docs.civicdb.org/en/latest/model/evidence/level.html - */ -const VOCAB = { - 1: 'Claim is not supported well by experimental evidence. Results are not reproducible, or have very small sample size. No follow-up is done to validate novel claims.', - 2: 'Evidence is not well supported by experimental data, and little follow-up data is available. Publication is from a journal with low academic impact. Experiments may lack proper controls, have small sample size, or are not statistically convincing.', - 3: 'Evidence is convincing, but not supported by a breadth of experiments. 
May be smaller scale projects, or novel results without many follow-up experiments. Discrepancies from expected results are explained and not concerning.', - 4: 'Strong, well supported evidence. Experiments are well controlled, and results are convincing. Any discrepancies from expected results are well-explained and not concerning.', - 5: 'Strong, well supported evidence from a lab or journal with respected academic standing. Experiments are well controlled, and results are clean and reproducible across multiple replicates. Evidence confirmed using independent methods. The study is statistically well powered.', - A: 'Proven/consensus association in human medicine.', - B: 'Clinical trial or other primary patient data supports association.', - C: 'Individual case reports from clinical journals.', - D: 'In vivo or in vitro models support association.', - E: 'Indirect evidence.', - url: 'https://docs.civicdb.org/en/latest/model/evidence.html', -}; - -const EVIDENCE_LEVEL_CACHE = {}; // avoid unecessary requests by caching the evidence levels - -// Spec compiler -const ajv = new Ajv(); -const validateEvidenceSpec = ajv.compile(evidenceSpec); - /** - * Requests evidence items from CIViC using their graphql API - */ -const requestEvidenceItems = async (url, opt) => { - const allRecords = []; - let hasNextPage = true; - - while (hasNextPage) { - try { - const page = await request({ - body: { ...opt }, - json: true, - method: 'POST', - uri: url, - }); - allRecords.push(...page.data.evidenceItems.nodes); - opt.variables = { ...opt.variables, after: page.data.evidenceItems.pageInfo.endCursor }; - hasNextPage = page.data.evidenceItems.pageInfo.hasNextPage; - } catch (err) { - logger.error(err); - throw (err); - } - } - return allRecords; -}; - - -/** - * Given some therapy name, find the therapy that is equivalent by name in GraphKB + * Increment counter on GraphKB Statement CRUD operations + * + * @param {object} initial the counter + * @param {object} updates the increment 
to apply + * @returns {object} the incremented counter */ -const getTherapy = async (conn, therapyRecord) => { - let originalError; - - // fetch from NCIt first if possible, or pubchem - // then use the name as a fallback - const name = therapyRecord.name.toLowerCase().trim(); - - if (therapyRecord.ncitId) { - try { - const therapy = await conn.getUniqueRecordBy({ - filters: [ - { source: { filters: { name: NCIT_SOURCE_DEFN.name }, target: 'Source' } }, - { sourceId: therapyRecord.ncitId }, - { name: therapyRecord.name }, - ], - sort: orderPreferredOntologyTerms, - target: 'Therapy', - }); - return therapy; - } catch (err) { - logger.error(`had NCIt therapy mapping (${therapyRecord.ncitId}) named (${therapyRecord.name}) but failed to fetch from graphkb: ${err}`); - throw err; - } +const incrementCounts = (initial, updates) => { + if (!initial) { + return updates; } - try { - const therapy = await conn.getTherapy(name); - return therapy; - } catch (err) { - originalError = err; - } + // deep copy + const updated = JSON.parse(JSON.stringify(initial)); - try { - const match = /^\s*(\S+)\s*\([^)]+\)$/.exec(name); - - if (match) { - return await conn.getTherapy(match[1]); + for (const level1 of Object.keys(updated)) { + for (const level2 of Object.keys(updated[level1])) { + updated[level1][level2] += updates[level1][level2]; } - } catch (err) { } - logger.error(originalError); - throw originalError; -}; - - -/** - * Add or fetch a therapy combination if there is not an existing record - * Link the therapy combination to its individual elements - */ -const addOrFetchTherapy = async (conn, source, therapiesRecords, combinationType) => { - if (therapiesRecords.length <= 1) { - if (therapiesRecords[0] === null) { - return null; - } - return getTherapy(conn, therapiesRecords[0]); } - const therapies = await Promise.all(therapiesRecords.map(async therapy => getTherapy(conn, therapy))); - const sourceId = therapies.map(e => e.sourceId).sort().join(' + '); - const name = 
therapies.map(e => e.name).sort().join(' + '); - const combinedTherapy = await conn.addRecord({ - content: { - combinationType, name, source: rid(source), sourceId, - }, - existsOk: true, - target: 'Therapy', - }); - for (const therapy of therapies) { - await conn.addRecord({ - content: { - in: rid(combinedTherapy), out: rid(therapy), source: rid(source), - }, - existsOk: true, - target: 'ElementOf', - }); - } - return combinedTherapy; + return updated; }; - /** - * Add or fetch an evidence level if there is not an existing record + * Access the CIVic API, parse content, transform and load into GraphKB + * + * @param {object} param0 + * @param {ApiConnection} param0.conn the api connection object for GraphKB + * @param {string} param0.errorLogPrefix prefix to the generated error json file + * @param {number} param0.maxRecords limit of EvidenceItem records to be processed and upload + * @param {?boolean} param0.noUpdate for avoiding deletion/update of existing GraphKB Statements + * @param {string[]} param0.trustedCurators a list of curator IDs for submitted-only EvidenceItems + * @param {?string} param0.url url to use as the base for accessing the civic ApiConnection */ -const getEvidenceLevel = async ({ - conn, rawRecord, sources, +const upload = async ({ + conn, + errorLogPrefix, + maxRecords, + noUpdate = false, + trustedCurators, + url = BASE_URL, }) => { - // get the evidenceLevel - let level = `${rawRecord.evidenceLevel}${rawRecord.evidenceRating || ''}`.toLowerCase(); - - if (EVIDENCE_LEVEL_CACHE[level] === undefined) { - level = await conn.addRecord({ - content: { - description: `${VOCAB[rawRecord.evidenceLevel]} ${VOCAB[rawRecord.evidenceRating] || ''}`, - displayName: `${SOURCE_DEFN.displayName} ${level.toUpperCase()}`, - name: level, - source: rid(sources.civic), - sourceId: level, - url: VOCAB.url, - }, - existsOk: true, - fetchConditions: { - AND: - [{ sourceId: level }, { name: level }, { source: rid(sources.civic) }], - }, - target: 'EvidenceLevel', 
+ const countsEI = { + error: 0, + partialSuccess: 0, + skip: 0, + success: 0, + }; + let countsST; - }); - EVIDENCE_LEVEL_CACHE[level.sourceId] = level; - } else { - level = EVIDENCE_LEVEL_CACHE[level]; - } - return level; -}; + // Adding CIViC as source if not already in GraphKB + const source = await conn.addSource(SOURCE_DEFN); + const sourceRid = rid(source); + /* + 1. DOWNLOAD & PREPROCESSING + */ -/** - * Transform a CIViC evidence record into a GraphKB statement - * - * @param {object} opt - * @param {ApiConnection} opt.conn the API connection object for GraphKB - * @param {object} opt.rawRecord the unparsed record from CIViC - * @param {object} opt.sources the sources by name - * @param {boolean} opt.oneToOne civic statements to graphkb statements is a 1 to 1 mapping - * @param {object} opt.variantsCache used to avoid repeat processing of civic variants. stores the graphkb variant(s) if success or the error if not - * @param - */ -const processEvidenceRecord = async (opt) => { + // GETTING CIVIC EVIDENCEITEMS FROM CIVIC API + // Evidences accepted, or submitted from a trusted curator + logger.info(`loading evidenceItems from ${url}`); const { - conn, rawRecord, sources, variantsCache, oneToOne = false, - } = opt; - - // Relevance & EvidenceLevel - const [level, relevance] = await Promise.all([ - getEvidenceLevel(opt), - getRelevance(opt), - ]); - - // Variant's Feature - let feature; - const civicFeature = rawRecord.variant.feature.featureInstance; - - if (civicFeature.__typename === 'Gene') { - [feature] = await _entrezGene.fetchAndLoadByIds(conn, [civicFeature.entrezId]); - } else if (civicFeature.__typename === 'Factor') { - // TODO: Deal with __typename === 'Factor' - // No actual case as April 22nd, 2024 - throw new NotImplementedError( - 'unable to process variant\'s feature of type Factor', - ); - } + errors: downloadEvidenceItemsErr, + records: evidenceItems, + } = await downloadEvidenceItems(url, trustedCurators); + // Validation errors + const 
validationErrorList = []; - // Variant - let variants; + if (downloadEvidenceItemsErr.length > 0) { + countsEI.error += downloadEvidenceItemsErr.length; + validationErrorList.push(...downloadEvidenceItemsErr); + } - if (variantsCache.records[rawRecord.variant.id]) { - variants = variantsCache.records[rawRecord.variant.id]; - } else if (variantsCache.errors[rawRecord.variant.id]) { - throw variantsCache.errors[rawRecord.variant.id]; - } else { - try { - variants = await processVariantRecord(conn, rawRecord.variant, feature); - variantsCache.records[rawRecord.variant.id] = variants; - logger.verbose(`converted variant name (${rawRecord.variant.name}) to variants (${variants.map(v => v.displayName).join(', and ')})`); - } catch (err) { - variantsCache.errors[rawRecord.variant.id] = err; - logger.error(`evidence (${rawRecord.id}) Unable to process the variant (id=${rawRecord.variant.id}, name=${rawRecord.variant.name}): ${err}`); - throw err; + // GETTING CIVIC STATEMENTS FROM GRAPHKB API + // Note: One or more GKB Statement can come from the same CIVIC id (sourceId) + logger.info('loading related statements from GraphKB'); + const statements = await conn.getRecords({ + filters: { source: sourceRid }, + returnProperties: [ + '@rid', + 'conditions', + 'description', + 'evidence', + 'evidenceLevel', + 'relevance', + 'reviewStatus', + 'source', + 'sourceId', + 'subject', + ], + target: 'Statement', + }); + const sourceIdsFromGKB = new Set(statements.map(r => r.sourceId)); + logger.info(`${sourceIdsFromGKB.size} distinct ${SOURCE_DEFN.name} sourceId in GraphKB statements`); + logger.info(`${statements.length} total statements previously added to GraphKB from ${SOURCE_DEFN.name}`); + + // REFACTORING GRAPHKB STATEMENTS INTO STATEMENTSBYSOURCEID + // where each sourceId is a key associated with an array + // of one or more GKB Statement records + const statementsBySourceId = {}; + + for (const record of statements) { + if (!statementsBySourceId[record.sourceId]) { + 
statementsBySourceId[record.sourceId] = []; } + // Sorting conditions for downstream object comparison + record.conditions.sort(); + statementsBySourceId[record.sourceId].push(record); } - // get the disease by doid - let disease; - - // find the disease if it is not null - if (rawRecord.disease) { - let diseaseQueryFilters = {}; + // REFACTORING CIVIC EVIDENCEITEMS INTO EVIDENCEITEMSBYID + // where each id is a key associated with one CIViC EvidenceItem as value + logger.info(`Pre-pocessing ${evidenceItems.length} records`); + const evidenceItemsById = {}; - if (rawRecord.disease.doid) { - diseaseQueryFilters = { - AND: [ - { sourceId: `doid:${rawRecord.disease.doid}` }, - { source: { filters: { name: 'disease ontology' }, target: 'Source' } }, - ], - }; - } else { - diseaseQueryFilters = { name: rawRecord.disease.name }; + // Performing some checks. Skipping some records if needed + // eslint-disable-next-line guard-for-in + for (const i in evidenceItems) { + // Check if max records limit has been reached + if (maxRecords && Object.keys(evidenceItemsById).length >= maxRecords) { + logger.warn(`Not loading all content due to max records limit (${maxRecords})`); + countsEI.skip += (evidenceItems.length - i); + break; } - - disease = await conn.getUniqueRecordBy({ - filters: diseaseQueryFilters, - sort: orderPreferredOntologyTerms, - target: 'Disease', - }); + // Check if record id is unique + if (evidenceItemsById[evidenceItems[i].id]) { + logger.error(`Multiple Evidence Items with the same id: ${evidenceItems[i].id}. Violates assumptions. Only the 1st one was kept.`); + countsEI.skip++; + continue; + } + // Adding EvidenceItem to object for upload + evidenceItemsById[evidenceItems[i].id] = evidenceItems[i]; } - // get the therapy/therapies by name - let therapy; + const noRecords = Object.keys(evidenceItemsById).length; + logger.info(`${noRecords}/${evidenceItems.length} Evidence Items to process`); + + /* + 2. 
PROCESSING EACH CIVIC EVIDENCEITEM INTO ONE OR MORE GKB STATEMENTS + */ + + // PubMed caching + logger.info('Caching Pubmed publication'); + await loadPubmedCache(conn); + + // Keeping track of EvidenceItem sourceIds who raised errors during processing + const errorSourceIds = { + disease: new Map(), + evidence: new Map(), + evidenceLevel: new Map(), + individualCombinationProcessing: new Map(), + processingIntoCombinations: new Map(), + relevance: new Map(), + }; + const casesToReview = new Map(); + + logger.info(`\n\n${'#'.repeat(80)}\n## PROCESSING RECORDS\n${'#'.repeat(80)}\n`); + let recordNumber = 0; + + // MAIN LOOP + // Looping through Evidence Items + for (const [id, evidenceItem] of Object.entries(evidenceItemsById)) { + /* PROCESSING EVIDENCEITEMS */ + + recordNumber++; + logger.info(); + logger.info(`***** ${recordNumber}/${noRecords} : processing id ${id} *****`); + + const numberOfStatements = statementsBySourceId[id] + ? statementsBySourceId[id].length + : 0; + logger.info(`${numberOfStatements} related statement(s)`); + + // Base object (properties order matters) + // Common content will be deep copied downstream for each combination + evidenceItem.content = { + conditions: [], + description: evidenceItem.description || '', + evidence: [], + evidenceLevel: [], + relevance: undefined, + reviewStatus: (evidenceItem.status === 'ACCEPTED' + ? 'not required' + : 'pending' + ), + source: sourceRid, + sourceId: id, + subject: undefined, + }; + + // PROCESSING DATA COMMON TO ALL COMBINATIONS + + // Removing extra spaces in description. 
Needed before content comparison + evidenceItem.content.description = evidenceItem.content.description.replace(/\s+/g, ' ').trim(); + + // Get evidence (publication) rid + try { + evidenceItem.content.evidence.push(rid( + await getPublication(conn, evidenceItem), + )); + } catch (err) { + logger.error(err); + countsEI.error++; + errorSourceIds.evidence.set(id, err); + continue; + } - if (rawRecord.therapies) { + // Get evidenceLevel rid try { - therapy = await addOrFetchTherapy( - conn, - rid(sources.civic), - rawRecord.therapies, - (rawRecord.therapyInteractionType || '').toLowerCase(), - ); + evidenceItem.content.evidenceLevel.push(rid( + await getEvidenceLevel(conn, { + rawRecord: evidenceItem, + source: sourceRid, + sourceDisplayName: SOURCE_DEFN.displayName, + }), + )); } catch (err) { logger.error(err); - logger.error(`failed to fetch therapy: ${JSON.stringify(rawRecord.therapies)}`); - throw err; + countsEI.error++; + errorSourceIds.evidenceLevel.set(id, err); + continue; } - } - const publication = await getPublication(conn, rawRecord); - - // common content - const content = { - conditions: [...variants.map(v => rid(v))], - description: rawRecord.description, - evidence: [rid(publication)], - evidenceLevel: [rid(level)], - relevance: rid(relevance), - reviewStatus: (rawRecord.status === 'ACCEPTED' - ? 
'not required' - : 'pending' - ), - source: rid(sources.civic), - sourceId: rawRecord.id, - }; + // Get relevance rid + try { + evidenceItem.content.relevance = rid( + await getRelevance(conn, { rawRecord: evidenceItem }), + ); + } catch (err) { + logger.error(err); + countsEI.error++; + errorSourceIds.relevance.set(id, err); + continue; + } - // create the statement and connecting edges - if (rawRecord.evidenceType === 'DIAGNOSTIC' || rawRecord.evidenceType === 'PREDISPOSING') { - if (!disease) { - throw new Error('Unable to create a diagnostic or predisposing statement without a corresponding disease'); + // Get disease rid + try { + // Will be removed downstream after being used as content's subject and/or condition + evidenceItem.content.disease = rid( + await getDisease(conn, { rawRecord: evidenceItem }), + true, // nullOk=true since some EvidenceItems aren't related to any specific disease + ); + } catch (err) { + logger.error(err); + countsEI.error++; + errorSourceIds.disease.set(id, err); + continue; } - content.subject = rid(disease); - } else if (disease) { - content.conditions.push(rid(disease)); - } - if (rawRecord.evidenceType === 'PREDICTIVE' && therapy) { - content.subject = rid(therapy); - } if (rawRecord.evidenceType === 'PROGNOSTIC') { - // get the patient vocabulary object - content.subject = rid(await conn.getVocabularyTerm('patient')); - } if (rawRecord.evidenceType === 'FUNCTIONAL') { - content.subject = rid(feature); - } if (rawRecord.evidenceType === 'ONCOGENIC') { - content.subject = variants.length === 1 - ? 
rid(variants[0]) - : rid(feature); - } + // PROCESSING INDIVIDUAL EVIDENCEITEM INTO AN ARRAY OF COMBINATIONS + // (One combination per expected GraphKB statement) + const combinations = []; - if (content.subject && !content.conditions.includes(content.subject)) { - content.conditions.push(content.subject); - } + try { + combinations.push(...await processEvidenceItem(evidenceItem)); + } catch (err) { + logger.error(err); + countsEI.error++; + errorSourceIds.processingIntoCombinations.set(id, err); + continue; + } + logger.info(`${combinations.length} combination(s)`); - if (!content.subject) { - throw Error(`unable to determine statement subject for evidence (${content.sourceId}) record`); - } + // PROCESSING INDIVIDUAL COMBINATION + // Formatting each combination's content for GraphKB statement requirements + const contents = []; + let processCombinationErrors = 0; - const fetchConditions = [ - { sourceId: content.sourceId }, - { source: content.source }, - { evidence: content.evidence }, // civic evidence items are per publication - ]; - - if (!oneToOne) { - fetchConditions.push(...[ - { relevance: content.relevance }, - { subject: content.subject }, - { conditions: content.conditions }, - ]); - } - - let original; - - if (oneToOne) { - // get previous iteration - const originals = await conn.getRecords({ - filters: { - AND: [ - { source: rid(sources.civic) }, - { sourceId: rawRecord.id }, - ], - }, - target: 'Statement', - }); + for (const combination of combinations) { + try { + contents.push( + await processCombination(conn, { + record: combination, + sourceRid, + }), + ); + } catch (err) { + logger.error(err); + processCombinationErrors++; - if (originals.length > 1) { - throw Error(`Supposed to be 1to1 mapping between graphKB and civic but found multiple records with source ID (${rawRecord.id})`); - } - if (originals.length === 1) { - [original] = originals; - - const excludeTerms = [ - '@rid', - '@version', - 'comment', - 'createdAt', - 'createdBy', - 
'reviews', - 'updatedAt', - 'updatedBy', - ]; - - if (!shouldUpdate('Statement', original, content, excludeTerms)) { - return original; + if (!errorSourceIds.individualCombinationProcessing.get(id)) { + errorSourceIds.individualCombinationProcessing.set(id, []); + } + const v = errorSourceIds.individualCombinationProcessing.get(id); + errorSourceIds.individualCombinationProcessing.set(id, [...v, err]); } } - } - if (original) { - // update the existing record - return conn.updateRecord('Statement', rid(original), content); - } + const successRatio = `${combinations.length - processCombinationErrors}/${combinations.length}`; + const processCombinationsMsg = `Processed ${successRatio} combination(s)`; - // create a new record - return conn.addRecord({ - content, - existsOk: true, - fetchConditions: { - AND: fetchConditions, - }, - target: 'Statement', - upsert: true, - upsertCheckExclude: [ - 'comment', - 'displayNameTemplate', - 'reviews', - ], - }); -}; - - -/** - * Get a list of CIViC Evidence Items which have since been deleted. 
- * Returns the list of evidence item IDs to be purged from GraphKB - * - * @param {string} url endpoint for the CIViC API - */ -const fetchDeletedEvidenceItems = async (url) => { - const ids = new Set(); - - // Get rejected evidenceItems - logger.info(`loading rejected evidenceItems from ${url}`); - const rejected = await requestEvidenceItems(url, { - query: `query evidenceItems($after: String, $status: EvidenceStatusFilter) { - evidenceItems(after: $after, status: $status) { - nodes {id} - pageCount - pageInfo {endCursor, hasNextPage} - totalCount - } - }`, - variables: { - status: 'REJECTED', - }, - }); - rejected.forEach(node => ids.add(node.id)); - logger.info(`fetched ${ids.size} rejected entries from CIViC`); - return ids; -}; - - -/** - * Fetch civic approved evidence entries as well as those submitted by trusted curators - * - * @param {string} url the endpoint for the request - * @param {string[]} trustedCurators a list of curator IDs to also fetch submitted only evidence items for - */ -const downloadEvidenceRecords = async (url, trustedCurators) => { - const records = []; - const errorList = []; - const counts = { - error: 0, exists: 0, skip: 0, success: 0, - }; - - const evidenceItems = []; - const query = fs.readFileSync(path.join(__dirname, 'evidenceItems.graphql')).toString(); - - // Get accepted evidenceItems - logger.info(`loading accepted evidenceItems from ${url}`); - const accepted = await requestEvidenceItems(url, { - query, - variables: { - status: 'ACCEPTED', - }, - }); - logger.info(`fetched ${accepted.length} accepted entries from CIViC`); - evidenceItems.push(...accepted); - - // Get submitted evidenceItems from trusted curators - for (const curator of Array.from(new Set(trustedCurators))) { - if (!Number.isNaN(curator)) { - logger.info(`loading submitted evidenceItems by trusted curator ${curator} from ${url}`); - const submittedByATrustedCurator = await requestEvidenceItems(url, { - query, - variables: { - status: 'SUBMITTED', - userId: 
parseInt(curator, 10), - }, - }); - evidenceItems.push(...submittedByATrustedCurator); + // If at least some combinations succeeds, then it's a success + if (processCombinationErrors === 0) { + countsEI.success++; + logger.info(processCombinationsMsg); + } else if (processCombinationErrors < combinations.length) { + countsEI.partialSuccess++; + logger.warn(processCombinationsMsg); + } else { + countsEI.error++; + logger.error(processCombinationsMsg); } - } - const submittedCount = evidenceItems.length - accepted.length; - logger.info(`loaded ${submittedCount} unaccepted entries by trusted submitters from CIViC`); - // Validation - for (const record of evidenceItems) { - try { - checkSpec(validateEvidenceSpec, record); - } catch (err) { - errorList.push({ error: err, errorMessage: err.toString(), record }); - logger.error(err); - counts.error++; - continue; - } - records.push(record); - } - logger.info(`${records.length}/${evidenceItems.length} evidenceItem records successfully validated with the specs`); - return { counts, errorList, records }; -}; + /* MATCHING EVIDENCEITEMS WITH STATEMENTS */ -/** - * Access the CIVic API, parse content, transform and load into GraphKB - * - * @param {object} opt options - * @param {ApiConnection} opt.conn the api connection object for GraphKB - * @param {string} [opt.url] url to use as the base for accessing the civic ApiConnection - * @param {string[]} opt.trustedCurators a list of curator IDs to also fetch submitted only evidence items for - */ -const upload = async ({ - conn, errorLogPrefix, trustedCurators, ignoreCache = false, maxRecords, url = BASE_URL, -}) => { - const source = await conn.addSource(SOURCE_DEFN); + // Content matching between CIViC and GraphKB records + // so we know which CRUD operation to perform on each statement + const { toCreate, toDelete, toUpdate } = contentMatching({ + allFromCivic: contents, + allFromGkb: statementsBySourceId[id] || [], + }); - // Get list of all previous statements from CIVIC in 
GraphKB - let previouslyEntered = await conn.getRecords({ - filters: { source: rid(source) }, - returnProperties: ['sourceId'], - target: 'Statement', - }); - previouslyEntered = new Set(previouslyEntered.map(r => r.sourceId)); - logger.info(`Found ${previouslyEntered.size} records previously added from ${SOURCE_DEFN.name}`); - // Get list of all Pubmed publication reccords from GraphKB - logger.info('caching publication records'); - _pubmed.preLoadCache(conn); - - // Get evidence records from CIVIC (Accepted, or Submitted from a trusted curator) - const { counts, errorList, records } = await downloadEvidenceRecords(url, trustedCurators); - // Get rejected evidence records ids from CIVIC - const purgeableEvidenceItems = await fetchDeletedEvidenceItems(url); - - logger.info(`Processing ${records.length} records`); - // keep track of errors and already processed variants by their CIViC ID to avoid repeat logging - const variantsCache = { - errors: {}, - records: {}, - }; + /* CREATE/UPDATE/DELETE STATEMENTS */ + + const loaclCountsST = { + create: { err: 0, success: 0 }, + delete: { err: 0, success: 0 }, + noUpdateNeeded: { success: 0 }, + update: { err: 0, success: 0 }, + }; + + // UPDATE + if (!noUpdate && toUpdate.length > 0) { + for (let i = 0; i < toUpdate.length; i++) { + const { fromCivic, fromGkb } = toUpdate[i]; + + // Check if an update is needed to avoid unnecessary API calls + if (needsUpdate({ fromCivic, fromGkb })) { + const updatedCount = await updateStatement(conn, { fromCivic, fromGkb }); + loaclCountsST.update.err += updatedCount.err; + loaclCountsST.update.success += updatedCount.success; + } else { + loaclCountsST.noUpdateNeeded.success++; + } + } + } - // Refactor records into recordsById - const recordsById = {}; + // DELETE + if (!noUpdate && toDelete.length > 0) { + const rids = toDelete.map(el => el['@rid']); - for (const record of records) { - // Check if max records limit has been reached - if (maxRecords && Object.keys(recordsById).length 
>= maxRecords) { - logger.warn(`not loading all content due to max records limit (${maxRecords})`); - break; + if (processCombinationErrors > 0) { + // Do not delete any statements if some combinations have processing errors + logger.info(`${toDelete.length} unmatched statement(s) to be reviewed for deletion`); + casesToReview.set(id, rids); + } else { + loaclCountsST.delete = await deleteStatements(conn, { rids }); + } } - // Check if record id is unique - if (recordsById[record.id]) { - logger.error(`Multiple evidenceItems with the same id: ${record.id}. Violates assumptions. Only the 1st one was kept.`); - counts.skip++; - continue; + // CREATE + if (toCreate.length > 0) { + for (let i = 0; i < toCreate.length; i++) { + const createdCount = await createStatement(conn, { fromCivic: toCreate[i] }); + loaclCountsST.create.err += createdCount.err; + loaclCountsST.create.success += createdCount.success; + } } - // Introducing Molecular Profiles with CIViC GraphQL API v2.2.0 - // [EvidenceItem]--(many-to-one)--[MolecularProfile]--(many-to-many)--[Variant] - if (!record.molecularProfile) { - logger.error(`Evidence Item without Molecular Profile. Violates assumptions: ${record.id}`); - counts.skip++; - continue; - } - if (!record.molecularProfile.variants || record.molecularProfile.variants.length === 0) { - logger.error(`Molecular Profile without Variants. 
Violates assumptions: ${record.molecularProfile.id}`); - counts.skip++; - continue; + // logging + for (const level of Object.keys(loaclCountsST)) { + if (loaclCountsST[level].err > 0 || loaclCountsST[level].success > 0) { + logger.info(`${level}: ${JSON.stringify(loaclCountsST[level])}`); + } } + countsST = incrementCounts(countsST, loaclCountsST); - // Adding EvidenceItem to object for upload - recordsById[record.id] = record; + // END OF MAIN LOOP } + logger.info(`\n\n${'#'.repeat(80)}\n## END OF RECORD PROCESSING\n${'#'.repeat(80)}\n`); - // Main loop on recordsById - for (const [sourceId, record] of Object.entries(recordsById)) { - if (previouslyEntered.has(sourceId) && !ignoreCache) { - counts.exists++; - continue; - } - if (purgeableEvidenceItems.has(sourceId)) { - // this should never happen. If it does we have made an invalid assumption about how civic uses IDs. - throw new Error(`Record ID is both deleted and to-be loaded. Violates assumptions: ${sourceId}`); - } - const preupload = new Set((await conn.getRecords({ - filters: [ - { source: rid(source) }, { sourceId }, - ], - target: 'Statement', - })).map(rid)); - - // Resolve combinations of therapies - // Splits civic evidence items therapies into separate evidence items based on their combination type. 
- if (record.therapies === null || record.therapies.length === 0) { - record.therapies = [null]; - } else if ( - record.therapyInteractionType === 'COMBINATION' - || record.therapyInteractionType === 'SEQUENTIAL' - ) { - record.therapies = [record.therapies]; - } else if (record.therapyInteractionType === 'SUBSTITUTES' || record.therapies.length < 2) { - record.therapies = record.therapies.map(therapy => [therapy]); - record.therapyInteractionType = null; - } else { - logger.error(`(evidence: ${record.id}) unsupported therapy interaction type (${record.therapyInteractionType}) for a multiple therapy (${record.therapies.length}) statement`); - counts.skip++; - continue; - } + // Logging EvidenceItem processing counts + logger.info(); + logger.info('***** CIViC EvidenceItem records processing report: *****'); + logger.info(JSON.stringify(countsEI)); - // Process Molecular Profiles expression into an array of conditions - // Each condition is itself an array of variants, one array for each expected GraphKB Statement from this CIViC Evidence Item - const Mp = MolecularProfile(record.molecularProfile); + // Logging detailed EvidenceItem processing counts + logger.info('Processing errors report:'); - try { - record.conditions = Mp.process().conditions; - } catch (err) { - logger.error(`evidence (${record.id}) ${err}`); - counts.skip++; - continue; - } + for (const [key, value] of Object.entries(errorSourceIds)) { + logger.info(`${key}: ${value.size}`); + // Also formatting Maps into objects for saving to file downstream + errorSourceIds[key] = Object.fromEntries(errorSourceIds[key]); + } - const postupload = []; - - // Upload all GraphKB statements for this CIViC Evidence Item - for (const condition of record.conditions) { - const oneToOne = (condition.length * record.therapies.length) === 1 && preupload.size === 1; - - for (const variant of condition) { - for (const therapies of record.therapies) { - try { - logger.debug(`processing ${record.id}`); - const result = 
await processEvidenceRecord({ - conn, - oneToOne, - rawRecord: { ..._.omit(record, ['therapies', 'variants']), therapies, variant }, - sources: { civic: source }, - variantsCache, - }); - postupload.push(rid(result)); - counts.success += 1; - } catch (err) { - if ( - err.toString().includes('is not a function') - || err.toString().includes('of undefined') - ) { - console.error(err); - } - if (err instanceof NotImplementedError) { - // accepted evidence that we do not support loading. Should delete as it may have changed from something we did support - purgeableEvidenceItems.add(sourceId); - } - errorList.push({ error: err, errorMessage: err.toString(), record }); - logger.error(`evidence (${record.id}) ${err}`); - counts.error += 1; - } - } - } - } - // compare statments before/after upload to determine if any records should be soft-deleted - postupload.forEach((id) => { - preupload.delete(id); - }); + // DELETING UNWANTED GRAPHKB STATEMENTS + // sourceIds no longer in CIViC (not accepted/submitted by trustedCurators) but still in GraphKB + const allIdsFromCivic = new Set(evidenceItems.map(r => r.id.toString())); + const toDelete = new Set([...sourceIdsFromGKB].filter(x => !allIdsFromCivic.has(x))); + logger.info(); + logger.info('***** Deprecated items *****'); + logger.warn(`${toDelete.size} deprecated ${SOURCE_DEFN.name} Evidence Items still in GraphKB Statement`); - if (preupload.size && purgeableEvidenceItems.has(sourceId)) { - logger.warn(` - Removing ${preupload.size} CIViC Entries (EID:${sourceId}) of unsupported format - `); + if (toDelete.size > 0) { + logger.info(`sourceIds: ${Array.from(toDelete)}`); + } - try { - await Promise.all( - Array.from(preupload).map( - async outdatedId => conn.deleteRecord('Statement', outdatedId), - ), - ); - } catch (err) { - logger.error(err); - } - } else if (preupload.size) { - if (postupload.length) { - logger.warn(`deleting ${preupload.size} outdated statement records (${Array.from(preupload).join(' ')}) has 
new/retained statements (${postupload.join(' ')})`); - - try { - await Promise.all( - Array.from(preupload).map( - async outdatedId => conn.deleteRecord('Statement', outdatedId), - ), - ); - } catch (err) { - logger.error(err); - } - } else { - logger.error(`NOT deleting ${preupload.size} outdated statement records (${Array.from(preupload).join(' ')}) because failed to create replacements`); - } + // GraphKB Statements Soft-deletion + if (!noUpdate && toDelete.size > 0) { + const deletedCount = await deleteStatements(conn, { + source: sourceRid, + sourceIds: Array.from(toDelete), + }); + const attempts = deletedCount.success + deletedCount.err; + logger.info(`${deletedCount.success}/${attempts} soft-deleted statements`); + + if (countsST) { + countsST.delete.err += deletedCount.err; + countsST.delete.success += deletedCount.success; + } else { + countsST = { delete: { err: deletedCount.err, success: deletedCount.success } }; } } - // purge any remaining entries that are in GraphKB but have since been rejected/deleted by CIViC - const toDelete = await conn.getRecords({ - filters: { - AND: [ - { sourceId: Array.from(purgeableEvidenceItems) }, - { source: rid(source) }, - ], - }, - target: 'Statement', - }); + // Logging processing error cases to be reviewed, + // so a reviewer can decide if corresponding statements need to be deleted or not + logger.info(); + logger.info('***** Cases to be reviewed for deletion *****'); + logger.warn(`${casesToReview.size} Evidence Item(s) with processing errors leading to unmatched Statement(s)`); + casesToReview.forEach((v, k) => logger.info(`${k} -> ${JSON.stringify(v)}`)); + + // Logging Statement CRUD operations counts + if (countsST) { + logger.info(); + logger.info('***** GraphKB Statement records CRUD operations report: *****'); - try { - logger.warn(`Deleting ${toDelete.length} outdated CIViC statements from GraphKB`); - await Promise.all(toDelete.map(async statement => conn.deleteRecord( - 'Statement', rid(statement), - 
))); - } catch (err) { - logger.error(err); + for (const op of Object.keys(countsST)) { + logger.info(`${op}: ${JSON.stringify(countsST[op])}`); + } } - logger.info(JSON.stringify(counts)); + // SAVING LOGGED ERRORS TO FILE + const errorFileContent = { + ...errorSourceIds, + validationErrors: validationErrorList, + }; const errorJson = `${errorLogPrefix}-civic.json`; - logger.info(`writing ${errorJson}`); - fs.writeFileSync(errorJson, JSON.stringify(errorList, null, 2)); + logger.info(); + logger.info(`***** Global report: *****\nwriting ${errorJson}`); + fs.writeFileSync(errorJson, JSON.stringify(errorFileContent, null, 2)); }; - module.exports = { - SOURCE_DEFN, - specs: { validateEvidenceSpec }, upload, }; diff --git a/src/civic/profile.js b/src/civic/profile.js index 2530a99c..4d6845d8 100644 --- a/src/civic/profile.js +++ b/src/civic/profile.js @@ -1,7 +1,12 @@ +/** + * Introducing Molecular Profiles with CIViC GraphQL API v2.2.0 + * [EvidenceItem]--(many-to-one)--[MolecularProfile]--(many-to-many)--[Variant] + */ const { error: { ErrorMixin } } = require('@bcgsc-pori/graphkb-parser'); -class NotImplementedError extends ErrorMixin { } +class NotImplementedError extends ErrorMixin { } +const MOLECULAR_PROFILE_CACHE = new Map(); /** * Factory function returning a MolecularProfile object. 
@@ -60,23 +65,29 @@ const MolecularProfile = (molecularProfile) => ({ }, /* Desambiguation of variants with implicit 'or' in the name */ _disambiguate() { - // Split ambiguous variants - const temp = []; + const newConditions = []; + + // For each set of conditions this.conditions.forEach((condition) => { + const temp = []; condition.forEach((variant) => { temp.push( + // Split ambiguous variants into an array of 1 or more variant(s) this._split(variant), ); }); - }); - let newCondition; + // Combine variations into new condition + let newConditionSet; - // Combine variations into new condition - for (let i = 0; i < temp.length; i++) { - newCondition = this._combine({ arr1: newCondition || [[]], arr2: temp[i] }); - } - this.conditions = newCondition; + for (let i = 0; i < temp.length; i++) { + newConditionSet = this._combine({ arr1: newConditionSet || [[]], arr2: temp[i] }); + } + newConditions.push(...newConditionSet); + }); + + // Replace old conditions by new ones + this.conditions = [...newConditions]; return this; }, /* Returns index of closing parenthesis for end of block */ @@ -192,9 +203,9 @@ const MolecularProfile = (molecularProfile) => ({ newConditions.forEach((condition) => { condition.forEach((variant) => { if (!variant) { - throw new Error( - `unable to process molecular profile with missing or misformatted variants (${this.profile.id || ''})`, - ); + const errMsg = `unable to process molecular profile with missing or misformatted variants (${this.profile.id || ''})`; + this.error = errMsg; + throw new Error(errMsg); } }); }); @@ -205,6 +216,8 @@ const MolecularProfile = (molecularProfile) => ({ }, /* Corresponding GKB Statements' conditions (1 array per statement) */ conditions: [[]], + /* Keep track of processing error */ + error: undefined, /* Main object's method. 
Process expression into array of conditions' arrays */ process() { // Get Molecular Profile's expression (parsedName property) @@ -216,15 +229,15 @@ const MolecularProfile = (molecularProfile) => ({ || parsedName.length === 0 || typeof parsedName[0] !== 'object' ) { - throw new Error( - `unable to process molecular profile with missing or misformatted parsedName (${this.profile.id || ''})`, - ); + const errMsg = `unable to process molecular profile with missing or misformatted parsedName (${this.profile.id || ''})`; + this.error = errMsg; + throw new Error(errMsg); } // NOT operator not yet supported if (this._not(parsedName)) { - throw new NotImplementedError( - `unable to process molecular profile with NOT operator (${this.profile.id || ''})`, - ); + const errMsg = `unable to process molecular profile with NOT operator (${this.profile.id || ''})`; + this.error = errMsg; + throw new NotImplementedError(errMsg); } // Filters out unwanted Feature info from expression const filteredParsedName = parsedName.filter(el => el.__typename !== 'Feature'); @@ -241,7 +254,38 @@ const MolecularProfile = (molecularProfile) => ({ profile: molecularProfile || {}, }); +/** + * Processing a molecular profile expression while managing the cache + * + * @param {Object} molecularProfile a Molecular Profile segment from GraphQL query + * @returns {MolecularProfile} object whose conditions' property is an array of lists of conditions + */ +const processMolecularProfile = (molecularProfile) => { + let Mp = MOLECULAR_PROFILE_CACHE.get(molecularProfile.id); + + if (Mp) { + if (Mp.error) { + throw new Error( + `Molecular profile ${molecularProfile.id} already processed with error "${Mp.error}"`, + ); + } + return Mp; + } + Mp = MolecularProfile(molecularProfile); + + // Actually process the profile expression + try { + Mp.process(); + } catch (err) { + MOLECULAR_PROFILE_CACHE.set(molecularProfile.id, Mp); + throw err; + } + MOLECULAR_PROFILE_CACHE.set(molecularProfile.id, Mp); + + return Mp; 
+}; module.exports = { MolecularProfile, + processMolecularProfile, }; diff --git a/src/civic/publication.js b/src/civic/publication.js index ee8e1a35..644111ee 100644 --- a/src/civic/publication.js +++ b/src/civic/publication.js @@ -1,8 +1,18 @@ +const { error: { ErrorMixin } } = require('@bcgsc-pori/graphkb-parser'); + const _asco = require('../asco'); const _pubmed = require('../entrez/pubmed'); +class NotImplementedError extends ErrorMixin { } + + /** - * Check two strings are the same irrespective of casing, trailing periods and other formatting + * Check if two strings are the same irrespective of casing, + * trailing periods and other formatting + * + * @param {string} title1 a publication title + * @param {string} title2 a second publication title + * @returns {Boolean} whether both titles are matching or not */ const titlesMatch = (title1, title2) => { const title1Simple = title1.trim().toLowerCase().replace(/\.$/, '').replace(/<\/?(em|i|bold)>/g, ''); @@ -10,14 +20,15 @@ const titlesMatch = (title1, title2) => { return title1Simple === title2Simple; }; - /** - * Fetches the publication record either from pubmed or the ASCO abstract + * Fetches the publication record either from PubMed or the ASCO abstract * * @param {ApiConnection} conn graphkb API connector - * @param {object} rawRecord CIViC Evidence Item JSON record + * @param {object} rawRecord CIViC EvidenceItem record + * @returns {object} the publication record from GraphKB */ const getPublication = async (conn, rawRecord) => { + // Upload Publication to GraphKB FROM PUBMED if (rawRecord.source.sourceType === 'PUBMED') { const [publication] = await _pubmed.fetchAndLoadByIds(conn, [rawRecord.source.citationId]); @@ -26,6 +37,8 @@ const getPublication = async (conn, rawRecord) => { } return publication; } + + // Upload Publication to GraphKB FROM ASCO if (rawRecord.source.sourceType === 'ASCO') { const abstracts = await _asco.fetchAndLoadByIds(conn, [rawRecord.source.ascoAbstractId]); @@ -54,12 
+67,16 @@ const getPublication = async (conn, rawRecord) => { } return abstracts[0]; } + + // Upload Publication to GraphKB FROM ASH - No loader yet! if (rawRecord.source.sourceType === 'ASH') { - // 6 cases - // TODO: ASH loader + // TODO: ASH loader. Only a handfull of cases though } - throw Error(`unable to process non-pubmed/non-asco evidence type (${rawRecord.source.sourceType}) for evidence item (${rawRecord.id})`); + throw new NotImplementedError(`unable to process non-pubmed/non-asco evidence type (${rawRecord.source.sourceType}) for evidence item (${rawRecord.id})`); }; - -module.exports = { getPublication, titlesMatch }; +module.exports = { + getPublication, + loadPubmedCache: _pubmed.preLoadCache, + titlesMatch, +}; diff --git a/src/civic/relevance.js b/src/civic/relevance.js index c79a2624..3c1bff1f 100644 --- a/src/civic/relevance.js +++ b/src/civic/relevance.js @@ -2,7 +2,6 @@ const { error: { ErrorMixin } } = require('@bcgsc-pori/graphkb-parser'); class NotImplementedError extends ErrorMixin { } - const RELEVANCE_CACHE = {}; @@ -333,11 +332,10 @@ const translateRelevance = (evidenceType, evidenceDirection, significance) => { ); }; - /** * Convert the CIViC relevance types to GraphKB terms */ -const getRelevance = async ({ rawRecord, conn }) => { +const getRelevance = async (conn, { rawRecord }) => { // translate the type to a GraphKB vocabulary term let relevance = translateRelevance( rawRecord.evidenceType, @@ -354,8 +352,6 @@ const getRelevance = async ({ rawRecord, conn }) => { return relevance; }; - - module.exports = { getRelevance, translateRelevance, diff --git a/src/civic/specs.json b/src/civic/specs.json index 338a1072..740e28cc 100644 --- a/src/civic/specs.json +++ b/src/civic/specs.json @@ -151,13 +151,11 @@ "name" ], "type": [ - "null", "object" ] }, "type": [ - "array", - "null" + "array" ] } }, diff --git a/src/civic/statement.js b/src/civic/statement.js new file mode 100644 index 00000000..e4049bcf --- /dev/null +++ 
b/src/civic/statement.js @@ -0,0 +1,292 @@ +const _ = require('lodash'); + +const { logger } = require('../logging'); + + +/** + * Evaluate if two statement's content can be matched to one another. + * Used to map each EvidenceLevel's combination to its corresponding GraphKB statement + * + * @param {object} fromCivic new content from CIViC + * @param {object} fromGkb actual content from GraphKB + * @returns {boolean} whether both contents are matching or not + */ +const isMatching = ({ fromCivic, fromGkb, p = ['conditions', 'subject'] }) => ( + JSON.stringify(_.pick(fromCivic, ...p)) === JSON.stringify(_.pick(fromGkb, ...p)) +); + +/** + * Evaluate if a statement needs to be updated + * when compared to its matching EvidenceLevel's combination + * + * @param {object} param0 + * @param {object} param0.fromCivic new content from CIViC + * @param {object} param0.fromGkb actual content from GraphKB + * @returns {boolean} whether the GraphKB record needs to be updated or not + */ +const needsUpdate = ({ fromCivic, fromGkb }) => { + const isEqual = JSON.stringify(fromCivic) === JSON.stringify(_.omit(fromGkb, ['@rid'])); + + // Logging details if not equal + if (!isEqual) { + const updatedFields = []; + + for (const [key, value] of Object.entries(fromCivic)) { + if (JSON.stringify(value) !== JSON.stringify(fromGkb[key])) { + updatedFields.push(key); + } + } + logger.info(`Update needed on ${updatedFields.toString()}`); + } + + return !isEqual; +}; + +/** + * Given an array of content from civic and an array of actual statements from GraphKG, + * match corresponding content together + * + * @param {object} param0 + * @param {object[]} param0.allFromCivic array of new content from CIViC + * @param {object[]} param0.allFromGkb array of actual content from GraphKB + * @param boolean} param0.matchingOnSubjectAlone if additional matching on subject alone + * @param boolean} param0.matchingWithoutComparing if random matching with remaining records + * @returns {object} content 
of records to create, update and delete in GrpahKB + */ +const contentMatching = ({ + allFromCivic, + allFromGkb, + matchingOnSubjectAlone = true, + matchingWithoutComparing = true, +}) => { + const records = { + toCreate: [], // Array of content from CIViC to create as GraphKB statements + toDelete: [], // Array of GraphKB statements to delete + toUpdate: [], /* Array of CIViC-GraphKB pairs of content for statement update + Note: statement will be updated only if needed */ + }; + + /* + MATCHING ONE TO ONE + + Will automatically be submitted for update, without deletion/creation + */ + + if (allFromCivic.length === 1 && allFromGkb.length === 1) { + records.toUpdate.push({ fromCivic: allFromCivic[0], fromGkb: allFromGkb[0] }); + return records; + } + + /* + MATCHING ON CONDITIONS AND SUBJECT + */ + + const remainingFromGkb = [...allFromGkb]; + allFromCivic.forEach(el => { + let matched = false; + + for (let i = 0; i < remainingFromGkb.length; i++) { + // matching on conditions and subject (default) + if (isMatching({ + fromCivic: el, + fromGkb: remainingFromGkb[i], + })) { + records.toUpdate.push({ + fromCivic: el, + fromGkb: remainingFromGkb[i], + }); + remainingFromGkb.splice(i, 1); + matched = true; + break; + } + } + + if (!matched) { + records.toCreate.push(el); + } + }); + records.toDelete = [...remainingFromGkb]; + + /* + MATCHING ON SUBJECT ALONE + */ + if (!matchingOnSubjectAlone) { return records; } + + let numUnmatched = Math.min( + records.toCreate.length, + records.toDelete.length, + ); + + if (numUnmatched > 0) { + const remainingToCreate = []; + + for (let j = 0; j < records.toCreate.length; j++) { + let matched = false; + + for (let i = 0; i < records.toDelete.length; i++) { + // matching on subject + if (isMatching({ + fromCivic: records.toCreate[j], + fromGkb: records.toDelete[i], + p: ['subject'], + })) { + records.toUpdate.push({ + fromCivic: records.toCreate[j], + fromGkb: records.toDelete[i], + }); + records.toDelete.splice(i, 1); + matched = 
true; + break; + } + } + + if (!matched) { + remainingToCreate.push(records.toCreate[j]); + } + } + records.toCreate = [...remainingToCreate]; + } + + /* + ARTIFICIAL MATCHING WITHOUT COMPARISON + + In order to reduce unnecessary create/delete statements, + artificially match pairs until only some records.toCreate record(s) remains + or some records.toDelete record(s) remains. + */ + if (!matchingWithoutComparing) { return records; } + + numUnmatched = Math.min( + records.toCreate.length, + records.toDelete.length, + ); + + // Randomly match remaining content + if (numUnmatched > 0) { + for (let i = 0; i < numUnmatched; i++) { + // 'Artificial' pairing + records.toUpdate.push( + { fromCivic: records.toCreate[i], fromGkb: records.toDelete[i] }, + ); + } + // Remove from records.toCreate and records.toDelete + records.toCreate.splice(0, numUnmatched); + records.toDelete.splice(0, numUnmatched); + } + + return records; +}; + +/** + * Given content from CIViC, try to create the GraphKB record + * + * @param {ApiConnection} conn the API connection object for GraphKB + * @param {object} param1 + * @param {object[]} param1.fromCivic new content from CIViC + * @returns {object} a count object for error and success + */ +const createStatement = async (conn, { fromCivic }) => { + const counts = { err: 0, success: 0 }; + + try { + await conn.addRecord({ content: fromCivic, target: 'Statement' }); + counts.success++; + } catch (err) { + logger.error(err); + counts.err++; + } + + return counts; +}; + +/** + * Given content from CIViC and a corresponding GraphKB Statement rid, + * try to update the GraphKB record + * + * @param {ApiConnection} conn the API connection object for GraphKB + * @param {object} param1 + * @param {object[]} param1.fromCivic new content from CIViC + * @param {object[]} param1.fromGkb actual content from GraphKB + * @returns {object} a count object for error and success + */ +const updateStatement = async (conn, { fromCivic, fromGkb }) => { + const 
counts = { err: 0, success: 0 }; + + try { + await conn.addRecord({ + content: fromCivic, + existsOk: true, + fetchConditions: { + // Since CIViC content has already been matched + // to its corresponding GraphKB statement + '@rid': fromGkb['@rid'], + }, + target: 'Statement', + upsert: true, + }); + counts.success++; + } catch (err) { + logger.error(err); + counts.err++; + } + + return counts; +}; + +/** + * Soft-delete GraphKB Statements from either an array of Statement's RIDs + * or an array of sourceIds and its corresponding source + * + * @param {ApiConnection} conn the api connection object for GraphKB + * @param {object} param1 + * @param {?string[]} param1.rids an array of Statement's RIDs + * @param {string} param1.source the source RID + * @param {string[]} param1.sourceIds an array of sourceIds + * @returns {object} a count object for error and success + */ +const deleteStatements = async (conn, { rids = [], source, sourceIds }) => { + const counts = { err: 0, success: 0 }; + + // Get rids to delete if none provided + if (rids.length === 0) { + logger.info('Loading corresponding GraphKB statement RIDs to delete'); + const records = await conn.getRecords({ + filters: { + AND: [ + { sourceId: sourceIds }, + { source }, + ], + }, + target: 'Statement', + }); + rids.push(...records.map( + (el) => el['@rid'], + )); + logger.info(`${rids.length} RIDs found`); + } + + // Delete statements + logger.info(`Deleting ${rids.length} statement(s)...`); + logger.info(rids); + + for (const r of rids) { + try { + await conn.deleteRecord('Statement', r); + counts.success++; + } catch (err) { + logger.error(err); + counts.err++; + } + } + + return counts; +}; + +module.exports = { + contentMatching, + createStatement, + deleteStatements, + isMatching, + needsUpdate, + updateStatement, +}; diff --git a/src/civic/therapy.js b/src/civic/therapy.js new file mode 100644 index 00000000..021f88ff --- /dev/null +++ b/src/civic/therapy.js @@ -0,0 +1,200 @@ +const { logger } = 
const { logger } = require('../logging');
const { ncit: NCIT_SOURCE_DEFN } = require('../sources');
const { orderPreferredOntologyTerms, rid } = require('../graphkb');


/**
 * Given a CIViC EvidenceItem record,
 * compile a list of its therapies into a list of combination of therapies, and
 * returns modified 'therapies' & 'therapyInteractionType' properties
 *
 * record.therapies will be transformed into:
 * - a list of 1 list of 1-or-many therapies ('COMBINATION' || 'SEQUENTIAL'), or
 * - a list of 1-or-many lists of 1 therapy ('SUBSTITUTES'), or
 * - a list of 1 null
 *
 * @param {object} evidenceItem the original CIViC EvidenceItem
 * @returns {object} the modified EvidenceItem (the input is deep-copied, never mutated)
 * @throws {Error} when multiple therapies come with an unsupported interaction type
 */
const resolveTherapies = (evidenceItem) => {
    const record = JSON.parse(JSON.stringify(evidenceItem)); // Deep copy

    // No therapy
    if (record.therapies === null || record.therapies.length === 0) {
        record.therapies = [null];
        return record;
    }

    // One or more therapies
    if (record.therapies.length === 1 || record.therapyInteractionType === 'SUBSTITUTES') {
        record.therapies = record.therapies.map(therapy => [therapy]);
        record.therapyInteractionType = null;
    } else if (
        record.therapyInteractionType === 'COMBINATION'
        || record.therapyInteractionType === 'SEQUENTIAL'
    ) {
        record.therapies = [record.therapies];
    } else {
        logger.error(`(evidence: ${record.id}) unsupported therapy interaction type (${record.therapyInteractionType}) for a multiple therapy (${record.therapies.length}) statement`);
        throw new Error('Did not find unique record');
    }

    // Since duplicates can occur (from CIViC!?), let's remove them.
    // Need to stringify/parse since we're comparing arrays of objects
    const unique = new Set();
    record.therapies.forEach(therapy => unique.add(JSON.stringify(therapy)));
    record.therapies = [];
    unique.forEach(therapy => record.therapies.push(JSON.parse(therapy)));

    return record;
};

/**
 * Given a Therapy record from CIViC,
 * returns a Therapy record from GraphKB
 *
 * Lookup precedence (order matters; each step falls through on failure):
 *   1. NCIt sourceId AND name;
 *   2. NCIt sourceId alone, taking the most recently created match (fatal on error);
 *   3. name as-is via conn.getTherapy;
 *   4. name stripped of a trailing parenthesized suffix via conn.getTherapy.
 *
 * @param {ApiConnection} conn the API connection object for GraphKB
 * @param {object} therapyRecord a therapy from CIViC; assumes .name is a non-empty string — TODO confirm
 * @returns {object} Therapy record from GraphKB
 * @throws the error from the name-as-is lookup when every fallback fails
 */
const getTherapy = async (conn, therapyRecord) => {
    const name = therapyRecord.name.toLowerCase().trim();
    const ncitId = therapyRecord.ncitId && typeof therapyRecord.ncitId === 'string'
        ? therapyRecord.ncitId.toLowerCase().trim()
        : therapyRecord.ncitId;

    if (ncitId) {
        // Trying with the ncitId and the name
        try {
            return await conn.getUniqueRecordBy({
                filters: [
                    { source: { filters: { name: NCIT_SOURCE_DEFN.name }, target: 'Source' } },
                    { sourceId: ncitId },
                    { name },
                ],
                sort: orderPreferredOntologyTerms,
                target: 'Therapy',
            });
        } catch (err) {
            // non-fatal; fall through to the ncitId-only lookup below
            logger.warn(`Failed to fetch therapy with NCIt id (${ncitId}) & name (${therapyRecord.name}) from graphkb`);
        }

        // Trying with the ncitId only
        // Choosing the most recently created one
        try {
            const matchingTherapies = await conn.getRecords({
                filters: {
                    AND: [
                        { source: { filters: { name: NCIT_SOURCE_DEFN.name }, target: 'Source' } },
                        { sourceId: ncitId },
                    ],
                },
                target: 'Therapy',
            });
            // In-place sorting, newest createdAt first
            matchingTherapies.sort((a, b) => b.createdAt - a.createdAt);
            // returning 1st one (latest created)
            return matchingTherapies[0];
        } catch (err) {
            logger.error(`Failed to fetch therapy with NCIt id (${ncitId}) from graphkb`);
            throw err;
        }
    }

    let originalError;

    // Trying instead with the name
    // Using the getTherapy method from the connection object
    try {
        // With the name as-is first
        return await conn.getTherapy(name);
    } catch (err) {
        originalError = err;
    }

    try {
        // Then with the name parsed: keep the leading token, drop a trailing "(...)" suffix
        const match = /^\s*(\S+)\s*\([^)]+\)$/.exec(name);

        if (match) {
            return await conn.getTherapy(match[1]);
        }
    } catch (err) { } // best-effort fallback; deliberately swallowed so originalError is rethrown

    // Every lookup failed; surface the error from the name-as-is attempt
    throw originalError;
};

/**
 * Given a list of CIViC Therapy Records,
 *
 * (If one therapy)
 * returns the corresponding Therapy record from GraphKB
 *
 * (If a combination of therapies)
 * will add a therapy combination if there is not an existing record,
 * will link the therapy combination to its individual elements with 'ElementOf' edges, then
 * returns the corresponding Therapy record from GraphKB
 *
 * @param {ApiConnection} conn the API connection object for GraphKB
 * @param {string} sourceRid
 * @param {object[]} therapiesRecords
 * @param {string} combinationType
 * @returns {object} the corresponding Therapy record from GraphKB, or null when no therapy
 */
const addOrFetchTherapy = async (conn, sourceRid, therapiesRecords, combinationType) => {
    /* ONE OR NO THERAPY */

    if (therapiesRecords.length === 0) {
        return null;
    }
    if (therapiesRecords.length === 1) {
        if (therapiesRecords[0] === null) {
            return null;
        }
        // Get the corresponding Therapy record from GraphKB
        return getTherapy(conn, therapiesRecords[0]);
    }

    /* COMBINATION OF THERAPIES */

    // For each therapy, get the corresponding Therapy record from GraphKB
    const therapies = await Promise.all(
        therapiesRecords.map(
            async therapy => getTherapy(conn, therapy),
        ),
    );
    // concatenating sourceIds and names (sorted so the combined id is order-independent)
    const sourceId = therapies.map(e => e.sourceId).sort().join(' + ');
    const name = therapies.map(e => e.name).sort().join(' + ');

    // Add a Therapy vertex for the combined therapies
    const combinedTherapy = await conn.addRecord({
        content: {
            combinationType, name, source: sourceRid, sourceId,
        },
        existsOk: true,
        target: 'Therapy',
    });

    // Add ElementOf Edges between corresponding records
    for (const therapy of therapies) {
        await conn.addRecord({
            content: {
                in: rid(combinedTherapy),
                out: rid(therapy),
                source: sourceRid,
            },
            existsOk: true,
            target: 'ElementOf',
        });
    }

    return combinedTherapy;
};

module.exports = {
    addOrFetchTherapy,
    getTherapy,
    resolveTherapies,
};
a/src/civic/variant.js b/src/civic/variant.js index 5550069c..98ebee67 100644 --- a/src/civic/variant.js +++ b/src/civic/variant.js @@ -7,6 +7,7 @@ const { civic: SOURCE_DEFN } = require('../sources'); const { error: { ErrorMixin, ParsingError } } = kbParser; class NotImplementedError extends ErrorMixin { } +const VARIANT_CACHE = new Map(); // based on discussion with cam here: https://www.bcgsc.ca/jira/browse/KBDEV-844 const SUBS = { @@ -31,7 +32,13 @@ const SUBS = { 'p26.3-25.3 11mb del': 'y.p26.3_p25.3del', }; - +/** + * Compares two gene names together for equality + * + * @param {string} gene1 a gene name + * @param {string} gene2 a second gene name + * @returns {boolean} whether the genes names are equal or not + */ const compareGeneNames = (gene1, gene2) => { if (['abl1', 'abl'].includes(gene1.toLowerCase()) && ['abl1', 'abl'].includes(gene2.toLowerCase())) { return true; @@ -42,7 +49,14 @@ const compareGeneNames = (gene1, gene2) => { }; /** - * Given a CIViC Variant record entrez information and name, normalize into a set of graphkb-style variants + * Given a CIViC Variant record entrez information and name, + * normalize into a set of graphkb-style variants + * + * @param {object} param0 + * @param {string} param0.name + * @param {string} param0.entrezId + * @param {string} param0.entrezName + * @returns {object} */ const normalizeVariantRecord = ({ name: rawName, entrezId, entrezName: rawEntrezName, @@ -231,6 +245,7 @@ const normalizeVariantRecord = ({ * @param {ApiConnection} conn the connection to GraphKB * @param {Object} normalizedVariant the normalized variant record * @param {Object} feature the gene feature already grabbed from GraphKB + * @returns {object[]} */ const uploadNormalizedVariant = async (conn, normalizedVariant, feature) => { let result; @@ -321,19 +336,20 @@ const uploadNormalizedVariant = async (conn, normalizedVariant, feature) => { return result; }; - /** * Given some variant record and a feature, process the variant and return a 
GraphKB equivalent * * @param {ApiConnection} conn the connection to GraphKB * @param {Object} civicVariantRecord the raw variant record from CIViC * @param {Object} feature the gene feature already grabbed from GraphKB + * @returns {object[]} */ const processVariantRecord = async (conn, civicVariantRecord, feature) => { - const featureInstance = civicVariantRecord.feature.featureInstance; + const { feature: { featureInstance } } = civicVariantRecord; let entrezId, entrezName; + // featureInstance if (featureInstance.__typename === 'Gene') { entrezId = featureInstance.entrezId; entrezName = featureInstance.name; @@ -345,22 +361,46 @@ const processVariantRecord = async (conn, civicVariantRecord, feature) => { ); } - const variants = normalizeVariantRecord({ + // Raw variant from CIViC to normalize & upload to GraphKB if needed + const rawVariant = { entrezId, entrezName, name: civicVariantRecord.name, - }); + }; + + // Trying cache first + const fromCache = VARIANT_CACHE.get(JSON.stringify(rawVariant)); + + if (fromCache) { + if (fromCache.err) { + throw new Error('Variant record previously processed with errors'); + } + if (fromCache.result) { + return fromCache.result; + } + } const result = []; - for (const normalizedVariant of variants) { - result.push(await uploadNormalizedVariant(conn, normalizedVariant, feature)); + try { + // Normalizing + const variants = normalizeVariantRecord(rawVariant); + + // Uploading + for (const normalizedVariant of variants) { + result.push(await uploadNormalizedVariant(conn, normalizedVariant, feature)); + } + } catch (err) { + VARIANT_CACHE.set(JSON.stringify(rawVariant), { err }); } + + VARIANT_CACHE.set(JSON.stringify(rawVariant), { result }); return result; }; - module.exports = { + compareGeneNames, normalizeVariantRecord, processVariantRecord, + uploadNormalizedVariant, }; diff --git a/src/graphkb.js b/src/graphkb.js index acc4333a..914d2e01 100644 --- a/src/graphkb.js +++ b/src/graphkb.js @@ -557,6 +557,7 @@ class 
ApiConnection { const model = schema.get(target); const filters = fetchConditions || convertRecordToQueryFilters(content); + // Will first try to fetch and/or update the record if it already exists if (fetchFirst || upsert) { try { const result = await this.getUniqueRecordBy({ @@ -577,6 +578,7 @@ class ApiConnection { throw new Error(`cannot find model from target (${target})`); } + // Then (since record dosen't already exists) will create a new record try { const { result } = jc.retrocycle(await this.request({ body: content, diff --git a/test/civic.profile.test.js b/test/civic/civic.profile.test.js similarity index 93% rename from test/civic.profile.test.js rename to test/civic/civic.profile.test.js index 5f70ac31..ee8ea60d 100644 --- a/test/civic.profile.test.js +++ b/test/civic/civic.profile.test.js @@ -1,4 +1,4 @@ -const { MolecularProfile } = require('../src/civic/profile'); +const { MolecularProfile } = require('../../src/civic/profile'); describe('MolecularProfile._combine()', () => { @@ -32,7 +32,7 @@ describe('MolecularProfile._compile()', () => { }); describe('MolecularProfile._disambiguate()', () => { - test('disambiguate conditions', () => { + test('disambiguate conditions in AND statements', () => { const Mp = MolecularProfile(); Mp.conditions = [ [{ id: 8, name: 'X123M/N' }, { id: 9, name: 'X456O/P' }, { id: 10, name: 'X456Q' }], @@ -46,6 +46,22 @@ describe('MolecularProfile._disambiguate()', () => { ], ); }); + + test('disambiguate conditions in OR statements', () => { + const Mp = MolecularProfile(); + Mp.conditions = [ + [{ id: 8, name: 'X123M/N' }], + [{ id: 9, name: 'X456O/P' }], + ]; + expect(Mp._disambiguate().conditions).toEqual( + [ + [{ id: 8, name: 'X123M' }], + [{ id: 8, name: 'X123N' }], + [{ id: 9, name: 'X456O' }], + [{ id: 9, name: 'X456P' }], + ], + ); + }); }); describe('MolecularProfile._end()', () => { @@ -140,7 +156,6 @@ describe('MolecularProfile._parse()', () => { ); }); - describe('MolecularProfile._split()', () => { 
test.each([ ['Q157P/R', [[{ name: 'Q157P' }], [{ name: 'Q157R' }]]], diff --git a/test/civic.publication.test.js b/test/civic/civic.publication.test.js similarity index 94% rename from test/civic.publication.test.js rename to test/civic/civic.publication.test.js index fde748dd..1445ec72 100644 --- a/test/civic.publication.test.js +++ b/test/civic/civic.publication.test.js @@ -1,4 +1,4 @@ -const { titlesMatch } = require('../src/civic/publication'); +const { titlesMatch } = require('../../src/civic/publication'); describe('titlesMatch', () => { diff --git a/test/civic.relevance.test.js b/test/civic/civic.relevance.test.js similarity index 90% rename from test/civic.relevance.test.js rename to test/civic/civic.relevance.test.js index c7a02630..bb922dbc 100644 --- a/test/civic.relevance.test.js +++ b/test/civic/civic.relevance.test.js @@ -1,4 +1,4 @@ -const { translateRelevance } = require('../src/civic/relevance'); +const { translateRelevance } = require('../../src/civic/relevance'); describe('translateRelevance', () => { test.each([ @@ -32,7 +32,11 @@ describe('translateRelevance', () => { ['NA', 'PREDISPOSING', 'NA', 'likely predisposing'], ])( '%s|%s|%s returns %s', (evidenceDirection, evidenceType, clinicalSignificance, expected) => { - expect(translateRelevance(evidenceType, evidenceDirection, clinicalSignificance)).toEqual(expected); + expect(translateRelevance( + evidenceType, + evidenceDirection, + clinicalSignificance, + )).toEqual(expected); }, ); @@ -74,7 +78,11 @@ describe('translateRelevance', () => { ['--', '--', '--'], ])( '%s|%s|%s errors', (evidenceDirection, evidenceType, clinicalSignificance) => { - expect(() => translateRelevance(evidenceType, evidenceDirection, clinicalSignificance)).toThrow('unable to process relevance'); + expect(() => translateRelevance( + evidenceType, + evidenceDirection, + clinicalSignificance, + )).toThrow('unable to process relevance'); }, ); }); diff --git a/test/civic/civic.statement.test.js 
// Unit tests for the CIViC-to-GraphKB statement matching helpers
/* eslint-disable jest/no-disabled-tests */
const {
    contentMatching,
    isMatching,
    needsUpdate,
} = require('../../src/civic/statement');


// Generic content
const content = {
    conditions: ['#123:1', '#123:2'], // conditions NEEDS to be already sorted in ascending order
    description: 'test',
    evidence: ['#123:1'],
    evidenceLevel: ['#123:1'],
    relevance: '#123:1',
    reviewStatus: 'not required',
    source: '#123:1',
    sourceId: '9999',
    subject: '#123:1',
};

// Combination of matching and not matching content
const allFromCivic = [
    { ...content, subject: '#888:0' }, // matching with allFromGkb[3]
    { ...content, subject: '#888:1' }, // matching with allFromGkb[1]
    { ...content, subject: '#888:2' }, // not matching
];
const allFromGkb = [
    { ...content, '@rid': '#999:0', subject: '#888:3' }, // not matching
    { ...content, '@rid': '#999:1', subject: '#888:1' }, // matching with allFromCivic[1]
    { ...content, '@rid': '#999:2', subject: '#888:4' }, // not matching
    { ...content, '@rid': '#999:3', subject: '#888:0' }, // matching with allFromCivic[0]
];

describe('needsUpdate', () => {
    // No need to update
    test('identical content', () => {
        expect(needsUpdate({
            fromCivic: content,
            fromGkb: content,
        })).toBe(false);
    });

    // The GraphKB-only '@rid' property is ignored by the comparison
    test('discarding gkb rid', () => {
        expect(needsUpdate({
            fromCivic: content,
            fromGkb: { ...content, '@rid': '#123:1' },
        })).toBe(false);
    });

    // Need to update
    test('any difference', () => {
        expect(needsUpdate({
            fromCivic: content,
            fromGkb: { ...content, description: '' },
        })).toBe(true);
    });
});

describe('isMatching', () => {
    // No matching
    test('difference on conditions', () => {
        expect(isMatching({
            fromCivic: content,
            fromGkb: { ...content, conditions: ['#123:1', '#123:3'] },
        })).toBe(false);
    });

    test('difference on subject', () => {
        expect(isMatching({
            fromCivic: content,
            fromGkb: { ...content, subject: '#123:2' },
        })).toBe(false);
    });

    // Matching: restricting the compared properties (p) to subject only
    test('difference on conditions while matching only on subject', () => {
        expect(isMatching({
            fromCivic: content,
            fromGkb: { ...content, conditions: ['#123:1', '#123:3'] },
            p: ['subject'],
        })).toBe(true);
    });

    // Matching: by default only conditions & subject are compared,
    // so a difference on any other property (e.g. description) still matches
    test('any other difference', () => {
        expect(isMatching({
            fromCivic: content,
            fromGkb: { ...content, description: '' },
        })).toBe(true);
    });
});

describe('contentMatching', () => {
    test('matching only on conditions and subject', () => {
        const records = contentMatching({
            allFromCivic,
            allFromGkb,
            matchingOnSubjectAlone: false,
        });

        // matching content
        expect(records.toUpdate.length).toBe(2);

        // allFromGkb with no matches
        expect(records.toDelete.length).toBe(2);

        // allFromCivic with no matches
        expect(records.toCreate.length).toBe(1);

        // matching content
        expect(records.toUpdate[0]).toEqual({
            fromCivic: allFromCivic[0],
            fromGkb: allFromGkb[3],
        });
        expect(records.toUpdate[1]).toEqual({
            fromCivic: allFromCivic[1],
            fromGkb: allFromGkb[1],
        });

        // allFromGkb with no matches
        expect(records.toDelete[0]).toEqual(allFromGkb[0]);
        expect(records.toDelete[1]).toEqual(allFromGkb[2]);

        // allFromCivic with no matches
        expect(records.toCreate[0]).toEqual(allFromCivic[2]);
    });

    test('matching also on subject alone, without artificial matching', () => {
        const records = contentMatching({
            allFromCivic: [
                { ...content, conditions: ['#777:77'], subject: '#777:1' },
                { ...content, conditions: ['#777:77'], subject: '#777:2' },
            ],
            allFromGkb: [
                { ...content, conditions: ['#888:88'], subject: '#777:1' },
                { ...content, conditions: ['#888:88'], subject: '#888:2' },
            ],
            matchingWithoutComparing: false,
        });

        // matching content (subject '#777:1' pairs up despite differing conditions)
        expect(records.toUpdate.length).toBe(1);

        // allFromGkb with no matches
        expect(records.toDelete.length).toBe(1);

        // allFromCivic with no matches
        expect(records.toCreate.length).toBe(1);
    });

    test('matching until artificial matching', () => {
        const records = contentMatching({
            allFromCivic,
            allFromGkb,
        });

        // matching content (2 real matches + 1 artificial pairing)
        expect(records.toUpdate.length).toBe(3);

        // allFromGkb with no matches
        expect(records.toDelete.length).toBe(1);

        // allFromCivic with no matches
        expect(records.toCreate.length).toBe(0);
    });
});
mutaiton', () => { + test.skip('non-specific positional mutaiton', () => { // E1813 mutations });