forked from CMSCompOps/WmAgentScripts
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcloseOutWorkflows.py
executable file
·309 lines (283 loc) · 12.7 KB
/
closeOutWorkflows.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
#!/usr/bin/env python
import json
import urllib2,urllib, httplib, sys, re, os
from xml.dom.minidom import getDOMImplementation
import dbs3Client, reqMgrClient, phedexClient
"""
Close out script:
gathers all completed workflows and checks, one by one, whether each is
ready for closing out:
- Has the expected number of events
- Datasets have a custodial subscription in PhEDEx
- Datasets are healthy (no duplicate lumis)
- Datasets are healthy (no duplicate lumis)
This uses DBS3 client, reqMgrClient and phedexClient now instead of dbsTest.py
and phedexSubscription.py.
For running the previous version look for closeOutScript_leg.py
"""
def getOverviewRequestsWMStats(url):
    """
    Retrieves the workflow overview from WMStats by querying the couch db
    view requestByStatusAndType directly over HTTPS.

    The file named by the X509_USER_PROXY environment variable is used as
    both the client certificate and the key.

    Returns the 'rows' list of the decoded JSON response.
    """
    proxy = os.getenv('X509_USER_PROXY')
    conn = httplib.HTTPSConnection(url, cert_file=proxy, key_file=proxy)
    try:
        conn.request("GET",
                     "/couchdb/wmstats/_design/WMStats/_view/requestByStatusAndType?stale=update_after")
        data = conn.getresponse().read()
    finally:
        # release the socket even when the request or the read fails
        conn.close()
    return json.loads(data.decode('utf-8'))['rows']
def classifyCompletedRequests(url, requests):
    """
    Sorts completed requests by request type.

    Each element of requests is expected to be a WMStats view row with an
    'id' (the workflow name) and a 'key' list whose items [1] and [2] are
    the request status and the request type. Rows whose key has fewer than
    3 items are printed and skipped; rows not in 'completed' status are
    skipped.

    Returns a dict holding one list of workflow names for each of
    'ReDigi', 'MonteCarloFromGEN', 'MonteCarlo', 'ReReco' and 'LHEStepZero'.
    MonteCarlo requests whose first output dataset ends with /GEN are put
    in 'LHEStepZero' (step0). Requests of any other type are ignored.
    """
    workflows = {'ReDigi': [], 'MonteCarloFromGEN': [], 'MonteCarlo': [],
                 'ReReco': [], 'LHEStepZero': []}
    for request in requests:
        name = request['id']
        # if a wrong or weird name
        if len(request['key']) < 3:
            print(request)
            continue
        status = request['key'][1]
        # only completed requests
        if status != 'completed':
            continue
        requestType = request['key'][2]
        if requestType == 'MonteCarlo':
            # MonteCarlos whose datasets end with /GEN are step0.
            # An empty (or missing) dataset list would otherwise raise
            # IndexError on datasets[0]; such workflows stay MonteCarlo.
            datasets = reqMgrClient.outputdatasetsWorkflow(url, name)
            if datasets and re.search(r'/GEN$', datasets[0]):
                workflows['LHEStepZero'].append(name)
            else:
                workflows['MonteCarlo'].append(name)
        elif requestType in ('MonteCarloFromGEN', 'LHEStepZero', 'ReDigi', 'ReReco'):
            workflows[requestType].append(name)
    return workflows
def closeOutReRecoWorkflows(url, workflows):
    """
    Closes out ReReco workflows.

    Workflows whose name contains 'RelVal' or 'TEST' are skipped. A
    workflow is closed out (reqMgrClient.closeOutWorkflowCascade) only when
    every output dataset has a completion percentage exactly equal to 1
    and a custodial PhEDEx subscription. One report row is printed per
    dataset, and a separator line is printed after all workflows.

    Returns the list of workflows that have a dataset at 100% completion
    but no custodial subscription (a workflow is appended once per such
    dataset).
    """
    noSiteWorkflows = []
    for workflow in workflows:
        if 'RelVal' in workflow or 'TEST' in workflow:
            continue
        datasets = reqMgrClient.outputdatasetsWorkflow(url, workflow)
        closeOutWorkflow = True
        # check if each dataset is ready
        for dataset in datasets:
            percentage = percentageCompletion(url, workflow, dataset)
            phedexSubscription = phedexClient.hasCustodialSubscription(dataset)
            # Duplicate lumis are not checked for ReReco; the value only
            # fills the 'Dupl' column so the report layout matches other types.
            duplicate = False
            # dataset can be closed out only with 100% of events
            closeOutDataset = bool(percentage == 1 and phedexSubscription)
            # percentage is ok but there is no custodial subscription
            if percentage == 1 and not phedexSubscription:
                noSiteWorkflows.append(workflow)
            # if at least one dataset is not ready wf cannot be closed out
            closeOutWorkflow = closeOutWorkflow and closeOutDataset
            print('| %80s | %100s | %4s | %5s| %3s | %5s|%5s| ' % (
                workflow, dataset, str(int(percentage * 100)),
                str(phedexSubscription), 100, duplicate, closeOutDataset))
        # workflow can only be closed out if all datasets are ready
        if closeOutWorkflow:
            reqMgrClient.closeOutWorkflowCascade(url, workflow)
    print('-' * 180)
    return noSiteWorkflows
