-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathncbi_datasets_resolver.py
146 lines (121 loc) · 4.43 KB
/
ncbi_datasets_resolver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import json
import os
from Bio import SeqIO
top = r'/mnt/d/new_ncbi_dataset/genomes_store'
rec = r'/mnt/d/new_ncbi_dataset/genomes_rec'
# top = r'D:\new_ncbi_dataset\genomes'
def print_ass_json(ass_json : dict):
# recursively print an assembly json information
for k, v in ass_json.items():
if isinstance(v, dict):
print_ass_json(v)
else:
print(str(k) + ': ' + str(v))
def resolve_ass_json(file_path):
json_file = open(file_path)
ass_json_lis = []
for line in json_file.readlines():
dic = json.loads(line)
ass_json_lis.append(dic)
return ass_json_lis
def get_ass_list_from_json(ass_json_lis):
# accepts a parsed json list of multiple assemblies
ass_acc_lis = []
for ass in ass_json_lis:
ass_acc_lis.append(ass['assemblyInfo']['assemblyAccession'])
return ass_acc_lis
def ass2lineage(ass) -> str:
ass_line_dic = {}
# store_top = r'D:\new_ncbi_dataset\genomes_store'
store_top = r'/mnt/d/new_ncbi_dataset/genomes_store'
lineage_lis = os.listdir(store_top)
for lineage in lineage_lis:
ass_top_path = os.path.join(store_top, lineage, 'ncbi_dataset', 'data')
ass_lis = os.listdir(ass_top_path)
ass_line_dic[lineage] = ass_lis
for lineage in ass_line_dic.keys():
if ass in ass_line_dic[lineage]:
return lineage
def list_fna_file(top):
assembly = os.listdir(top)
fna = [file for file in assembly if '.fna' in file]
return fna
def list_lineage(top):
# top: genomes
# -(taxon1_taxid1)
# -(taxon2_taxid2)
species = os.listdir(top)
species_path = []
for sp in species:
sp = os.path.join(top, sp, 'ncbi_dataset', 'data')
species_path.append(sp)
return species_path
def is_chr(fna):
for i in fna :
if ('chr' in i) and ('Pl' not in i):
return True
return False
def get_chr_len_list(ass_path):
chr_len_list = []
files = list_fna_file(ass_path)
for file in files:
# remove 'chrPltd.unlocalized.scaf.fna'
if ('chr' in file) and ('Pl' not in file):
if 'unlocal' in file:
un_chr = SeqIO.to_dict(SeqIO.parse(os.path.join(ass_path, file), "fasta"))
chr_len = 0
for id, record in un_chr.items():
chr_len += len(record.seq)
else:
chr = SeqIO.read(os.path.join(ass_path, file), "fasta")
chr_len = len(chr.seq)
chr_len_list.append(chr_len)
return chr_len_list
def select_unplace_with_chr(ass_path):
# not use because min_contig
seq = os.path.join(ass_path, 'unplaced.scaf.fna')
unpl_seq = SeqIO.to_dict((SeqIO.parse(seq, "fasta")))
unpl_seq = dict(sorted(unpl_seq.items(), key = lambda item : len(item[1].seq), reverse = True))
chr_len_list = get_chr_len_list(ass_path)
len_list = []
len_dict = {}
for id, record in unpl_seq.items():
if len(record.seq) > 20000:
len_list.append(len(record.seq))
len_dict[id] = record
def cat_fna(fna : list):
# remove non-genome part
if 'cds_from_genomic.fna' in fna:
fna.remove('cds_from_genomic.fna')
if 'rna.fna' in fna:
fna.remove('rna.fna')
# cat chrs
cat_fna_lis = str()
for i in fna:
cat_fna_lis += (i + ', ')
if cat_fna_lis != '' and len(cat_fna_lis) > 1:
cat_fna_lis = cat_fna_lis.strip(', ').replace(',', ' ')
os.system("cat %s > genome.fna" % (cat_fna_lis))
def main():
ass_cnt = 0
lineage_list = list_lineage(top)
for lineage in lineage_list:
ass_json_lis = resolve_ass_json(os.path.join(lineage, 'assembly_data_report.jsonl'))
ass_list = get_ass_list_from_json(ass_json_lis)
for ass in ass_list:
ass_path = os.path.join(lineage, ass)
fna_list = list_fna_file(ass_path)
os.chdir(ass_path)
print("ass_path: " + str(ass_path))
cat_fna(fna_list)
print("cated!")
os.chdir(lineage.replace("genomes_store", "genomes_rec").replace("ncbi_dataset/data", ''))
os.system("mkdir %s" % ass)
os.chdir(ass)
os.system("pwd")
os.system("mv %s/genome.fna ." % ass_path)
ass_cnt += 1
print("ass_cnt" + str(ass_cnt) + '\t' + " comp: " + str(ass_cnt/300*100) + "%")
print()
if __name__ == '__main__':
main()