name_string_indices.py
"""Export name_string_indices as a 14-column, tab-separated file.

Each output row combines a parsed-name JSON with its source record;
synonym rows additionally carry the taxon id, name, and name-string id
of their accepted name.
"""
from __future__ import print_function

import json
import os.path
import shutil

from pyspark import SparkContext
from pyspark.sql import SparkSession

from settings import parse_spark, csvs_dir, mysql_export_dir

output_dir_name_string_indices = os.path.join(csvs_dir, "name_string_indices")


def cleanup():
    # Remove output from a previous run so saveAsTextFile does not fail.
    shutil.rmtree(output_dir_name_string_indices, ignore_errors=True)


def nullable(value):
    # Render empty or missing fields as the literal string 'NULL'.
    return value if value else 'NULL'


def name_string_index_result(value):
    # Flatten a (parsed-name JSON, source record) pair into the first
    # eleven output columns.
    name_json, rec = value
    return [
        str(rec['data_source_id']),
        name_json['name_string_id'],
        nullable(rec['url']).replace('\t', ' '),
        nullable(rec['taxon_id']),
        nullable(rec['global_id']),
        nullable(rec['local_id']),
        str(nullable(rec['nomenclatural_code_id'])),
        nullable(rec['rank']),
        nullable(rec['classification_path']),
        nullable(rec['classification_path_ids']),
        nullable(rec['classification_path_ranks']),
    ]


empty_accepted_columns = ['NULL', 'NULL', 'NULL']


def is_synonym(value):
    # A record is a synonym when it points at an accepted taxon other
    # than itself.
    name_json, rec = value
    return rec['accepted_taxon_id'] != rec['taxon_id']


def add_accepted_data_to_synonym_name(value):
    # After the left outer join, value is (key, (synonym, accepted-or-None)).
    key, (syn, acpt) = value
    if acpt:
        acpt_json, acpt_rec = acpt
        output_acpt = [
            acpt_rec['taxon_id'],
            nullable(acpt_rec['name']),
            nullable(acpt_json['name_string_id']),
        ]
    else:
        output_acpt = empty_accepted_columns
    return name_string_index_result(syn) + output_acpt


def add_accepted_data_accepted_name(value):
    # Accepted names get placeholder values in the accepted-name columns.
    return name_string_index_result(value) + empty_accepted_columns


def to_key_value(value):
    # Key records by (accepted_taxon_id, data_source_id) so that a synonym
    # joins to its accepted name within the same data source.
    name_json, rec = value
    key = (rec['accepted_taxon_id'], rec['data_source_id'])
    return key, value


def join_fields(fields):
    # Eleven name-string-index columns plus three accepted-name columns.
    assert len(fields) == 14
    return '\t'.join(fields)


def main():
    cleanup()
    sc = SparkContext()
    spark = SparkSession(sc)

    # Read the MySQL export of name_string_indices.
    path = os.path.join(mysql_export_dir, "name_string_indices.tsv")
    df = spark.read.csv(path, header=True, inferSchema=True, sep='\t',
                        nullValue='NULL')

    # Parse every name string and pair each resulting JSON document with
    # its original record.
    names = df.select('name').rdd.map(lambda r: r['name'])
    names_json = parse_spark(sc, names) \
        .map(json.loads) \
        .zip(df.rdd)

    synonym_names = names_json.filter(is_synonym)
    accepted_names = names_json.filter(lambda n: not is_synonym(n))

    # Attach the accepted-name columns to every synonym via a left outer
    # join against the accepted names.
    synonym_names_with_accepted_columns = synonym_names \
        .map(to_key_value) \
        .leftOuterJoin(accepted_names.map(to_key_value)) \
        .map(add_accepted_data_to_synonym_name)
    accepted_names_with_accepted_columns = accepted_names \
        .map(add_accepted_data_accepted_name)

    # Merge both RDDs and write the final TSV.
    sc.union([synonym_names_with_accepted_columns,
              accepted_names_with_accepted_columns]) \
        .map(join_fields) \
        .saveAsTextFile(output_dir_name_string_indices)


if __name__ == "__main__":
    main()
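
A quick sanity check of the row layout, as a hypothetical sketch: the record and parsed-name JSON below are fabricated for illustration (none of these values come from the real export) and are pushed through the helpers above to produce one 14-column line.

# Hypothetical illustration only: fabricated inputs, not real export data.
example_json = {'name_string_id': 'example-name-string-id'}
example_rec = {
    'data_source_id': 1, 'url': None, 'taxon_id': 'T1',
    'global_id': None, 'local_id': None, 'nomenclatural_code_id': None,
    'rank': 'species', 'classification_path': None,
    'classification_path_ids': None, 'classification_path_ranks': None,
    'accepted_taxon_id': 'T1',  # equals taxon_id, so this is an accepted name
}
row = add_accepted_data_accepted_name((example_json, example_rec))
print(join_fields(row))  # one tab-separated line with 14 fields, 'NULL' fills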