def closeOutRedigiWorkflows(url, workflows):
    """
    Closes out a list of ReDigi workflows.

    A dataset is ready when it has a custodial PhEDEx subscription, at
    least 95% completion, and dbs3Client.duplicateRunLumi reports no
    duplicates. A workflow is closed out (reqMgrClient.closeOutWorkflowCascade)
    only when all of its output datasets are ready. One report row is
    printed per dataset, and a separator line is printed after all workflows.

    Returns the list of workflows that have a dataset with at least 95%
    completion but no custodial subscription (a workflow is appended once
    per such dataset).
    """
    noSiteWorkflows = []
    for workflow in workflows:
        closeOutWorkflow = True
        datasets = reqMgrClient.outputdatasetsWorkflow(url, workflow)
        for dataset in datasets:
            closeOutDataset = False
            percentage = percentageCompletion(url, workflow, dataset)
            phedexSubscription = phedexClient.hasCustodialSubscription(dataset)
            duplicate = None
            # duplicates are only checked (an extra DBS query) for datasets
            # that have a subscription and at least 95% of the events
            if phedexSubscription and percentage >= 0.95:
                duplicate = dbs3Client.duplicateRunLumi(dataset)
                # if not duplicate events, dataset is ready
                closeOutDataset = not duplicate
            # percentage is ok but there is no custodial subscription
            if percentage >= 0.95 and not phedexSubscription:
                noSiteWorkflows.append(workflow)
            # if at least one dataset is not ready wf cannot be closed out
            closeOutWorkflow = closeOutWorkflow and closeOutDataset
            print('| %80s | %100s | %4s | %5s| %3s | %5s|%5s| ' % (
                workflow, dataset, str(int(percentage * 100)),
                str(phedexSubscription), 100, duplicate, closeOutDataset))
        # workflow can only be closed out if all datasets are ready
        if closeOutWorkflow:
            reqMgrClient.closeOutWorkflowCascade(url, workflow)
    print('-' * 180)
    return noSiteWorkflows
def closeOutMonterCarloRequests(url, workflows):
    """
    Tries to close out MonteCarlo or MonteCarloFromGEN workflows.

    Workflows assigned to the 'analysis' team are skipped. For every output
    dataset the required completion is 95%, or 100% when the dataset name
    contains 'SMS'. A dataset is ready when it has a custodial move
    subscription site, reaches that completion, and
    dbs3Client.duplicateLumi reports no duplicates. A workflow is closed
    out (reqMgrClient.closeOutWorkflowCascade) only when every dataset is
    ready. One report row is printed per dataset (including the transfer
    percentage), and a separator line is printed after all workflows.

    Returns the workflows that have a dataset with enough events but no
    custodial subscription site (appended once per such dataset).
    """
    missingSite = []
    for wf in workflows:
        outputs = reqMgrClient.outputdatasetsWorkflow(url, wf)
        allReady = True
        # skip montecarlos on a special queue
        if reqMgrClient.getRequestTeam(url, wf) == 'analysis':
            continue
        for ds in outputs:
            # SMS samples must be fully complete before closing out
            threshold = 1.00 if 'SMS' in ds else 0.95
            completion = percentageCompletion(url, wf, ds)
            custodialSite = phedexClient.getCustodialMoveSubscriptionSite(ds)
            transferred = 0
            dupLumis = None
            enoughEvents = completion >= float(threshold)
            if not (custodialSite and enoughEvents):
                dsReady = False
            else:
                # only now are the more expensive checks worth doing
                transferred = phedexClient.getTransferPercentage(url, ds, custodialSite)
                dupLumis = dbs3Client.duplicateLumi(ds)
                dsReady = not dupLumis
            if enoughEvents and not custodialSite:
                missingSite.append(wf)
            # a single unready dataset blocks the whole workflow
            allReady = allReady and dsReady
            row = (wf, ds, str(int(completion * 100)), str(custodialSite),
                   str(int(transferred * 100)), dupLumis, dsReady)
            print('| %80s | %100s | %4s | %5s| %3s | %5s| %5s|' % row)
        if allReady:
            reqMgrClient.closeOutWorkflowCascade(url, wf)
    # separation line
    print('-' * 180)
    return missingSite
