-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathvalidate.py
executable file
·354 lines (289 loc) · 11.8 KB
/
validate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Copyright [2009-2017] EMBL-European Bioinformatics Institute
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import json
import logging
from collections import Counter
import click
import requests
import jsonschema as js
# import jsonref as jr
# Directory containing this script; the default sections directory is built from it.
HERE = os.path.abspath(os.path.dirname(__file__))
# Default directory holding the schema sections (default of the --sections option).
SECTIONS = os.path.join(HERE, "sections")
# Default schema file name (default of the --schema option).
SCHEMA_NAME = "rnacentral-schema.json"
# Module logger used for validation warnings, errors and debug output.
LOGGER = logging.getLogger(__name__)
# URL template queried by ActiveTaxonIdValidator to check a numeric taxon id.
TAX_URL = "https://www.ebi.ac.uk/ena/data/taxonomy/v1/taxon/tax-id/{taxon_id}"
# URL templates queried by PublicationValidator, keyed by the prefix of the
# publication id (the part before the first ":").
PUB_URLS = {
"PMID": "https://www.ebi.ac.uk/europepmc/webservices/rest/search?query=EXT_ID:{value}+AND+SRC:MED&format=json",
"DOI": "https://www.ebi.ac.uk/europepmc/webservices/rest/search?query=DOI:{value}&format=json",
}
class ValidationWarning(js.ValidationError):
    """Validation problem that is logged as a warning and does not fail the run."""
class UnrunnableValidator(js.ValidationError):
    """Error reported when an extra validator raised instead of yielding errors."""
class ExtendedValidator:
    """Run a JSON-schema validator followed by a set of extra validators.

    Extra validators are plain objects that may define ``validate_metadata``
    (called with the ``metaData`` entry) and/or ``validate_ncrna`` (called
    with each entry of ``data``); both are expected to yield error objects.
    """

    def __init__(self, schema, extra, *args, **kwargs):
        """Check ``schema`` and build the underlying JSON-schema validator.

        ``extra`` is the iterable of extra validators; any remaining
        arguments are forwarded to the JSON-schema validator class.
        """
        schema_cls = js.validators.validator_for(schema)
        schema_cls.check_schema(schema)
        self.schema = schema
        self.json_validator = schema_cls(schema, *args, **kwargs)
        self.extra_validators = extra

    def update_error(self, instance, validator, item, error):
        """Attach the validator name, checked item and instance to ``error``."""
        # Copied from the jsonschema validators: prefer ``__name__`` and fall
        # back to the class name for validator instances.
        label = getattr(validator, "__name__", validator.__class__.__name__)
        error._set(
            validator=label,
            validator_value=item,
            instance=instance,
            schema=None,
        )
        return error

    def validate_metadata(self, instance):
        """Yield ``(validator, metadata, error)`` for every metadata problem."""
        capable = (
            checker
            for checker in self.extra_validators
            if hasattr(checker, "validate_metadata")
        )
        for checker in capable:
            metadata = None
            try:
                metadata = instance["metaData"]
                for problem in checker.validate_metadata(metadata):
                    yield (checker, metadata, problem)
            except Exception as err:
                # A crashing validator is reported as an error of its own.
                yield (checker, metadata, UnrunnableValidator(err))

    def validate_ncrnas(self, instance):
        """Yield ``(validator, ncrna, error)`` for every problem in ``data``."""
        for entry in instance["data"]:
            for checker in self.extra_validators:
                if hasattr(checker, "validate_ncrna"):
                    try:
                        for problem in checker.validate_ncrna(entry):
                            yield (checker, entry, problem)
                    except Exception as err:
                        # A crashing validator is reported as an error of its own.
                        yield (checker, entry, UnrunnableValidator(err))

    def iter_errors(self, instance):
        """Yield the schema errors first, then the extra validators' errors."""
        yield from self.json_validator.iter_errors(instance)

        if not isinstance(instance, dict):
            return

        stages = (
            ("metaData", self.validate_metadata),
            ("data", self.validate_ncrnas),
        )
        for key, run_stage in stages:
            if key in instance:
                for checker, value, problem in run_stage(instance):
                    yield self.update_error(instance, checker, value, problem)
