-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathimport_drug_stats.exs
104 lines (90 loc) · 2.67 KB
/
import_drug_stats.exs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
require Logger
alias Risteys.{FGEndpoint, Repo, ATCDrug, DrugStats}
Logger.configure(level: :info)
[drug_descs_path, drug_scores_path | _] = System.argv()
# ATC drug descriptions
Logger.info("Inserting/Updating drug descriptions")
drug_descs_path
|> File.stream!()
|> CSV.decode!(headers: true)
|> Enum.each(fn %{"code" => atc, "desc" => description} ->
atc_drug =
case Repo.get_by(ATCDrug, atc: atc) do
nil -> %ATCDrug{}
existing -> existing
end
{status, changeset} =
atc_drug
|> ATCDrug.changeset(%{atc: atc, description: description})
|> Repo.insert_or_update()
case status do
:ok -> Logger.debug("insert/update ok")
:error -> Logger.warning(inspect(changeset))
end
end)
# Drug Stats
Logger.info("Inserting/Updating drug stats")
drug_scores_path
|> File.stream!()
|> CSV.decode!(headers: true)
|> Stream.reject(fn %{"score" => score,
"stderr" => stderr,
"endpoint" => endpoint,
"drug" => atc,
"pvalue" => pvalue} ->
is_nan = score == "nan" or stderr == "nan" or pvalue == "nan"
if is_nan do
Logger.warning(
"Drug score with NaN value for #{endpoint}/#{atc}: score:#{score} ; stderr:#{stderr} ; pvalue:#{pvalue}"
)
end
is_nan
end)
|> Stream.reject(fn %{"endpoint" => endpoint, "drug" => atc, "n_indivs" => n_indivs} ->
if n_indivs < 6 do
Logger.warning("Reject entry for #{endpoint}/#{atc} with indidivual level-data N=#{n_indivs}")
true
else
false
end
end)
|> Enum.each(fn %{
"endpoint" => endpoint,
"drug" => atc,
"score" => score,
"stderr" => stderr,
"pvalue" => pvalue,
"n_indivs" => n_indivs
} ->
Logger.debug("Data for #{endpoint} / #{atc}")
endpoint = Repo.get_by(FGEndpoint.Definition, name: endpoint)
atc_drug = Repo.get_by(ATCDrug, atc: atc)
if not is_nil(endpoint) and not is_nil(atc_drug) do
drug_stats =
case Repo.get_by(DrugStats,
fg_endpoint_id: endpoint.id,
atc_id: atc_drug.id
) do
nil -> %DrugStats{}
existing -> existing
end
drug_stats =
drug_stats
|> DrugStats.changeset(%{
fg_endpoint_id: endpoint.id,
atc_id: atc_drug.id,
score: score |> String.to_float(),
stderr: stderr |> String.to_float(),
pvalue: pvalue |> String.to_float(),
n_indivs: n_indivs |> String.to_integer()
})
|> Repo.insert_or_update()
case drug_stats do
{:ok, _} ->
Logger.debug("insert/update ok")
{:error, changeset} ->
Logger.warning(inspect(changeset))
end
end
end)
Logger.info("Import done.")