def closeOutStep0Requests(url, workflows):
    """
    Closes out MonteCarlo step0 (LHEStepZero) requests.

    Workflows that are not in 'completed' status, or that are assigned to
    the 'analysis' team, are skipped. A dataset is ready when it has a
    custodial move subscription site, at least 95% completion, no
    duplicate lumis (dbs3Client.duplicateLumi) and passes
    checkCorrectLumisEventGEN. A workflow is closed out
    (reqMgrClient.closeOutWorkflowCascade) only when every dataset is
    ready. One report row is printed per dataset (the 'Tran' column holds
    the lumi check result), and a separator line is printed after all
    workflows.

    Returns the workflows that have a dataset with at least 95% completion
    but no custodial subscription site (appended once per such dataset).
    """
    noSiteWorkflows = []
    for workflow in workflows:
        # if not completed skip
        if reqMgrClient.getWorkflowStatus(url, workflow) != 'completed':
            continue
        # skip montecarlos on a special queue
        if reqMgrClient.getRequestTeam(url, workflow) == 'analysis':
            continue
        # fetched only after the skip checks so skipped workflows cost no query
        datasets = reqMgrClient.outputdatasetsWorkflow(url, workflow)
        closeOutWorkflow = True
        for dataset in datasets:
            closeOutDataset = False
            percentage = percentageCompletion(url, workflow, dataset)
            phedexSubscription = phedexClient.getCustodialMoveSubscriptionSite(dataset)
            duplicate = None
            correctLumis = None
            # duplicates and lumis are only checked for datasets that have
            # a subscription and enough events
            if phedexSubscription and percentage >= 0.95:
                duplicate = dbs3Client.duplicateLumi(dataset)
                correctLumis = checkCorrectLumisEventGEN(dataset)
                # TODO validate closed blocks
                closeOutDataset = bool(not duplicate and correctLumis)
            # percentage is ok but there is no custodial subscription
            if percentage >= 0.95 and not phedexSubscription:
                noSiteWorkflows.append(workflow)
            # if at least one dataset is not ready wf cannot be closed out
            closeOutWorkflow = closeOutWorkflow and closeOutDataset
            print('| %80s | %100s | %4s | %5s| %3s | %5s| %5s| ' % (
                workflow, dataset, str(int(percentage * 100)),
                str(phedexSubscription), str(correctLumis), duplicate,
                closeOutDataset))
        # workflow can only be closed out if all datasets are ready
        if closeOutWorkflow:
            reqMgrClient.closeOutWorkflowCascade(url, workflow)
    print('-' * 180)
    return noSiteWorkflows
def checkCorrectLumisEventGEN(dataset, maxEventsPerLumi=300):
    """
    Checks that the dataset has at most maxEventsPerLumi events per lumi
    section on average, i.e. numEvents / numLumis <= maxEventsPerLumi.

    The comparison is written as numLumis >= numEvents / maxEventsPerLumi
    so that no division by the lumi count is needed.

    NOTE(review): this accepts datasets with FEW events per lumi and
    rejects those with many; confirm that is the intended direction of
    the step0 requirement.

    dataset: dataset name passed to the dbs3Client count queries.
    maxEventsPerLumi: upper bound on average events per lumi (default 300).
    Returns True when the bound is respected, False otherwise.
    """
    numLumis = dbs3Client.getLumiCountDataSet(dataset)
    numEvents = dbs3Client.getEventCountDataSet(dataset)
    return numLumis >= numEvents / float(maxEventsPerLumi)
def percentageCompletion(url, workflow, dataset):
    """
    Returns the completion of one output dataset of a workflow as a
    fraction: output events of the dataset divided by the workflow's input
    events (1.0 means 100%).

    Returns 0 when the workflow has 0 input events or the dataset reports
    no (falsy) output events.
    """
    expected = reqMgrClient.getInputEvents(url, workflow)
    produced = reqMgrClient.getOutputEvents(url, workflow, dataset)
    # guard against division by zero and missing output counts
    if expected == 0 or not produced:
        return 0
    return produced / float(expected)
def listWorkflows(workflows):
    """
    Prints every workflow name on its own line, then a separator line of
    150 dashes.
    """
    for name in workflows:
        print(name)
    separator = '-' * 150
    print(separator)
def main():
    """
    Entry point of the close out script.

    Gathers all requests from WMStats on cmsweb.cern.ch, classifies the
    completed ones by type and runs the matching close out function for
    each type, printing a report table. Finally lists, per type, the
    workflows that had enough events but no custodial subscription, then
    exits with status 0.
    """
    url = 'cmsweb.cern.ch'
    print("Gathering Requests")
    requests = getOverviewRequestsWMStats(url)
    print("Classifying Requests")
    workflowsCompleted = classifyCompletedRequests(url, requests)
    # print header
    print('-' * 220)
    print('| Request' + (' ' * 74) + '| OutputDataSet' + (' ' * 86) +
          '|%Compl|Subscr|Tran|Dupl|ClosOu|')
    print('-' * 220)
    # (request type, close out function), in processing order
    closeOutSteps = [
        ('ReReco', closeOutReRecoWorkflows),
        ('ReDigi', closeOutRedigiWorkflows),
        ('MonteCarlo', closeOutMonterCarloRequests),
        ('MonteCarloFromGEN', closeOutMonterCarloRequests),
        ('LHEStepZero', closeOutStep0Requests),
    ]
    for requestType, closeOut in closeOutSteps:
        workflowsCompleted['NoSite-' + requestType] = closeOut(
            url, workflowsCompleted[requestType])
    print("MC Workflows for which couldn't find Custodial Tier1 Site")
    for requestType, _ in closeOutSteps:
        listWorkflows(workflowsCompleted['NoSite-' + requestType])
    sys.exit(0)
if __name__ == "__main__":
    main()