class KnownGlobalIdValidator(object):
    """Check that gene ids and cross references use a known database prefix.

    The known prefixes are read from the ``dataProvider`` enum of
    ``data-provider.json``; prefixes are compared in upper case.
    """

    def __init__(self, sections=None):
        """Load the set of known database names.

        Args:
            sections: Directory containing ``data-provider.json``. Defaults
                to the module level ``SECTIONS`` directory, so the lookup no
                longer depends on the current working directory.
        """
        if sections is None:
            sections = SECTIONS
        path = os.path.join(sections, "data-provider.json")
        with open(path, "r", encoding="utf-8") as raw:
            known = json.load(raw)
        self.known = set(known["properties"]["dataProvider"]["enum"])

    def validate_ncrna(self, ncrna):
        """Yield an error for each id whose database prefix is not known.

        Ids are expected to look like ``DATABASE:accession``. An id without
        a ``:`` is reported as malformed instead of raising, so the
        remaining ids of the entry are still checked.
        """
        gene_id = ncrna.get("gene", {}).get("geneId", None)
        if gene_id:
            name, separator, _ = gene_id.partition(":")
            if not separator:
                yield js.ValidationError("Malformed gene id: %s" % gene_id)
            elif name.upper() not in self.known:
                yield js.ValidationError("Unknown database: %s" % name)

        for global_id in ncrna.get("crossReferenceIds", []):
            name, separator, _ = global_id.partition(":")
            if not separator:
                yield js.ValidationError("Malformed xref: %s" % global_id)
            elif name.upper() not in self.known:
                yield js.ValidationError("Xref to unknown db: %s" % name)
class ActiveTaxonIdValidator(object):
    """Check each taxon id against the taxonomy service at ``TAX_URL``.

    Results are cached per taxon id, so every id is looked up at most once.
    """

    # Seconds to wait for the taxonomy service; without a timeout a stalled
    # connection would block the validation forever.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.seen = set()  # taxon ids already confirmed by the service
        self.failed = set()  # taxon ids the service rejected

    def validate_ncrna(self, ncrna):
        """Yield an error if the entry's taxon id is rejected by the service.

        The numeric id is the part of ``ncrna["taxonId"]`` after the first
        ``:``. Only HTTP error responses mark an id as invalid; connection
        problems and timeouts propagate to the caller.
        """
        taxon_id = int(ncrna["taxonId"].split(":", 1)[1])
        if taxon_id in self.seen:
            return

        if taxon_id not in self.failed:
            try:
                response = requests.get(
                    TAX_URL.format(taxon_id=taxon_id),
                    timeout=self.REQUEST_TIMEOUT,
                )
                response.raise_for_status()
            except requests.HTTPError:
                self.failed.add(taxon_id)
            else:
                self.seen.add(taxon_id)
                return

        yield js.ValidationError("Invalid Taxon id: %s" % taxon_id)
