# yc2Vault.r (forked from UtrechtUniversity/yoda-ruleset)
# \file
# \brief move selected datasets from intake area to the vault area
# this rule is to be executed by a background process with write access to vault
# and read access to the intake area
# \author Ton Smeele
# \copyright Copyright (c) 2015, Utrecht university. All rights reserved
# \license GPLv3, see LICENSE
#
#test {
# *intakeRoot = '/nluu1ot/home/grp-intake-youth';
# *vaultRoot = '/nluu1ot/home/grp-vault-youth';
# uuYc2Vault(*intakeRoot, *vaultRoot, *status);
# writeLine("serverLog","result status of yc2Vault is *status");
#}
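# \brief example uuTreeWalk callback (disabled): logs each visited item and, for
#        collections, appends "=" to buffer."path"
#
# \param[in] parent      pathname of the collection that contains the tree-item
# \param[in] objectName  segment of path, name of collection or data object
# \param[in] isCol       true if the object is a collection, otherwise false
# \param[in,out] buffer  key-value buffer shared across the walk
#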
#uuTreeMyRule(*parent, *objectName, *isCol, *buffer) {
# writeLine("serverLog","parent = *parent");
# writeLine("serverLog","name = *objectName");
# writeLine("serverLog","isCol = *isCol");
# writeLine("serverLog","buffer[path]= " ++ *buffer."path");
# if (*isCol) {
# *buffer."path" = *buffer."path"++"=";
# }
#}
# \brief compose the vault collection path of a dataset from its id components
#
# The resulting layout is
#   <vaultRoot>/<wave>/<experiment_type>/<pseudocode>/<wave>_<experiment_type>_<pseudocode>_ver<version>
#
# \param[in]  vaultRoot    root collection of the vault area
# \param[in]  datasetId    dataset identifier, split into components by uuYcDatasetParseId
# \param[out] datasetPath  full pathname of the dataset collection in the vault
#
uuYcVaultDatasetGetPath(*vaultRoot, *datasetId, *datasetPath) {
	uuYcDatasetParseId(*datasetId, *idParts);
	*w = *idParts."wave";
	*e = *idParts."experiment_type";
	*p = *idParts."pseudocode";
	*v = *idParts."version";
	# concatenate explicitly: "*w_" would be parsed as a variable named *w_
	*leafName = *w ++ "_" ++ *e ++ "_" ++ *p ++ "_ver" ++ *v;
	*datasetPath = "*vaultRoot/*w/*e/*p/" ++ *leafName;
}
# \brief check whether the vault already holds a collection for the given dataset
#
# \param[in]  vaultRoot  root collection of the vault area
# \param[in]  datasetId  dataset identifier
# \param[out] exists     true if the dataset's vault collection is present, otherwise false
#
uuYcVaultDatasetExists(*vaultRoot, *datasetId, *exists) {
	uuYcVaultDatasetGetPath(*vaultRoot, *datasetId, *candidatePath);
	*exists = false;
	# a single matching row is enough
	foreach (*hit in SELECT COLL_NAME WHERE COLL_NAME = '*candidatePath') {
		*exists = true;
		break;
	}
}
# \brief stamp a vault dataset collection with its id components and a creation date
#
# Adds the AVUs wave, experiment_type, pseudocode, version (taken from the parsed
# dataset id) and dataset_date_created (current iCAT time in unix format).
#
# \param[in] vaultPath  pathname of the dataset collection in the vault
# \param[in] datasetId  dataset identifier
#
uuYcVaultDatasetAddMeta(*vaultPath, *datasetId) {
	uuYcDatasetParseId(*datasetId, *idParts);
	*waveValue    = *idParts."wave";
	*typeValue    = *idParts."experiment_type";
	*codeValue    = *idParts."pseudocode";
	*versionValue = *idParts."version";
	msiGetIcatTime(*createdAt, "unix");

	msiAddKeyVal(*avus, "wave", *waveValue);
	msiAddKeyVal(*avus, "experiment_type", *typeValue);
	msiAddKeyVal(*avus, "pseudocode", *codeValue);
	msiAddKeyVal(*avus, "version", *versionValue);
	msiAddKeyVal(*avus, "dataset_date_created", *createdAt);
	msiAssociateKeyValuePairsToObj(*avus, *vaultPath, "-C");
}
# \brief uuTreeWalk callback that force-removes one tree item
#
# \param[in]  itemParent        collection that contains the item
# \param[in]  itemName          name of the item
# \param[in]  itemIsCollection  true if the item is a collection
# \param[in]  buffer            unused, required by the uuTreeWalk callback signature
# \param[out] status            status of the remove microservice
#
uuYcVaultWalkRemoveObject(*itemParent, *itemName, *itemIsCollection, *buffer, *status) {
	*target = "*itemParent/*itemName";
	if (!*itemIsCollection) {
		msiDataObjUnlink("objPath=*target++++forceFlag=", *status);
	} else {
		msiRmColl(*target, "forceFlag=", *status);
	}
}
# \brief copy one intake object (collection or data object) to the vault
#
# For a collection the vault collection is created; for a data object the file is
# copied with checksum verification. Only the metadata attributes listed in
# *copiedMetadata are carried over. In addition, the owner (name#zone) and creation
# time of the intake object are recorded as 'submitted_by' and 'submitted_date'.
#
# \param[in]  objectPath    pathname of the object in the intake area
# \param[in]  isCollection  true if objectPath is a collection, otherwise false
# \param[in]  vaultPath     pathname of the object to create in the vault
# \param[out] status        0 on success, otherwise the status returned by
#                           msiCollCreate (collection) or msiDataObjCopy (data object)
#
uuYcVaultIngestObject(*objectPath, *isCollection, *vaultPath, *status) {
# from the original object only the below list '*copiedMetadata' of metadata keys
# is copied to the vault object, other info is ignored
*copiedMetadata = list("wave", "experiment_type", "pseudocode", "version",
"error", "warning", "comment", "dataset_error",
"dataset_warning", "datasetid");
*status = 0;
if (*isCollection) {
# "1" asks msiCollCreate to also create missing parent collections
msiCollCreate(*vaultPath, "1", *status);
if (*status == 0) {
foreach (*row in SELECT META_COLL_ATTR_NAME, META_COLL_ATTR_VALUE
WHERE COLL_NAME = '*objectPath'
) {
msiGetValByKey(*row, "META_COLL_ATTR_NAME", *key);
msiGetValByKey(*row, "META_COLL_ATTR_VALUE", *value);
msiString2KeyValPair("*key=*value",*kv);
# add relevant kvlist to vault collection object
foreach (*meta in *copiedMetadata) {
if (*key == *meta) {
msiAssociateKeyValuePairsToObj(*kv, *vaultPath, "-C");
}
}
}
# add metadata found in system info (owner and creation time of the intake collection)
foreach (*row in SELECT COLL_OWNER_NAME, COLL_OWNER_ZONE, COLL_CREATE_TIME
WHERE COLL_NAME = '*objectPath'
) {
msiGetValByKey(*row, "COLL_OWNER_NAME", *ownerName);
msiGetValByKey(*row, "COLL_OWNER_ZONE", *ownerZone);
msiGetValByKey(*row, "COLL_CREATE_TIME", *createTime);
msiString2KeyValPair("submitted_by=*ownerName#*ownerZone",*kvSubmittedBy);
msiString2KeyValPair("submitted_date=*createTime",*kvSubmittedDate);
msiAssociateKeyValuePairsToObj(*kvSubmittedBy, *vaultPath, "-C");
msiAssociateKeyValuePairsToObj(*kvSubmittedDate, *vaultPath, "-C");
}
}
} else { # it's not a collection but a data object
# first chksum the original file, then use it to verify the vault copy
# NOTE(review): the result of msiDataObjChksum is not checked and *checksum is unused
msiDataObjChksum(*objectPath, "forceChksum=", *checksum);
msiDataObjCopy(*objectPath, *vaultPath, "verifyChksum=", *status);
if (*status == 0) {
uuChopPath(*objectPath, *collection, *dataName);
foreach (*row in SELECT META_DATA_ATTR_NAME, META_DATA_ATTR_VALUE
WHERE COLL_NAME = '*collection'
AND DATA_NAME = '*dataName'
) {
msiGetValByKey(*row, "META_DATA_ATTR_NAME", *key);
msiGetValByKey(*row, "META_DATA_ATTR_VALUE", *value);
# add relevant kvlist to vault data object
msiString2KeyValPair("*key=*value",*kv);
foreach (*meta in *copiedMetadata) {
if (*key == *meta) {
msiAssociateKeyValuePairsToObj(*kv, *vaultPath, "-d");
}
}
}
# add metadata found in system info
foreach (*row in SELECT DATA_OWNER_NAME, DATA_OWNER_ZONE, DATA_CREATE_TIME
WHERE COLL_NAME = '*collection'
AND DATA_NAME = '*dataName'
) {
msiGetValByKey(*row, "DATA_OWNER_NAME", *ownerName);
msiGetValByKey(*row, "DATA_OWNER_ZONE", *ownerZone);
msiGetValByKey(*row, "DATA_CREATE_TIME", *createTime);
msiString2KeyValPair("submitted_by=*ownerName#*ownerZone",*kvSubmittedBy);
msiString2KeyValPair("submitted_date=*createTime",*kvSubmittedDate);
msiAssociateKeyValuePairsToObj(*kvSubmittedBy, *vaultPath, "-d");
msiAssociateKeyValuePairsToObj(*kvSubmittedDate, *vaultPath, "-d");
# Skip duplicas: use only the first row so the AVUs are added once
# (presumably extra rows come from multiple replicas; verify)
break;
}
}
}
}
# \brief uuTreeWalk callback that ingests one tree item into the vault
#
# buffer."source" is the intake toplevel collection and buffer."destination" its
# vault counterpart. Items below the toplevel keep their path relative to it.
#
# \param[in]  itemParent        collection that contains the item
# \param[in]  itemName          name of the item
# \param[in]  itemIsCollection  true if the item is a collection
# \param[in]  buffer            key-value pairs "source" and "destination"
# \param[out] status            status of uuYcVaultIngestObject
#
uuYcVaultWalkIngestObject(*itemParent, *itemName, *itemIsCollection, *buffer, *status) {
	*from = "*itemParent/*itemName";
	*rootSource = *buffer."source";
	*rootDest = *buffer."destination";
	if (*from == *rootSource) {
		# the toplevel itself maps directly onto the destination
		*to = *rootDest;
	} else {
		# strip "<source>/" and re-root the remainder under the destination
		*rel = substr(*from, strlen(*rootSource) + 1, strlen(*from));
		*to = *rootDest ++ "/" ++ *rel;
	}
	uuYcVaultIngestObject(*from, *itemIsCollection, *to, *status);
}
# \brief move a type-A dataset (a toplevel collection with a tree underneath) to the vault
#
# On success the intake collection is removed. If the ingest fails, the partial vault
# copy is removed and the dataset is left in the vault queue so it can be retried.
# If the dataset version already exists in the vault, an error is recorded on the
# intake dataset and the dataset is melted and unlocked.
#
# \param[in]  intakeRoot          root collection of the intake area
# \param[in]  topLevelCollection  intake collection that holds the dataset
# \param[in]  datasetId           dataset identifier
# \param[in]  vaultRoot           root collection of the vault area
# \param[out] status              0 on success, 1 if the dataset version already exists
#                                 in the vault, otherwise the failing status code
#
uuYcDatasetCollectionMove2Vault(*intakeRoot,*topLevelCollection, *datasetId, *vaultRoot, *status) {
	writeLine("serverLog","\nmoving dataset-typeA *datasetId from *topLevelCollection to vault");
	*status = 0;
	uuYcVaultDatasetExists(*vaultRoot, *datasetId, *exists);
	if (!*exists) {
		uuYcVaultDatasetGetPath(*vaultRoot, *datasetId, *vaultPath);
		# create the in-between levels of the path to the toplevel collection;
		# the toplevel collection itself is presumably created by the tree walk below
		# (uuYcVaultWalkIngestObject maps the walk root onto *vaultPath)
		uuChopPath(*vaultPath, *vaultParent, *vaultCollection);
		msiCollCreate(*vaultParent, "1", *status);
		if (*status == 0) {
			# copy the dataset tree to the vault
			*buffer."source" = *topLevelCollection;
			*buffer."destination" = *vaultPath;
			uuTreeWalk(
				"forward",
				*topLevelCollection,
				"uuYcVaultWalkIngestObject",
				*buffer,
				*status
			);
			uuKvClear(*buffer);
			if (*status == 0) {
				# stamp the vault dataset collection with additional metadata
				msiGetIcatTime(*date, "unix");
				msiAddKeyVal(*kv, "dataset_date_created", *date);
				msiAssociateKeyValuePairsToObj(*kv, *vaultPath, "-C");
				# and finally remove the dataset original in the intake area
				msiRmColl(*topLevelCollection, "forceFlag=", *error);
				if (*error != 0) {
					writeLine("serverLog",
						"ERROR: unable to remove intake collection *topLevelCollection");
				}
			} else {
				# move failed (partially), cleanup vault
				# NB: keep the dataset in the vault queue so we can retry some other time
				writeLine("serverLog","ERROR: Ingest failed for *datasetId error = *status");
				uuTreeWalk("reverse", *vaultPath, "uuYcVaultWalkRemoveObject", *buffer, *error);
			}
		} else {
			# previously this failure returned silently without any trace in the log
			writeLine("serverLog","ERROR: unable to create vault collection *vaultParent for *datasetId error = *status");
		}
	} else {
		writeLine("serverLog","INFO: version already exists in vault: *datasetId");
		# duplicate dataset, signal error and throw out of vault queue
		*message = "Duplicate dataset, version already exists in vault";
		uuYcDatasetErrorAdd(*intakeRoot, *datasetId,*message);
		uuYcDatasetMelt(*topLevelCollection, *datasetId, *status);
		uuYcDatasetUnlock(*topLevelCollection, *datasetId, *status);
		*status = 1; # duplicate dataset version error
	}
}
# \brief move a type-B dataset (one or more data objects in a single collection) to the vault
#
# The dataset's data objects are those in topLevelCollection whose 'dataset_toplevel'
# attribute equals datasetId. On success the intake data objects are removed. If the
# ingest fails, the vault copy is removed and the dataset is left in the vault queue so
# it can be retried. If the dataset version already exists in the vault, an error is
# recorded on the intake dataset and the dataset is melted and unlocked.
#
# \param[in]  intakeRoot          root collection of the intake area
# \param[in]  topLevelCollection  intake collection holding the dataset's data objects
# \param[in]  datasetId           dataset identifier
# \param[in]  vaultRoot           root collection of the vault area
# \param[out] status              0 on success, 1 if the dataset version already exists
#                                 in the vault, otherwise the failing status code
#
uuYcDatasetObjectsOnlyMove2Vault(*intakeRoot, *topLevelCollection, *datasetId, *vaultRoot, *status) {
	writeLine("serverLog","\nmoving dataset-typeB *datasetId from *topLevelCollection to vault");
	*status = 0;
	uuYcVaultDatasetExists(*vaultRoot, *datasetId, *exists);
	if (!*exists) {
		# new dataset(version) we can safely ingest into vault
		uuYcVaultDatasetGetPath(*vaultRoot, *datasetId, *vaultPath);
		# create path to and including the toplevel collection (will create in-between levels)
		msiCollCreate(*vaultPath, "1", *status);
		if (*status == 0) {
			# stamp the vault dataset collection with default metadata
			uuYcVaultDatasetAddMeta(*vaultPath, *datasetId);
			# copy data objects to the vault, stop at the first failure
			foreach (*dataRow in SELECT DATA_NAME
						WHERE COLL_NAME = '*topLevelCollection'
						AND META_DATA_ATTR_NAME = 'dataset_toplevel'
						AND META_DATA_ATTR_VALUE = '*datasetId'
					) {
				msiGetValByKey(*dataRow, "DATA_NAME", *dataName);
				*intakePath = "*topLevelCollection/*dataName";
				uuYcVaultIngestObject(*intakePath, false, "*vaultPath/*dataName", *status);
				if (*status != 0) {
					break;
				}
			}
			if (*status == 0) {
				# data ingested, what's left is to delete the original in intake area
				# this will also melt/unfreeze etc because metadata is removed too
				foreach (*dataRow in SELECT DATA_NAME
							WHERE COLL_NAME = '*topLevelCollection'
							AND META_DATA_ATTR_NAME = 'dataset_toplevel'
							AND META_DATA_ATTR_VALUE = '*datasetId'
						) {
					msiGetValByKey(*dataRow, "DATA_NAME", *dataName);
					*intakePath = "*topLevelCollection/*dataName";
					msiDataObjUnlink("objPath=*intakePath++++forceFlag=", *error);
					if (*error != 0) {
						writeLine("serverLog","ERROR: unable to remove intake object *intakePath");
					}
				}
			} else {
				# error occurred during ingest, cleanup vault area and relay the error to user
				# NB: keep the dataset in the vault queue so we can retry some other time
				writeLine("serverLog","ERROR: Ingest failed for *datasetId error = *status");
				*buffer = "required yet dummy parameter";
				uuTreeWalk("reverse", *vaultPath, "uuYcVaultWalkRemoveObject", *buffer, *error);
			}
		} else {
			# previously this failure returned silently without any trace in the log
			writeLine("serverLog","ERROR: unable to create vault collection *vaultPath for *datasetId error = *status");
		}
	} else {
		# duplicate dataset, signal error and throw out of vault queue
		writeLine("serverLog","INFO: version already exists in vault: *datasetId");
		*message = "Duplicate dataset, version already exists in vault";
		uuYcDatasetErrorAdd(*intakeRoot, *datasetId,*message);
		uuYcDatasetMelt(*topLevelCollection, *datasetId, *status);
		uuYcDatasetUnlock(*topLevelCollection, *datasetId, *status);
		*status = 1; # duplicate dataset version error
	}
}
# \brief move all locked datasets from the intake area to the vault
#
# Two types of datasets are handled:
#   type A: a single toplevel collection (tagged 'dataset_toplevel') with a tree underneath
#   type B: one or more data objects (each tagged 'dataset_toplevel') within the same collection
#
# \param[in]  intakeRoot  pathname root of intake area
# \param[in]  vaultRoot   pathname root of vault area
# \param[out] status      0 if every dataset that was attempted moved successfully,
#                         otherwise the nonzero status of the last dataset that failed
#
uuYc2Vault(*intakeRoot, *vaultRoot, *status) {
	# per locked dataset:
	# 1. add to_vault_freeze metadata lock to the dataset
	# 2. check that dataset does not yet exist in the vault
	# 3. copy dataset to vault with its metadata
	# 4. remove dataset from intake
	# upon a duplicate dataset version:
	# - add error to intake dataset metadata
	# - remove locks on intake dataset (to_vault_freeze, to_vault_lock)
	# upon any other ingest error:
	# - delete partial data from vault
	# - keep the dataset in the vault queue so it is retried later
	*status = 0; # 0 is success, nonzero is error
	*datasets_moved = 0;

	# TYPE A: toplevel collections strictly below the intake root
	foreach (*row in SELECT COLL_NAME, META_COLL_ATTR_VALUE
				WHERE META_COLL_ATTR_NAME = 'dataset_toplevel'
				AND COLL_NAME like '*intakeRoot/%') {
		msiGetValByKey(*row, "COLL_NAME", *topLevelCollection);
		msiGetValByKey(*row, "META_COLL_ATTR_VALUE", *datasetId);
		uuYcObjectIsLocked(*topLevelCollection, true, *locked, *frozen);
		if (*locked) {
			# track each dataset separately so a later success does not mask an earlier failure
			uuYcDatasetFreeze(*topLevelCollection, *datasetId, *datasetStatus);
			if (*datasetStatus == 0) {
				# dataset frozen; now move to vault and remove from intake area
				uuYcDatasetCollectionMove2Vault(
					*intakeRoot,
					*topLevelCollection,
					*datasetId,
					*vaultRoot,
					*datasetStatus
				);
			}
			if (*datasetStatus == 0) {
				*datasets_moved = *datasets_moved + 1;
			} else {
				*status = *datasetStatus;
			}
		}
	}

	# TYPE B: data objects in the intake root itself or in any collection below it.
	# Matching '*intakeRoot' or '*intakeRoot/%' (instead of the bare prefix '*intakeRoot%')
	# skips sibling collections that merely share the name prefix.
	foreach (*row in SELECT COLL_NAME, META_DATA_ATTR_VALUE
				WHERE META_DATA_ATTR_NAME = 'dataset_toplevel'
				AND COLL_NAME = '*intakeRoot' || like '*intakeRoot/%'
			) {
		msiGetValByKey(*row, "COLL_NAME", *topLevelCollection);
		msiGetValByKey(*row, "META_DATA_ATTR_VALUE", *datasetId);
		# check if to_vault_lock exists on all the dataobjects of this dataset
		*allLocked = true;
		foreach (*dataRow in SELECT DATA_NAME
					WHERE COLL_NAME = '*topLevelCollection'
					AND META_DATA_ATTR_NAME = 'dataset_toplevel'
					AND META_DATA_ATTR_VALUE = '*datasetId'
				) {
			msiGetValByKey(*dataRow, "DATA_NAME", *dataName);
			uuYcObjectIsLocked("*topLevelCollection/*dataName", false, *locked, *frozen);
			*allLocked = *allLocked && *locked;
		}
		if (*allLocked) {
			uuYcDatasetFreeze(*topLevelCollection, *datasetId, *datasetStatus);
			if (*datasetStatus == 0) {
				# dataset frozen, now move to vault and remove from intake area
				uuYcDatasetObjectsOnlyMove2Vault(
					*intakeRoot,
					*topLevelCollection,
					*datasetId,
					*vaultRoot,
					*datasetStatus
				);
			}
			if (*datasetStatus == 0) {
				*datasets_moved = *datasets_moved + 1;
			} else {
				*status = *datasetStatus;
			}
		}
	}

	if (*datasets_moved > 0) {
		writeLine("serverLog","\nmoved in total *datasets_moved dataset(s) to the vault");
	}
}
#input null
#output ruleExecOut