# -*- coding: utf-8 -*-
"""Functions for intake datasets."""

__copyright__ = 'Copyright (c) 2019-2021, Utrecht University'
__license__ = 'GPLv3, see LICENSE'

import itertools

import genquery

from util import *


def intake_report_export_study_data(ctx, study_id):
""" Get the information for the export functionality
Retrieved metadata for a study:
- dataset_date_created
- wave
- version
- experiment_type
- pseudocode
- number of files
- total file size
:param ctx: Combined type of a callback and rei struct
:param study_id: Unique identifier op study
:returns: returns datasets
"""
zone = user.zone(ctx)
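    # The vault group's top-level collection and its subcollections are queried
    # separately: the first condition matches the collection itself, the second
    # ('like .../%') matches everything below it.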
    main_collection_iterator = genquery.row_iterator("COLL_NAME, COLL_PARENT_NAME, META_COLL_ATTR_NAME, META_COLL_ATTR_VALUE",
                                                     "COLL_NAME = '/{}/home/grp-vault-{}' AND META_COLL_ATTR_NAME IN ('dataset_id', 'dataset_date_created', 'wave', 'version', 'experiment_type', 'pseudocode')".format(zone, study_id),
                                                     genquery.AS_LIST, ctx)
subcollection_iterator = genquery.row_iterator("COLL_NAME, COLL_PARENT_NAME, META_COLL_ATTR_NAME, META_COLL_ATTR_VALUE",
"COLL_NAME like '/{}/home/grp-vault-{}/%' AND META_COLL_ATTR_NAME IN ('dataset_id', 'dataset_date_created', 'wave', 'version', 'experiment_type', 'pseudocode')".format(zone, study_id),
genquery.AS_LIST, ctx)
datasets = {}
for row in itertools.chain(main_collection_iterator, subcollection_iterator):
path = row[0]
try:
datasets[path][row[2]] = row[3]
except KeyError:
datasets[path] = {row[2]: row[3]}
real_datasets = {}
for set_path in datasets:
if 'dataset_date_created' in datasets[set_path]:
real_datasets[set_path] = datasets[set_path]
# collect total file size and total amount of files
real_datasets[set_path]['totalFileSize'] = 0
real_datasets[set_path]['totalFiles'] = 0
# get the filesize and file count
stat_main_collection_iterator = genquery.row_iterator("count(DATA_ID), sum(DATA_SIZE)",
"COLL_NAME = '{}'".format(set_path),
genquery.AS_LIST, ctx)
stat_subcollection_iterator = genquery.row_iterator("count(DATA_ID), sum(DATA_SIZE)",
"COLL_NAME like '{}/%'".format(set_path),
genquery.AS_LIST, ctx)
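            # Counts and sizes below are halved; presumably this corrects for each
            # data object being reported once per replica (assumption, not verified).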
for row in itertools.chain(stat_main_collection_iterator, stat_subcollection_iterator):
                real_datasets[set_path]['totalFiles'] += int(row[0]) / 2
                if row[1]:
                    real_datasets[set_path]['totalFileSize'] += int(row[1]) / 2
    return real_datasets


def intake_youth_get_datasets_in_study(ctx, study_id):
"""Get the of datasets (with relevant metadata) in a study.
Retrieved metadata:
- 'dataset_id'
- 'dataset_date_created'
- 'wave'
- 'version'
- 'experiment_type'
- 'pseudocode'
:param ctx: Combined type of a callback and rei struct
:param study_id: Unique identifier of study
:returns: Dict with datasets and relevant metadata.
"""
zone = user.zone(ctx)
main_collection_iterator = genquery.row_iterator("COLL_NAME, COLL_PARENT_NAME, META_COLL_ATTR_NAME, META_COLL_ATTR_VALUE",
"COLL_NAME = '/{}/home/grp-vault-{}' AND META_COLL_ATTR_NAME IN ('dataset_id', 'dataset_date_created', 'wave', 'version', 'experiment_type', 'pseudocode')".format(zone, study_id),
genquery.AS_LIST, ctx)
subcollection_iterator = genquery.row_iterator("COLL_NAME, COLL_PARENT_NAME, META_COLL_ATTR_NAME, META_COLL_ATTR_VALUE",
"COLL_NAME LIKE '/{}/home/grp-vault-{}/*' AND META_COLL_ATTR_NAME IN ('dataset_id', 'dataset_date_created', 'wave', 'version', 'experiment_type', 'pseudocode')".format(zone, study_id),
genquery.AS_LIST, ctx)
datasets = {}
# Construct all datasets.
for row in itertools.chain(main_collection_iterator, subcollection_iterator):
dataset = row[0]
attribute_name = row[2]
attribute_value = row[3]
if attribute_name in ['dataset_date_created', 'wave', 'version', 'experiment_type', 'pseudocode']:
if attribute_name in ['version', 'experiment_type']:
val = attribute_value.lower()
else:
val = attribute_value
try:
datasets[dataset][attribute_name] = val
except KeyError:
datasets[dataset] = {attribute_name: val}
    return datasets


def intake_youth_dataset_counts_per_study(ctx, study_id):
""""Get the counts of datasets wave/experimenttype.
In the vault a dataset is always located in a folder.
Therefore, looking at the folders only is enough.
:param ctx: Combined type of a callback and rei struct
:param study_id: Unique identifier op study
:returns: Dict with counts of datasets wave/experimenttype
"""
datasets = intake_youth_get_datasets_in_study(ctx, study_id)
dataset_type_counts = {}
# Loop through datasets and count wave and experimenttype.
for dataset in datasets:
# Meta attribute 'dataset_date_created' defines that a folder holds a complete set.
if 'dataset_date_created' in datasets[dataset]:
type = datasets[dataset]['experiment_type']
wave = datasets[dataset]['wave']
version = datasets[dataset]['version']
try:
dataset_type_counts[type][wave][version] += 1
except KeyError:
if type not in dataset_type_counts:
dataset_type_counts[type] = {wave: {version: 1}}
elif wave not in dataset_type_counts[type]:
dataset_type_counts[type][wave] = {version: 1}
else:
dataset_type_counts[type][wave][version] = 1
    return dataset_type_counts


def vault_aggregated_info(ctx, study_id):
"""Collects aggregated information for raw and processed datasets.
Collects the following information for RAW and PROCESSED datasets.
Including a totalisation of this all (raw/processed is kept in VERSION)
- Total datasets
- Total files
- Total file size
- File size growth in a month
- Datasets growth in a month
- Pseudocodes (distinct)
:param ctx: Combined type of a callback and rei struct
:param study_id: Unique identifier op study
:returns: Dict with aggregated information for raw and processed datasets
"""
datasets = intake_youth_get_datasets_in_study(ctx, study_id)
dataset_count = {'raw': 0, 'processed': 0}
dataset_growth = {'raw': 0, 'processed': 0}
dataset_file_count = {'raw': 0, 'processed': 0}
dataset_file_size = {'raw': 0, 'processed': 0}
dataset_file_growth = {'raw': 0, 'processed': 0}
dataset_pseudocodes = {'raw': [], 'processed': []}
# Determine full last month reference point
import time
from datetime import datetime, date, timedelta
    last_day_of_prev_month = date.today().replace(day=1) - timedelta(days=1)
    month = int(last_day_of_prev_month.strftime("%m"))
    year = int(last_day_of_prev_month.strftime("%Y"))
    # Clamp the day so that e.g. the 31st maps to a valid date in a shorter previous month.
    day = min(int(date.today().strftime("%d")), last_day_of_prev_month.day)
    # Epoch timestamp of the reference point: the same day of the month, one month ago.
    last_month = int(datetime(year, month, day, 0, 0, 0).strftime('%s'))
dataset_paths = []
for dataset in datasets:
# Meta attribute 'dataset_date_created' defines that a folder holds a complete set.
if 'dataset_date_created' in datasets[dataset]:
dataset_paths.append(dataset)
if datasets[dataset]['version'].lower() == 'raw':
version = 'raw'
else:
version = 'processed'
# if version in ['raw', 'processed']:
dataset_count[version] += 1
try:
date_created = int(datasets[dataset]['dataset_date_created'])
except Exception:
                # Missing or malformed creation date (erroneous situation); fall back to the reference point.
date_created = last_month
if date_created - last_month >= 0:
dataset_growth[version] += 1
try:
pseudocode = datasets[dataset]['pseudocode']
if pseudocode not in dataset_pseudocodes[version]:
dataset_pseudocodes[version].append(pseudocode)
except KeyError:
continue
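    # Second pass: walk every data object in the vault group and attribute its
    # count and size to the raw/processed totals of the dataset it belongs to.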
zone = user.zone(ctx)
main_collection_iterator = genquery.row_iterator("DATA_NAME, COLL_NAME, DATA_SIZE, COLL_CREATE_TIME",
"COLL_NAME = '/{}/home/grp-vault-{}'".format(zone, study_id),
genquery.AS_LIST, ctx)
subcollection_iterator = genquery.row_iterator("DATA_NAME, COLL_NAME, DATA_SIZE, COLL_CREATE_TIME",
"COLL_NAME like '/{}/home/grp-vault-{}/%'".format(zone, study_id),
genquery.AS_LIST, ctx)
for row in itertools.chain(main_collection_iterator, subcollection_iterator):
coll_name = row[1]
data_size = int(row[2])
coll_create_time = int(row[3])
# Check whether the file is part of a dataset.
part_of_dataset = False
for dataset in dataset_paths:
if dataset in coll_name:
part_of_dataset = True
break
# File is part of dataset.
if part_of_dataset:
# version = datasets[dataset]['version']
if datasets[dataset]['version'].lower() == 'raw':
version = 'raw'
else:
version = 'processed'
dataset_file_count[version] += 1
dataset_file_size[version] += data_size
if coll_create_time - last_month >= 0:
dataset_file_growth[version] += data_size
return {
'total': {
'totalDatasets': dataset_count['raw'] + dataset_count['processed'],
'totalFiles': dataset_file_count['raw'] + dataset_file_count['processed'],
'totalFileSize': dataset_file_size['raw'] + dataset_file_size['processed'],
'totalFileSizeMonthGrowth': dataset_file_growth['raw'] + dataset_file_growth['processed'],
'datasetsMonthGrowth': dataset_growth['raw'] + dataset_growth['processed'],
'distinctPseudoCodes': len(dataset_pseudocodes['raw']) + len(dataset_pseudocodes['processed']),
},
'raw': {
'totalDatasets': dataset_count['raw'],
'totalFiles': dataset_file_count['raw'],
'totalFileSize': dataset_file_size['raw'],
'totalFileSizeMonthGrowth': dataset_file_growth['raw'],
'datasetsMonthGrowth': dataset_growth['raw'],
'distinctPseudoCodes': len(dataset_pseudocodes['raw']),
},
'notRaw': {
'totalDatasets': dataset_count['processed'],
'totalFiles': dataset_file_count['processed'],
'totalFileSize': dataset_file_size['processed'],
'totalFileSizeMonthGrowth': dataset_file_growth['processed'],
'datasetsMonthGrowth': dataset_growth['processed'],
'distinctPseudoCodes': len(dataset_pseudocodes['processed']),
},
}