class PublicationValidator(object):
    """Check publication ids of the metadata and of each entry.

    Ids look like ``PMID:<id>`` or ``DOI:<id>`` and are looked up with the
    URL templates in ``PUB_URLS``. Lookups are cached per full id.
    """

    # Seconds to wait for the publication service; without a timeout a
    # stalled connection would block the validation forever.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.seen = set()  # full ids (prefix included) confirmed by the service
        self.failed = set()  # full ids (prefix included) the service rejected
        # Entries only need their own publications while the metadata has none.
        self.requires_ncrna_publications = True

    def validate_pmid(self, pub_id):
        """Return an error or warning for ``pub_id``, or ``None`` if it is fine.

        Ids with a prefix that has no entry in ``PUB_URLS`` (including ids
        without any ``:``) cannot be checked and produce a
        ``ValidationWarning``. An HTTP error response or an empty search
        result produces a ``ValidationError``.
        """
        db, _, db_id = pub_id.partition(":")
        # The caches are keyed on the full id so that equal accessions from
        # different databases cannot shadow each other.
        if pub_id in self.seen:
            return None
        if db not in PUB_URLS:
            return ValidationWarning("Could not validate %s" % pub_id)
        if pub_id in self.failed:
            return js.ValidationError("Invalid reference id: %s" % pub_id)

        try:
            url = PUB_URLS[db].format(value=db_id)
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            data = response.json()
            if data["hitCount"] == 0:
                # Treat "no hits" exactly like an HTTP failure.
                raise requests.HTTPError("Not found")
        except requests.HTTPError:
            self.failed.add(pub_id)
            return js.ValidationError("Invalid reference id: %s" % pub_id)

        self.seen.add(pub_id)
        return None

    def validate_metadata(self, metadata):
        """Yield problems with the publications listed in the metadata.

        When the metadata lists publications, individual entries are no
        longer required to carry their own.
        """
        publications = metadata.get("publications", [])
        if not publications:
            yield ValidationWarning("Databases should have a reference")
            return

        self.requires_ncrna_publications = False
        for pmid in publications:
            error = self.validate_pmid(pmid)
            if error:
                yield error

    def validate_ncrna(self, ncrna):
        """Yield problems with the publications listed for one entry."""
        publications = ncrna.get("publications", [])
        if not publications and self.requires_ncrna_publications:
            yield js.ValidationError(
                "Must have at least one reference for: %s" % ncrna["primaryId"]
            )

        for pub_id in publications:
            error = self.validate_pmid(pub_id)
            if error:
                yield error
class SecondaryStructureValidator(object):
    """Check that a secondary structure, when given, matches the sequence length."""

    def validate_ncrna(self, ncrna):
        """Yield an error if the structure and the sequence differ in length."""
        if "secondaryStructure" in ncrna:
            structure_length = len(ncrna["secondaryStructure"])
            sequence_length = len(ncrna["sequence"])
            if structure_length != sequence_length:
                yield js.ValidationError("Secondary structure wrong size")
# Not clear if this is actually needed.
# def trna_annotations(ncrna):
# isoType = ncrna.get('additionalAnnotations', {}).get('isoType', None)
# anticodon = ncrna.get('sequenceFeatures', {}).get('anticodon', None)
# if isoType or anticodon:
# if ncrna['soTermId'] != 'SO:0000253':
# yield js.ValidationError("tRNA has the wrong SO term")
class NameValidator(object):
    """Check that every entry has some usable name."""

    def validate_ncrna(self, ncrna):
        """Yield an error if no non-empty name can be found for the entry.

        Candidates, from highest to lowest priority: gene symbol, gene name,
        transcript name, transcript description. Empty values are skipped
        for all four candidates, so an empty gene symbol or gene name no
        longer hides a valid transcript name or description.
        """
        gene = ncrna.get("gene") or {}
        candidates = (
            ("gene symbol", gene.get("symbol")),
            ("gene name", gene.get("name")),
            ("transcript name", ncrna.get("name")),
            ("transcript description", ncrna.get("description")),
        )
        for source, value in candidates:
            if value:
                LOGGER.debug("Using %s for name of %s", source, ncrna["primaryId"])
                LOGGER.debug("Using name %s for %s", value, ncrna["primaryId"])
                return

        yield js.ValidationError("No name for %s" % ncrna["primaryId"])
class CoordinateDirectionValidator(object):
    """Check that every exon starts before it ends, whatever its strand."""

    def validate_ncrna(self, ncrna):
        """Yield an error for each exon whose start is not below its end.

        Raises ValueError when an exon has a strand other than "+", "." or "-".
        """
        known_strands = ("+", ".", "-")
        for location in ncrna.get("genomeLocations", []):
            for exon in location["exons"]:
                if exon["strand"] not in known_strands:
                    raise ValueError("Shouldn't be here")
                if exon["startPosition"] < exon["endPosition"]:
                    continue
                message = "Start must be < end: {}".format(ncrna.get("primaryId"))
                yield js.ValidationError(message)
