analyze.py
import json
import os
import argparse
from datetime import datetime

from tasks.metric_completeness import *
from tasks.metric_correctness import *
from tasks.metric_consistency import *
from tasks.entry_count import *
from lib.data_processor import DataProcessor


class Analyzer(DataProcessor):
    """Runs the specified data-quality metrics and stores the results."""

    REPO_FILE = "repo/quality.json"

    METRICS = {
        "MinLength": MinLength(),
        "MinValue": MinValue(),
        "NotNull": NotNull(),
        "MinPopulation": MinPopulation(),
        "MinObject": MinObject(),
        "Completeness": Completeness(),
        "CorrectValue": CorrectValue(),
        "Correctness": Correctness(),
        "UniqueValue": UniqueValue(),
        "NoContradict": NoContradict(),
        "UniqueObject": UniqueObject(),
        "Consistency": Consistency()
    }

    METRICS_COMPLETE = [
        "Completeness",
        "Correctness",
        "Consistency"
    ]
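
    # The metric objects in METRICS are assumed to expose a
    # calc(dataFrames, spark) method returning a dict of
    # indicator name -> value that is merged into the results.
    # A minimal hypothetical sketch of that shape (illustration only,
    # not the actual tasks.* implementation):
    #
    #     class ExampleMetric:
    #         def calc(self, dataFrames, spark):
    #             # compute an indicator over the input DataFrames
    #             return {"ExampleMetric": 1.0}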

    def _run(self, dataFrames, config, spark):
        results = {}
        # "complete" runs the predefined aggregate stack
        if config.metrics[0] == "complete":
            print("running complete metric stack:", list(Analyzer.METRICS.keys()))
            for name in Analyzer.METRICS_COMPLETE:
                metric = Analyzer.METRICS[name]
                metric_results = metric.calc(dataFrames, spark)
                results.update(metric_results)
        # otherwise run only the metrics named on the command line
        else:
            for name, metric in Analyzer.METRICS.items():
                if name in config.metrics:
                    metric_results = metric.calc(dataFrames, spark)
                    results.update(metric_results)
        print(results)

        # load the existing results repository, if any
        repo = []
        if os.path.exists(Analyzer.REPO_FILE):
            with open(Analyzer.REPO_FILE, 'r', encoding='utf-8') as repoIn:
                repo = json.load(repoIn)

        # append this run's record and write the repository back
        repo.append({
            "time": datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
            "metrics": ",".join(config.metrics),
            "chain": config.chain,
            "counts": CountEntries().calc(dataFrames, spark),
            "indicators": results
        })
        with open(Analyzer.REPO_FILE, 'w', encoding='utf-8') as outFile:
            outFile.write(json.dumps(repo, indent=4))
if "__main__" == __name__:
# init parameters
parser = argparse.ArgumentParser(prog='Data Analyzer', description='Run metrics')
parser.add_argument('-m', '--metrics', help='names of metrics to run', nargs="+", default=['complete'])
parser.add_argument('-c', '--chain', help='the source data related to the transformation chain', default='initial')
Analyzer().run(parser.parse_args())
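
Run with no arguments (python analyze.py) to execute the complete stack, or pass metric names via -m to run a subset, e.g. python analyze.py -m MinLength NotNull. Each run appends one entry to repo/quality.json, so the file accumulates a run history. A minimal sketch of reading that history back, assuming the file exists with the structure written above:

import json

with open("repo/quality.json", encoding="utf-8") as f:
    history = json.load(f)

# each entry records the run time, the metric names, the chain label,
# the entry counts, and the computed indicators
for entry in history:
    print(entry["time"], entry["metrics"], entry["chain"])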