Merge pull request #11487 from vkuznet/fix-issue-11426
First implementation of MSPileup data placement logic via MSPileupTasks class
amaltaro authored Feb 28, 2023
2 parents 39c4d5f + f9c2e57 commit de8ca32
Showing 11 changed files with 1,001 additions and 43 deletions.
55 changes: 30 additions & 25 deletions src/python/WMCore/MicroService/MSPileup/DataStructs/MSPileupObj.py
@@ -4,21 +4,21 @@
Description: MSPileupObj module provides MSPileup data structure:
{
"pileupName": string with the pileup dataset
"pileupType": string with a constant value
"insertTime": int, seconds since epoch in GMT timezone
"lastUpdateTime": int, seconds since epoch in GMT timezone
"expectedRSEs": ["Disk1", "Disk2", etc], # these values need to be validated against Rucio
"currentRSEs": ["Disk1", "Disk3"], # values provided by the MS itself
"fullReplicas": integer, # total number of replicas to keep on Disk
"campaigns": [ "name", ... ] # list of workflow campaigns using this pileup
"containerFraction": real number with the container fraction to be distributed (TBFD)
"replicationGrouping": string with a constant value (DATASET or ALL, to be in sync with Rucio)
"activatedOn": int, seconds since epoch in GMT timezone
"deactivatedOn": int, seconds since epoch in GMT timezone
"active": boolean,
"pileupSizeGB": integer, current size of the pileup in GB (integer)
"rulesList: list of strings (rules) used to lock the pileup id
"pileupName": string with the pileup dataset (madatory)
"pileupType": string with a constant value (mandatory)
"insertTime": int, seconds since epoch in GMT timezone (service-based)
"lastUpdateTime": int, seconds since epoch in GMT timezone (service-based)
"expectedRSEs": ["Disk1", "Disk2", etc], a non-empty list of strings with the RSE names (mandatory)
"currentRSEs": ["Disk1", "Disk3"], a list of strings with the RSE names (service-based)
"fullReplicas": integer, # total number of replicas to keep on Disk (optional)
"campaigns": [ "name", ... ] # list of workflow campaigns using this pileup (optional)
"containerFraction": real number with the container fraction to be distributed (optional)
"replicationGrouping": string with a constant value (DATASET or ALL, (optional)
"activatedOn": int, seconds since epoch in GMT timezone (service-based)
"deactivatedOn": int, seconds since epoch in GMT timezone (service-based)
"active": boolean, (mandatory)
"pileupSize": integer, current size of the pileup in bytes (service-based)
"ruleIds: list of strings (rules) used to lock the pileup id (service-based)
}
The data flow should be done via a list of objects, e.g.
@@ -29,12 +29,12 @@
import json

# WMCore modules
from Utils.Timers import gmtimeSeconds
from WMCore.MicroService.Tools.Common import getMSLogger
from WMCore.Lexicon import dataset
from Utils.Timers import gmtimeSeconds


class MSPileupObj(object):
class MSPileupObj():
"""
MSPileupObj defines MSPileup data structure
"""
@@ -44,22 +44,27 @@ def __init__(self, pdict, verbose=None, logger=None, validRSEs=None):
validRSEs = []
self.validRSEs = validRSEs

expectedRSEs = pdict['expectedRSEs']
if len(expectedRSEs) == 0:
msg = 'MSPileup document requires a non-empty list of expectedRSEs'
raise Exception(msg)

self.data = {
'pileupName': pdict.get('pileupName', ''),
'pileupType': pdict.get('pileupType', ''),
'pileupName': pdict['pileupName'],
'pileupType': pdict['pileupType'],
'insertTime': pdict.get('insertTime', gmtimeSeconds()),
'lastUpdateTime': pdict.get('lastUpdateTime', gmtimeSeconds()),
'expectedRSEs': pdict.get('expectedRSEs', []),
'expectedRSEs': expectedRSEs,
'currentRSEs': pdict.get('currentRSEs', []),
'fullReplicas': pdict.get('fullReplicas', 0),
'fullReplicas': pdict.get('fullReplicas', 1),
'campaigns': pdict.get('campaigns', []),
'containerFraction': pdict.get('containerFraction', 1.0),
'replicationGrouping': pdict.get('replicationGrouping', ""),
'replicationGrouping': pdict.get('replicationGrouping', 'ALL'),
'activatedOn': pdict.get('activatedOn', gmtimeSeconds()),
'deactivatedOn': pdict.get('deactivatedOn', gmtimeSeconds()),
'active': pdict.get('active', False),
'pileupSize': pdict.get('pileupSize', 0),
'ruleList': pdict.get('ruleList', [])}
'ruleIds': pdict.get('ruleIds', [])}
valid, msg = self.validate(self.data)
if not valid:
msg = f'MSPileup input is invalid, {msg}'
@@ -126,7 +131,7 @@ def validate(self, pdict=None):
msg = f"containerFraction value {val} outside [0,1] range"
self.logger.error(msg)
return False, msg
if (key == 'expectedRSEs' or key == 'currentRSEs') and not self.validateRSEs(val):
if key in ('expectedRSEs', 'currentRSEs') and not self.validateRSEs(val):
msg = f"{key} value {val} is not in validRSEs {self.validRSEs}"
self.logger.error(msg)
return False, msg
@@ -170,5 +175,5 @@ def schema():
'deactivatedOn': (0, int),
'active': (False, bool),
'pileupSize': (0, int),
'ruleList': ([], list)}
'ruleIds': ([], list)}
return doc
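
A minimal usage sketch (not part of the diff) for the reworked constructor: only the mandatory fields are supplied, the dataset and RSE names are invented, and the document still has to pass the validation done in __init__ (non-empty expectedRSEs, RSEs contained in validRSEs, containerFraction within [0, 1]).

from WMCore.MicroService.MSPileup.DataStructs.MSPileupObj import MSPileupObj

# only mandatory fields are given; everything else falls back to the
# constructor defaults introduced in this PR
pdict = {
    'pileupName': '/SomePrimary/SomeProcessed-v1/PREMIX',  # invented dataset name
    'pileupType': 'premix',                                # assumed to be an allowed pileup type
    'expectedRSEs': ['T2_XX_Site1'],                       # must be non-empty
    'active': True,
}

obj = MSPileupObj(pdict, validRSEs=['T2_XX_Site1'])
print(obj.data['replicationGrouping'])  # 'ALL', the new default
print(obj.data['fullReplicas'])         # 1, the new default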
71 changes: 71 additions & 0 deletions src/python/WMCore/MicroService/MSPileup/MSPileupReport.py
@@ -0,0 +1,71 @@
#!/usr/bin/env python
"""
File : MSPileupReport.py
Author : Valentin Kuznetsov <vkuznet AT gmail dot com>
Description: MSPileup report module
"""

# WMCore modules
from Utils.Timers import gmtimeSeconds, encodeTimestamp


class MSPileupReport():
"""
MSPileupReport class represents MSPileup report object(s)
"""
def __init__(self, autoExpire=3600, autoCleanup=False):
"""
Constructor for MSPileup object
"""
self.docs = []
self.autoExpire = autoExpire
self.autoCleanup = autoCleanup

def addEntry(self, task, uuid, entry):
"""
Add new entry into MSPileup documents
:param task: task name
:param uuid: unique id of the entry
:param entry: entry message or any other object to store
"""
if self.autoCleanup:
self.purgeExpired()
gmtime = gmtimeSeconds()
report = {'gmtime': gmtime, 'uuid': uuid,
'timestamp': encodeTimestamp(gmtime),
'entry': entry, 'task': task}
self.docs.append(report)

def purgeExpired(self):
"""
Purge expired records from internal docs
"""
gmtime = gmtimeSeconds()
for entry in list(self.docs):
if gmtime - entry['gmtime'] > self.autoExpire:
self.docs.remove(entry)

def getDocuments(self):
"""
Return report documents
"""
if self.autoCleanup:
self.purgeExpired()
return self.docs

def getReportByUuid(self):
"""
Return report documents in dictionary form with uuids as keys
"""
if self.autoCleanup:
self.purgeExpired()
rdict = {}
for doc in self.docs:
uuid = doc['uuid']
timestamp = doc['timestamp']
entry = doc['entry']
task = doc['task']
record = f"{timestamp} {task} task {uuid} {entry}"
rdict.setdefault(uuid, []).append(record)
return rdict
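
A short usage sketch for the new report class (illustration only; the task name, uuid and messages are made up).

from WMCore.MicroService.MSPileup.MSPileupReport import MSPileupReport

report = MSPileupReport(autoExpire=3600, autoCleanup=True)
report.addEntry('monitoring', 'c92cd8d0-0000', 'rule abc123 is OK')
report.addEntry('monitoring', 'c92cd8d0-0000', 'pileup size updated')

# flat list of report documents, purged of entries older than autoExpire
for doc in report.getDocuments():
    print(doc['timestamp'], doc['task'], doc['entry'])

# the same records grouped by uuid, as pre-formatted strings
for uuid, records in report.getReportByUuid().items():
    print(uuid, records)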
85 changes: 85 additions & 0 deletions src/python/WMCore/MicroService/MSPileup/MSPileupTaskManager.py
@@ -0,0 +1,85 @@
#!/usr/bin/env python
"""
File : MSPileupTaskManager.py
Author : Valentin Kuznetsov <vkuznet AT gmail dot com>
Description: MSPileupTaskManager handles MSPileupTasks
In particular, it performs the following tasks in each polling cycle:
- fetches pileup sizes for all pileup documents in the database back-end
- updates RSE quotas
- performs the monitoring task
- performs the task for active pileups using up-to-date RSE quotas
- performs the task for inactive pileups
"""

# system modules
import os
from threading import current_thread

# WMCore modules
from WMCore.MicroService.MSCore.MSCore import MSCore
from WMCore.MicroService.DataStructs.DefaultStructs import PILEUP_REPORT
from WMCore.MicroService.MSPileup.MSPileupData import MSPileupData
from WMCore.MicroService.MSPileup.MSPileupTasks import MSPileupTasks
from WMCore.MicroService.MSTransferor.DataStructs.RSEQuotas import RSEQuotas
from WMCore.MicroService.Tools.PycurlRucio import getPileupContainerSizesRucio, getRucioToken
from WMCore.Services.Rucio.Rucio import Rucio


class MSPileupTaskManager(MSCore):
"""
MSPileupTaskManager handles MSPileup tasks
"""

def __init__(self, msConfig, **kwargs):
super().__init__(msConfig, **kwargs)
self.marginSpace = msConfig.get('marginSpace', 1024**4)
self.rucioAccount = msConfig.get('rucioAccount', 'ms-pileup')
self.rucioUrl = msConfig.get('rucioHost', 'http://cms-rucio.cern.ch')
self.rucioAuthUrl = msConfig.get('authHost', 'https://cms-rucio-auth.cern.ch')
creds = {"client_cert": os.getenv("X509_USER_CERT", "Unknown"),
"client_key": os.getenv("X509_USER_KEY", "Unknown")}
configDict = {'rucio_host': self.rucioUrl, 'auth_host': self.rucioAuthUrl,
'creds': creds, 'auth_type': 'x509'}
self.rucioClient = Rucio(self.rucioAccount, configDict=configDict)
self.dataManager = MSPileupData(msConfig)
self.mgr = MSPileupTasks(self.dataManager, self.logger,
self.rucioAccount, self.rucioClient)
self.rseQuotas = RSEQuotas(self.rucioAccount, msConfig["quotaUsage"],
minimumThreshold=msConfig["minimumThreshold"],
verbose=msConfig['verbose'], logger=self.logger)

def status(self):
"""
Provide MSPileupTaskManager status API.
:return: status dictionary
"""
summary = dict(PILEUP_REPORT)
summary.update({'thread_id': current_thread().name})
summary.update({'tasks': self.mgr.getReport()})
return summary

def executeCycle(self):
"""
execute MSPileupTasks polling cycle
"""
# get pileup sizes and update them in DB
spec = {}
docs = self.dataManager.getPileup(spec)
rucioToken = getRucioToken(self.rucioAuthUrl, self.rucioAccount)
containers = [r['pileupName'] for r in docs]
datasetSizes = getPileupContainerSizesRucio(containers, self.rucioUrl, rucioToken)
for doc in docs:
pileupSize = datasetSizes.get(doc['pileupName'], 0)
doc['pileupSize'] = pileupSize
self.dataManager.updatePileup(doc)

# fetch all rse quotas
self.rseQuotas.fetchStorageUsage(self.rucioClient)
nodeUsage = self.rseQuotas.getNodeUsage()

# execute all tasks
self.mgr.monitoringTask()
self.mgr.activeTask(nodeUsage=nodeUsage, marginSpace=self.marginSpace)
self.mgr.inactiveTask()
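
To make the new configuration knobs visible in one place, a hedged sketch of an msConfig fragment covering only the keys read in this file: the values are illustrative, 'quotaUsage', 'minimumThreshold' and 'verbose' have no defaults here and must be supplied, and the MongoDB settings expected by MSPileupData/MSCore are not shown.

msConfig = {
    'rucioAccount': 'ms-pileup',
    'rucioHost': 'http://cms-rucio.cern.ch',
    'authHost': 'https://cms-rucio-auth.cern.ch',
    'marginSpace': 1024**4,       # default headroom, 1 TiB
    'quotaUsage': 0.8,            # assumed: fraction of the Rucio quota to use
    'minimumThreshold': 1024**4,  # assumed: minimum free space, in bytes
    'verbose': True,
}

# a polling cycle then amounts to:
#   mgr = MSPileupTaskManager(msConfig)
#   mgr.executeCycle()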