class AcceptableUncertaintyValidator(object):
    """Warn about sequences with too many characters other than A, C, G and T."""

    def validate_ncrna(self, ncrna):
        """Yield a warning if more than 10% of the sequence is not A/C/G/T.

        An empty sequence yields nothing here instead of dividing by zero.
        The warning names the entry by its ``primaryId`` rather than
        embedding the whole entry in the message.
        """
        sequence = ncrna["sequence"]
        if not sequence:
            return

        standard = set("ACGT")
        total = len(sequence)
        uncertainty = sum(1 for s in sequence if s not in standard)
        if float(uncertainty) / total > 0.1:
            yield ValidationWarning(
                "Sequence for %s is too uncertain (%i/%i)"
                % (ncrna.get("primaryId"), uncertainty, total)
            )
def validate(data, schema_path, sections_path, suppressed_errors=()):
    """Validate ``data`` against the schema and all extra validators.

    Args:
        data: The parsed JSON document to check.
        schema_path: Path of the JSON schema file.
        sections_path: Directory used as the base for resolving references
            in the schema.
        suppressed_errors: Names of extra validators (keys of the validator
            table below) whose errors are counted but neither logged nor
            treated as failures. ``None`` is treated as empty.

    Raises:
        KeyError: If ``suppressed_errors`` names an unknown validator.
        click.ClickException: If at least one non-suppressed error (not a
            warning) was found; the message summarises the error counts.
    """
    with open(schema_path, "r", encoding="utf-8") as raw:
        schema = json.load(raw)

    validators = {
        "uncertainty": AcceptableUncertaintyValidator(),
        "direction": CoordinateDirectionValidator(),
        "name": NameValidator(),
        "structure": SecondaryStructureValidator(),
        "active_taxon": ActiveTaxonIdValidator(),
        "known_db": KnownGlobalIdValidator(),
        "publications": PublicationValidator(),
    }

    # Skip LOGGING of these validators, but still count them. Errors carry
    # the validator's class name, so that is what gets compared below.
    skipped_validators = {
        type(validators[e_type]).__name__ for e_type in (suppressed_errors or ())
    }

    base = "file://%s/" % sections_path
    validator = ExtendedValidator(
        schema,
        validators.values(),
        format_checker=js.FormatChecker(),
        resolver=js.RefResolver(base, None),
    )

    found = False
    counts = Counter()
    for error in validator.iter_errors(data):
        counts[error.validator] += 1
        if error.validator in skipped_validators:
            continue
        if isinstance(error, ValidationWarning):
            LOGGER.warning(error.message)
        else:
            found = True
            LOGGER.error(error.message)

    if found:
        summary = ", ".join("%s: %s" % (k, v) for k, v in counts.items())
        raise click.ClickException("Validation failed: %s" % summary)
@click.command()
@click.argument("filename")
@click.option("--schema", default=SCHEMA_NAME, help="Filename of the schema to use")
@click.option(
    "--sections", default=SECTIONS, help="Directory where schema parts are kept"
)
@click.option(
    "--suppress",
    "-s",
    multiple=True,
    type=click.Choice(
        [
            "known_db",
            "publications",
            "active_taxon",
            "structure",
            "name",
            "direction",
            "uncertainty",
        ]
    ),
)
def main(filename, schema=None, sections=None, suppress=()):
    """Validate the JSON file FILENAME against the schema.

    The ``dataProvider`` entry of the metadata is upper-cased before
    validation. When ``metaData`` or ``dataProvider`` is missing or has an
    unexpected type the document is passed on unchanged, so that validation
    reports the problem instead of this function crashing first.
    """
    with open(filename, "r", encoding="utf-8") as raw:
        data = json.load(raw)

    metadata = data.get("metaData") if isinstance(data, dict) else None
    if isinstance(metadata, dict):
        provider = metadata.get("dataProvider")
        if isinstance(provider, str):
            metadata["dataProvider"] = provider.upper()

    validate(data, schema, os.path.abspath(sections), suppress)
if __name__ == "__main__":
    # When run as a script, log warnings and errors as "LEVEL: message".
    logging.basicConfig(level=logging.WARNING, format="%(levelname)s: %(message)s")
    main()