diff --git a/src/python/WMCore/MicroService/MSPileup/DataStructs/MSPileupObj.py b/src/python/WMCore/MicroService/MSPileup/DataStructs/MSPileupObj.py index c24378e704..86b287d22a 100644 --- a/src/python/WMCore/MicroService/MSPileup/DataStructs/MSPileupObj.py +++ b/src/python/WMCore/MicroService/MSPileup/DataStructs/MSPileupObj.py @@ -4,21 +4,21 @@ Description: MSPileupObj module provides MSPileup data structure: { - "pileupName": string with the pileup dataset - "pileupType": string with a constant value - "insertTime": int, seconds since epoch in GMT timezone - "lastUpdateTime": int, seconds since epoch in GMT timezone - "expectedRSEs": ["Disk1", "Disk2", etc], # these values need to be validated against Rucio - "currentRSEs": ["Disk1", "Disk3"], # values provided by the MS itself - "fullReplicas": integer, # total number of replicas to keep on Disk - "campaigns": [ "name", ... ] # list of workflow campaigns using this pileup - "containerFraction": real number with the container fraction to be distributed (TBFD) - "replicationGrouping": string with a constant value (DATASET or ALL, to be in sync with Rucio) - "activatedOn": int, seconds since epoch in GMT timezone - "deactivatedOn": int, seconds since epoch in GMT timezone - "active": boolean, - "pileupSizeGB": integer, current size of the pileup in GB (integer) - "rulesList: list of strings (rules) used to lock the pileup id + "pileupName": string with the pileup dataset (mandatory) + "pileupType": string with a constant value (mandatory) + "insertTime": int, seconds since epoch in GMT timezone (service-based) + "lastUpdateTime": int, seconds since epoch in GMT timezone (service-based) + "expectedRSEs": ["Disk1", "Disk2", etc], a non-empty list of strings with the RSE names (mandatory) + "currentRSEs": ["Disk1", "Disk3"], a list of strings with the RSE names (service-based) + "fullReplicas": integer, # total number of replicas to keep on Disk (optional) + "campaigns": [ "name", ... ] # list of workflow campaigns using this pileup (optional) + "containerFraction": real number with the container fraction to be distributed (optional) + "replicationGrouping": string with a constant value (DATASET or ALL, to be in sync with Rucio) (optional) + "activatedOn": int, seconds since epoch in GMT timezone (service-based) + "deactivatedOn": int, seconds since epoch in GMT timezone (service-based) + "active": boolean, (mandatory) + "pileupSize": integer, current size of the pileup in bytes (service-based) + "ruleIds": list of strings (rules) used to lock the pileup id (service-based) } The data flow should be done via list of objects, e.g. 
@@ -29,12 +29,12 @@ import json # WMCore modules +from Utils.Timers import gmtimeSeconds from WMCore.MicroService.Tools.Common import getMSLogger from WMCore.Lexicon import dataset -from Utils.Timers import gmtimeSeconds -class MSPileupObj(object): +class MSPileupObj(): """ MSPileupObj defines MSPileup data stucture """ @@ -44,22 +44,27 @@ def __init__(self, pdict, verbose=None, logger=None, validRSEs=None): validRSEs = [] self.validRSEs = validRSEs + expectedRSEs = pdict['expectedRSEs'] + if len(expectedRSEs) == 0: + msg = 'MSPileup document requires a non-empty list of expectedRSEs' + raise Exception(msg) + self.data = { - 'pileupName': pdict.get('pileupName', ''), - 'pileupType': pdict.get('pileupType', ''), + 'pileupName': pdict['pileupName'], + 'pileupType': pdict['pileupType'], 'insertTime': pdict.get('insertTime', gmtimeSeconds()), 'lastUpdateTime': pdict.get('lastUpdateTime', gmtimeSeconds()), - 'expectedRSEs': pdict.get('expectedRSEs', []), + 'expectedRSEs': expectedRSEs, 'currentRSEs': pdict.get('currentRSEs', []), - 'fullReplicas': pdict.get('fullReplicas', 0), + 'fullReplicas': pdict.get('fullReplicas', 1), 'campaigns': pdict.get('campaigns', []), 'containerFraction': pdict.get('containerFraction', 1.0), - 'replicationGrouping': pdict.get('replicationGrouping', ""), + 'replicationGrouping': pdict.get('replicationGrouping', 'ALL'), 'activatedOn': pdict.get('activatedOn', gmtimeSeconds()), 'deactivatedOn': pdict.get('deactivatedOn', gmtimeSeconds()), 'active': pdict.get('active', False), 'pileupSize': pdict.get('pileupSize', 0), - 'ruleList': pdict.get('ruleList', [])} + 'ruleIds': pdict.get('ruleIds', [])} valid, msg = self.validate(self.data) if not valid: msg = f'MSPileup input is invalid, {msg}' @@ -126,7 +131,7 @@ def validate(self, pdict=None): msg = f"containerFraction value {val} outside [0,1] range" self.logger.error(msg) return False, msg - if (key == 'expectedRSEs' or key == 'currentRSEs') and not self.validateRSEs(val): + if key in ('expectedRSEs', 'currentRSEs') and not self.validateRSEs(val): msg = f"{key} value {val} is not in validRSEs {self.validRSEs}" self.logger.error(msg) return False, msg @@ -170,5 +175,5 @@ def schema(): 'deactivatedOn': (0, int), 'active': (False, bool), 'pileupSize': (0, int), - 'ruleList': ([], list)} + 'ruleIds': ([], list)} return doc diff --git a/src/python/WMCore/MicroService/MSPileup/MSPileupReport.py b/src/python/WMCore/MicroService/MSPileup/MSPileupReport.py new file mode 100644 index 0000000000..3cf173189c --- /dev/null +++ b/src/python/WMCore/MicroService/MSPileup/MSPileupReport.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +""" +File : MSPileupReport.py +Author : Valentin Kuznetsov +Description: MSPileup report module +""" + +# WMCore modules +from Utils.Timers import gmtimeSeconds, encodeTimestamp + + +class MSPileupReport(): + """ + MSPileupReport class represents MSPileup report object(s) + """ + def __init__(self, autoExpire=3600, autoCleanup=False): + """ + Constructor for MSPileupReport object + """ + self.docs = [] + self.autoExpire = autoExpire + self.autoCleanup = autoCleanup + + def addEntry(self, task, uuid, entry): + """ + Add new entry into MSPileup documents + + :param task: task name + :param uuid: unique id of the entry + :param entry: entry message or any other object to store + """ + if self.autoCleanup: + self.purgeExpired() + gmtime = gmtimeSeconds() + report = {'gmtime': gmtime, 'uuid': uuid, + 'timestamp': encodeTimestamp(gmtime), + 'entry': entry, 'task': task} + self.docs.append(report) + + def purgeExpired(self): + 
""" + Purge expired records from internal docs + """ + gmtime = gmtimeSeconds() + for entry in list(self.docs): + if gmtime - entry['gmtime'] > self.autoExpire: + self.docs.remove(entry) + + def getDocuments(self): + """ + Return report documents + """ + if self.autoCleanup: + self.purgeExpired() + return self.docs + + def getReportByUuid(self): + """ + Return report documents in dictonary form with uuid's as keys + """ + if self.autoCleanup: + self.purgeExpired() + rdict = {} + for doc in self.docs: + uuid = doc['uuid'] + timestamp = doc['timestamp'] + entry = doc['entry'] + task = doc['task'] + record = f"{timestamp} {task} task {uuid} {entry}" + rdict.setdefault(uuid, []).append(record) + return rdict diff --git a/src/python/WMCore/MicroService/MSPileup/MSPileupTaskManager.py b/src/python/WMCore/MicroService/MSPileup/MSPileupTaskManager.py new file mode 100644 index 0000000000..f4bbf08b78 --- /dev/null +++ b/src/python/WMCore/MicroService/MSPileup/MSPileupTaskManager.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python +""" +File : MSPileupTaskManager.py +Author : Valentin Kuznetsov +Description: MSPileupTaskManager handles MSPileupTasks + +In particular, it perform the following tasks each polling cycle: + - fetches pileup sizes for all pileup documents in database back-end + - update RSE quotas + - perform monitoring task + - perform task for active pileups using up-to-date RSE quotas + - perform task for inactive pileups +""" + +# system modules +import os +from threading import current_thread + +# WMCore modules +from WMCore.MicroService.MSCore.MSCore import MSCore +from WMCore.MicroService.DataStructs.DefaultStructs import PILEUP_REPORT +from WMCore.MicroService.MSPileup.MSPileupData import MSPileupData +from WMCore.MicroService.MSPileup.MSPileupTasks import MSPileupTasks +from WMCore.MicroService.MSTransferor.DataStructs.RSEQuotas import RSEQuotas +from WMCore.MicroService.Tools.PycurlRucio import getPileupContainerSizesRucio, getRucioToken +from WMCore.Services.Rucio.Rucio import Rucio + + +class MSPileupTaskManager(MSCore): + """ + MSPileupTaskManager handles MSPileup tasks + """ + + def __init__(self, msConfig, **kwargs): + super().__init__(msConfig, **kwargs) + self.marginSpace = msConfig.get('marginSpace', 1024**4) + self.rucioAccount = msConfig.get('rucioAccount', 'ms-pileup') + self.rucioUrl = msConfig.get('rucioHost', 'http://cms-rucio.cern.ch') + self.rucioAuthUrl = msConfig.get('authHost', 'https://cms-rucio-auth.cern.ch') + creds = {"client_cert": os.getenv("X509_USER_CERT", "Unknown"), + "client_key": os.getenv("X509_USER_KEY", "Unknown")} + configDict = {'rucio_host': self.rucioUrl, 'auth_host': self.rucioAuthUrl, + 'creds': creds, 'auth_type': 'x509'} + self.rucioClient = Rucio(self.rucioAccount, configDict=configDict) + self.dataManager = MSPileupData(msConfig) + self.mgr = MSPileupTasks(self.dataManager, self.logger, + self.rucioAccount, self.rucioClient) + self.rseQuotas = RSEQuotas(self.rucioAccount, msConfig["quotaUsage"], + minimumThreshold=msConfig["minimumThreshold"], + verbose=msConfig['verbose'], logger=self.logger) + + def status(self): + """ + Provide MSPileupTaskManager status API. 
+ + :return: status dictionary + """ + summary = dict(PILEUP_REPORT) + summary.update({'thread_id': current_thread().name}) + summary.update({'tasks': self.mgr.getReport()}) + return summary + + def executeCycle(self): + """ + execute MSPileupTasks polling cycle + """ + # get pileup sizes and update them in DB + spec = {} + docs = self.dataManager.getPileup(spec) + rucioToken = getRucioToken(self.rucioAuthUrl, self.rucioAccount) + containers = [r['pileupName'] for r in docs] + datasetSizes = getPileupContainerSizesRucio(containers, self.rucioUrl, rucioToken) + for doc in docs: + pileupSize = datasetSizes.get(doc['pileupName'], 0) + doc['pileupSize'] = pileupSize + self.dataManager.updatePileup(doc) + + # fetch all rse quotas + self.rseQuotas.fetchStorageUsage(self.rucioClient) + nodeUsage = self.rseQuotas.getNodeUsage() + + # execute all tasks + self.mgr.monitoringTask() + self.mgr.activeTask(nodeUsage=nodeUsage, marginSpace=self.marginSpace) + self.mgr.inactiveTask() diff --git a/src/python/WMCore/MicroService/MSPileup/MSPileupTasks.py b/src/python/WMCore/MicroService/MSPileup/MSPileupTasks.py new file mode 100644 index 0000000000..6c8a45acdd --- /dev/null +++ b/src/python/WMCore/MicroService/MSPileup/MSPileupTasks.py @@ -0,0 +1,483 @@ +#!/usr/bin/env python +""" +File : MSPileupTasks.py +Author : Valentin Kuznetsov +Description: perform different sets of tasks over MSPileup data +""" + +# system modules +import time +import asyncio + +# rucio modules +from rucio.client import Client +from rucio.common.exception import (AccountNotFound, AccessDenied, DuplicateRule, + DuplicateContent, InvalidRSEExpression, + UnsupportedOperation, RuleNotFound, RSENotFound) +# WMCore modules +from WMCore.MicroService.Tools.Common import getMSLogger +from WMCore.MicroService.MSPileup.MSPileupReport import MSPileupReport +from WMCore.Services.UUIDLib import makeUUID + + +class MSPileupTasks(): + """ + MSPileupTasks class is responsible for data placement logic. It performs + three main tasks: + - monitoring task to fetch current state of rule ID + - inactive task to look up pileup docs that have been set to the inactive state + - active task to look up pileup docs in the active state + """ + + def __init__(self, dataManager, logger, rucioAccount, rucioClient): + """ + MSPileupTasks constructor + :param dataManager: MSPileup Data Management layer instance + :param logger: logger instance + :param rucioAccount: rucio account name to use + :param rucioClient: rucio client or WMCore Rucio wrapper class to use + """ + self.mgr = dataManager + self.logger = logger + self.rucioAccount = rucioAccount + self.rucioClient = rucioClient + self.report = MSPileupReport() + + def monitoringTask(self): + """ + Execute Monitoring task according to the following logic: + + 1. Read pileup document from MongoDB with filter active=true + 2. For each rule id in ruleIds: + - query Rucio for that rule id and fetch its state (e.g.: afd122143kjmdskj) + - if state=OK, log that the rule has been satisfied and add that RSE to + the currentRSEs (unique) + - otherwise, calculate the rule completion based on the 3 locks_* field + 3. 
now that all the known rules have been inspected, persist the up-to-date + pileup doc in MongoDB + """ + spec = {'active': True} + docs = self.mgr.getPileup(spec) + taskSpec = self.getTaskSpec() + self.logger.info("Running the monitoring task on %d pileup objects", len(docs)) + asyncio.run(performTasks('monitoring', docs, taskSpec)) + + def inactiveTask(self): + """ + Inactive pileup task: + + This task is supposed to look at pileup documents that have been set to + inactive. The main goal here is to ensure that there are no Rucio rules + left in the system (of course, for the relevant DID and the Rucio + account adopted by our microservice). Pileup documents that are updated + as a result of this logic should have their data persisted back in + MongoDB. A short algorithm for it can be described as follows: + + 1. Read pileup document from MongoDB with filter active=false + 2. for each DID and Rucio account, get a list of all the existing rules + - make a Rucio call to delete that rule id, then: + - remove the rule id from ruleIds (if any) and remove the RSE name + from currentRSEs (if any) + 3. make a log record if the DID + Rucio account tuple does not have any + existing rules + - and set ruleIds and currentRSEs to an empty list + 4. once all the relevant rules have been removed, persist an up-to-date + version of the pileup data structure in MongoDB + """ + spec = {'active': False} + docs = self.mgr.getPileup(spec) + taskSpec = self.getTaskSpec() + self.logger.info("Running the inactive task on %d pileup objects", len(docs)) + asyncio.run(performTasks('inactive', docs, taskSpec)) + + def activeTask(self, nodeUsage=None, marginSpace=1024**4): + """ + Active pileup task: + :param nodeUsage: node usage dictionary, see RSEQuotas + :param marginSpace: minimum margin space size in bytes to have at RSE to place a dataset + + This task is supposed to look at pileup documents active in the system. + Its main goal is to ensure that the pileup DID has all the requested + rules (and nothing beyond them), according to the pileup object + configuration. Pileup documents that are updated as a result of this + logic should have their data persisted back in MongoDB. + + 1. Read pileup document from MongoDB with filter active=true + 2. if expectedRSEs is different from currentRSEs, then further data placement + is required (it's possible that data removal is required!) + 3. make a local copy of the currentRSEs value to track rules incomplete but ongoing + 4. for each rule matching the DID + Rucio account, perform: + - if rule RSE is not in expectedRSEs, then this rule needs to be deleted. + - upon successful rule deletion, also remove the RSE name + from currentRSEs and ruleIds (if any) + - make a log record + - else, save this rule RSE in the local copy of currentRSEs. + This rule is likely still being processed by Rucio. + 5. now that we evaluated expected versus current, + for each expectedRSEs not in our local copy of currentRSEs: + - first, check whether the RSE has enough space available for that + (we can assume that any storage with less than 1 TB available cannot be + considered for pileup data placement) + - in case of no space available, make a log record + - in case there is enough space, make a Rucio rule for + that DID + Rucio account + RSE + - now append the rule id to the ruleIds list + 6. 
once all the relevant rules have been created, + or if there were any changes to the pileup object, + persist an up-to-date version of the pileup data structure in MongoDB + """ + spec = {'active': True} + docs = self.mgr.getPileup(spec) + taskSpec = self.getTaskSpec() + taskSpec['marginSpace'] = marginSpace + taskSpec['nodeUsage'] = nodeUsage + self.logger.info("Running the active task on %d pileup objects", len(docs)) + asyncio.run(performTasks('active', docs, taskSpec)) + + def getTaskSpec(self): + """Return task spec""" + spec = {'manager': self.mgr, 'logger': self.logger, 'report': self.report, + 'rucioClient': self.rucioClient, 'rucioAccount': self.rucioAccount} + return spec + + def getReport(self): + """ + Return report object to upstream codebase + """ + return self.report + + +def rucioArgs(account): + """Return relevant rucio arguments""" + kwargs = {'grouping': 'ALL', 'account': account, + 'lifetime': None, 'locked': False, 'notify': 'N', + 'purge_replicas': False, 'ignore_availability': False, + 'ask_approval': False, 'asynchronous': True, 'priority': 3} + return kwargs + + +def monitoringTask(doc, spec): + """ + Perform single monitoring task over provided MSPileup document + + :param doc: MSPileup document + :param spec: task spec dict + """ + mgr = spec['manager'] + uuid = spec['uuid'] + rucioClient = spec['rucioClient'] + rucioAccount = spec['rucioAccount'] + report = spec['report'] + logger = spec['logger'] + + # get list of existing rule ids + pname = doc['pileupName'] + kwargs = {'scope': 'cms', 'account': rucioAccount} + if isinstance(rucioClient, Client): + inputArgs = {'scope': 'cms', 'name': pname, 'account': rucioAccount} + rules = list(rucioClient.list_replication_rules(inputArgs)) + else: + rules = rucioClient.listDataRules(pname, **kwargs) + modify = False + + for rdoc in rules: + if isinstance(rucioClient, Client): + rses = [r['rse'] for r in rucioClient.list_rses(rdoc['rse_expression'])] + else: + rses = rucioClient.evaluateRSEExpression(rdoc['rse_expression']) + rid = rdoc['id'] + state = rdoc['state'] + + # rucio states have the following values + # https://github.com/rucio/rucio/blob/master/lib/rucio/db/sqla/constants.py + # the states are: OK, REPLICATING, STUCK, SUSPENDED, WAITING_APPROVAL, INJECT + if state == 'OK': + # log that the rule has been satisfied and add that RSE to the currentRSEs (unique) + msg = f"monitoring task {uuid}, container {pname} under the rule ID {rid}" + msg += f" targeting RSEs {rses} in rucio account {rucioAccount} is completely available" + logger.info(msg) + for rse in rses: + if rse not in doc['currentRSEs']: + doc['currentRSEs'].append(rse) + modify = True + msg = f"update currentRSEs with {rse}" + report.addEntry('monitoring', uuid, msg) + else: + # calculate the rule completion based on the 3 locks_* field + sumOfLocks = rdoc['locks_ok_cnt'] + rdoc['locks_replicating_cnt'] + rdoc['locks_stuck_cnt'] + completion = rdoc['locks_ok_cnt'] / sumOfLocks + msg = f"monitoring task {uuid}, container {pname} under the rule ID {rid}" + msg += f" targeting RSEs {rses} in rucio account {rucioAccount} has" + msg += f" a fraction completion of {completion}" + logger.info(msg) + for rse in rses: + if rse in doc['currentRSEs']: + doc['currentRSEs'].remove(rse) + modify = True + msg = f"delete rse {rse} from currentRSEs" + report.addEntry('monitoring', uuid, msg) + + # persist an up-to-date version of the pileup data structure in MongoDB + if modify: + logger.info(f"monitoring task {uuid}, update {pname}") + mgr.updatePileup(doc) + msg = 
f"update pileup {pname}" + report.addEntry('monitoring', uuid, msg) + else: + logger.info("monitoring task {uuid}, processed without update for {pname}") + + +def inactiveTask(doc, spec): + """ + Perform single inactive task over provided MSPileup document + + :param doc: MSPileup document + :param spec: task spec dict + """ + mgr = spec['manager'] + uuid = spec['uuid'] + rucioClient = spec['rucioClient'] + rucioAccount = spec['rucioAccount'] + report = spec['report'] + logger = spec['logger'] + pname = doc['pileupName'] + kwargs = {'scope': 'cms', 'account': rucioAccount} + if isinstance(rucioClient, Client): + inputArgs = {'scope': 'cms', 'name': pname, 'account': rucioAccount} + rules = list(rucioClient.list_replication_rules(inputArgs)) + else: + rules = rucioClient.listDataRules(pname, **kwargs) + modify = False + + for rdoc in rules: + # make a Rucio call to delete that rule id + rid = rdoc['id'] + msg = f"inactive task {uuid}, container: {pname} for Rucio account {rucioAccount}" + msg += f", delete replication rule {rid}" + logger.info(msg) + if isinstance(rucioClient, Client): + rucioClient.delete_replication_rule(rid) + rses = [r['rse'] for r in rucioClient.list_rses(rdoc['rse_expression'])] + else: + rucioClient.deleteRule(rid) + rses = rucioClient.evaluateRSEExpression(rdoc['rse_expression']) + + # remove the rule id from ruleIds (if any) and remove the RSE name from currentRSEs (if any) + if rid in doc['ruleIds']: + doc['ruleIds'].remove(rid) + modify = True + msg = f"remove rid {rid}" + report.addEntry('inactive', uuid, msg) + for rse in rses: + if rse in doc['currentRSEs']: + doc['currentRSEs'].remove(rse) + modify = True + msg = f"remove rse {rse}" + report.addEntry('inactive', uuid, msg) + + # make a log record if the DID + Rucio account tuple does not have any existent rules + if not rules: + msg = f"inactive task {uuid}, container: {pname} for Rucio account {rucioAccount}" + msg += " does not have any existing rules, proceed without update" + logger.info(msg) + + # persist an up-to-date version of the pileup data structure in MongoDB + if modify: + logger.info(f"inactive task {uuid}, update {pname}") + mgr.updatePileup(doc) + msg = f"update pileup {pname}" + report.addEntry('inactive', uuid, msg) + + +def activeTask(doc, spec): + """ + Perform single active task over provided MSPileup document + + :param doc: MSPileup document + :param spec: task spec dict + """ + mgr = spec['manager'] + uuid = spec['uuid'] + logger = spec['logger'] + rucioClient = spec['rucioClient'] + rucioAccount = spec['rucioAccount'] + report = spec['report'] + marginSpace = spec['marginSpace'] + nodeUsage = spec['nodeUsage'] + + # extract relevant part of our pileup document we'll use in our logic + expectedRSEs = set(doc['expectedRSEs']) + currentRSEs = set(doc['currentRSEs']) + pileupSize = doc['pileupSize'] + pname = doc['pileupName'] + inputArgs = {'scope': 'cms', 'name': pname, 'account': rucioAccount} + kwargs = {'scope': 'cms', 'account': rucioAccount} + modify = False + localCopyOfCurrentRSEs = list(currentRSEs) + + # if expectedRSEs is different than currentRSEs, then further data placement is required + # it's possible that data removal is required! 
+ if expectedRSEs != currentRSEs: + msg = f"active task {uuid}" + msg += f", further data placement required for pileup name: {pname}," + msg += f" with expectedRSEs: {expectedRSEs} and data currently available at: {currentRSEs}" + logger.info(msg) + + # get list of replication rules for our scope, pileup name and account + if isinstance(rucioClient, Client): + rules = list(rucioClient.list_replication_rules(inputArgs)) + else: + rules = rucioClient.listDataRules(pname, **kwargs) + # for each rse_expression in rules get list of rses + for rdoc in rules: + if isinstance(rucioClient, Client): + rses = [r['rse'] for r in rucioClient.list_rses(rdoc['rse_expression'])] + else: + rses = rucioClient.evaluateRSEExpression(rdoc['rse_expression']) + rdoc['rses'] = rses + + # for each rule matching the DID + Rucio account + for rdoc in rules: + rid = rdoc['id'] + rses = rdoc['rses'] + for rse in rses: + # if rule RSE is not in expectedRSEs, then this rule needs to be deleted + if rse not in expectedRSEs: + # upon successful rule deletion, also remove the RSE name from currentRSEs + if isinstance(rucioClient, Client): + rucioClient.delete_replication_rule(rid) + else: + rucioClient.deleteRule(rid) + if rse in doc['currentRSEs']: + doc['currentRSEs'].remove(rse) + modify = True + msg = f"rse {rse} rule is deleted and removed from currentRSEs of {pname}" + logger.info(msg) + # delete rid in ruleIds (if any) + if rid in doc['ruleIds']: + doc['ruleIds'].remove(rid) + modify = True + msg = f"rule Id {rid} is removed from ruleIds for {pname}" + logger.info(msg) + else: + # else, save this rule RSE in the local copy of currentRSEs. + # This rule is likely still being processed by Rucio. + if rse not in localCopyOfCurrentRSEs: + localCopyOfCurrentRSEs.append(rse) + msg = f"rse {rse} is added to localCopyOfCurrentRSEs" + logger.info(msg) + + # for each expectedRSEs not in localCopyOfCurrentRSEs + for rse in set(expectedRSEs).difference(localCopyOfCurrentRSEs): + # check whether the RSE has enough space available for that + if isinstance(rucioClient, Client): + generator = rucioClient.get_rse_usage(rse) + else: + generator = rucioClient.getRSEUsage(rse) + enoughSpace = False + for rec in generator: + # use nodeUsage provided in spec to get rse space info + if nodeUsage and rse in nodeUsage: + rseUsage = nodeUsage[rse] + if rseUsage['bytes_remaining'] > marginSpace: + enoughSpace = True + break + else: + # we'll use total - used space to exceed pileupSize with some margin + if rec['total'] - rec['used'] - pileupSize > marginSpace: + enoughSpace = True + break + dids = [inputArgs] + if enoughSpace: + msg = f"active task {uuid}, for dids {dids} there is enough space at RSE {rse}" + logger.warning(msg) + # create the rule and append the rule id to ruleIds + if isinstance(rucioClient, Client): + kwargs = rucioArgs(rucioAccount) + copies = 1 + rids = rucioClient.add_replication_rule(dids, copies, rse, **kwargs) + else: + rids = rucioClient.createReplicationRule(pname, rse, **kwargs) + # add new ruleId to document ruleIds + for rid in rids: + if rid not in doc['ruleIds']: + doc['ruleIds'].append(rid) + modify = True + else: + # make a log record saying that there is not enough space + msg = f"active task {uuid}, for dids {dids} there is not enough space at RSE {rse}" + logger.warning(msg) + + # persist an up-to-date version of the pileup data structure in MongoDB + if modify: + logger.info(f"active task {uuid} update {pname}") + mgr.updatePileup(doc) + msg = f"update pileup {pname}" + 
report.addEntry('active', uuid, msg) + else: + logger.info(f"active task {uuid}, processed without update") + + +async def runTask(task, doc, spec): + """ + Run specified task for given document and spec + + :param task: task to perform + :param doc: MSPileup document to process + :param spec: task spec dictionary + """ + time0 = time.time() + report = spec['report'] + logger = spec.get('logger', getMSLogger(False)) + pname = doc['pileupName'] + + # set report hash + uuid = makeUUID() + spec['uuid'] = uuid + report.addEntry(task, uuid, 'starts') + msg = f"MSPileup {task} task {uuid} pileup {pname}" + try: + if task == 'monitoring': + monitoringTask(doc, spec) + elif task == 'inactive': + inactiveTask(doc, spec) + elif task == 'active': + activeTask(doc, spec) + msg += ", successfully processed" + # specific rucio exception handling if we use rucio client + except (AccountNotFound, AccessDenied) as exp: + msg += f", failed with access issue, error {exp}" + logger.error(msg) + except (DuplicateRule, DuplicateContent) as exp: + msg += f", failed with duplicate issue, error {exp}" + logger.error(msg) + except (RSENotFound, InvalidRSEExpression) as exp: + msg += f", failed with rse issue, error {exp}" + logger.error(msg) + except RuleNotFound as exp: + msg += f", failed with rule not found, error {exp}" + logger.error(msg) + except UnsupportedOperation as exp: + msg += f", failed with unsupported operation, error {exp}" + logger.error(msg) + except Exception as exp: + msg += f", failed with error {exp}" + logger.exception(msg) + report.addEntry(task, uuid, msg) + + # update task report + etime = time.time() - time0 + msg = "ends, elapsed time %.2f (sec)" % etime + report.addEntry(task, uuid, msg) + + +async def performTasks(task, docs, spec): + """ + Perform tasks via async IO co-routines + + :param task: task to perform + :param docs: list of MSPileup documents to process + :param spec: task spec dictionary + """ + coRoutines = [runTask(task, doc, spec) for doc in docs] + await asyncio.gather(*coRoutines) diff --git a/src/python/WMCore/Services/Rucio/Rucio.py b/src/python/WMCore/Services/Rucio/Rucio.py index 365794eb35..178e5f69d1 100644 --- a/src/python/WMCore/Services/Rucio/Rucio.py +++ b/src/python/WMCore/Services/Rucio/Rucio.py @@ -52,7 +52,7 @@ class Rucio(object): We will try to use Container -> CMS dataset, block -> CMS block. """ - def __init__(self, acct, hostUrl=None, authUrl=None, configDict=None): + def __init__(self, acct, hostUrl=None, authUrl=None, configDict=None, client=None): """ Constructs a Rucio object with the Client object embedded. 
In order to instantiate a Rucio Client object, it assumes the host has @@ -62,6 +62,7 @@ def __init__(self, acct, hostUrl=None, authUrl=None, configDict=None): :param hostUrl: defaults to the rucio config one :param authUrl: defaults to the rucio config one :param configDict: dictionary with extra parameters + :param client: optional Rucio client to use (useful for mock-up) """ configDict = configDict or {} # default RSE data caching to 12h @@ -79,15 +80,18 @@ def __init__(self, acct, hostUrl=None, authUrl=None, configDict=None): self.rucioParams.setdefault('user_agent', 'wmcore-client') self.logger.info("WMCore Rucio initialization parameters: %s", self.rucioParams) - self.cli = Client(rucio_host=hostUrl, auth_host=authUrl, account=acct, - ca_cert=self.rucioParams['ca_cert'], auth_type=self.rucioParams['auth_type'], - creds=self.rucioParams['creds'], timeout=self.rucioParams['timeout'], - user_agent=self.rucioParams['user_agent']) - clientParams = {} - for k in ("host", "auth_host", "auth_type", "account", "user_agent", - "ca_cert", "creds", "timeout", "request_retries"): - clientParams[k] = getattr(self.cli, k) - self.logger.info("Rucio client initialization parameters: %s", clientParams) + if client: + self.cli = client + else: + self.cli = Client(rucio_host=hostUrl, auth_host=authUrl, account=acct, + ca_cert=self.rucioParams['ca_cert'], auth_type=self.rucioParams['auth_type'], + creds=self.rucioParams['creds'], timeout=self.rucioParams['timeout'], + user_agent=self.rucioParams['user_agent']) + clientParams = {} + for k in ("host", "auth_host", "auth_type", "account", "user_agent", + "ca_cert", "creds", "timeout", "request_retries"): + clientParams[k] = getattr(self.cli, k) + self.logger.info("Rucio client initialization parameters: %s", clientParams) # keep a map of rse expression to RSE names mapped for some time self.cachedRSEs = MemoryCache(rseCacheExpiration, {}) @@ -1216,3 +1220,16 @@ def getContainerLockedAndAvailable(self, **kwargs): self.logger.info("Container: %s with block-based location at: %s, and final location: %s", kwargs['name'], commonBlockRSEs, finalRSEs) return finalRSEs + + def getRSEUsage(self, rse): + """ + get_rse_usage rucio API + + :param rse: name of RSE + :return: generator of records about given RSE + """ + try: + return self.cli.get_rse_usage(rse) + except Exception as exp: + self.logger.error("Failed to get information about rse %s. 
Error: %s", rse, str(exp)) + raise exp diff --git a/test/data/Mock/RucioMockData.json b/test/data/Mock/RucioMockData.json index 7661d14b91..20feef90ca 100644 --- a/test/data/Mock/RucioMockData.json +++ b/test/data/Mock/RucioMockData.json @@ -1392,6 +1392,19 @@ "scope": "cms", "type": "CONTAINER" }, + "listDataRules:[('account', 'ms-pileup'), ('scope', 'cms')]": [{ + "state": "OK", + "id": "123", + "rse_expression": "T2_XX_CERN", + "locks_ok_cnt": 2, + "locks_replicating_cnt": 2, + "locks_stuck_cnt": 2, + "scope": "cms", + "name": "/MinimumBias/ComissioningHI-v1/RAW", + "account": "ms-pileup" + }], + "evaluateRSEExpression": ["T2_XX_CERN"], + "deleteRule": true, "isContainer:[('didName', '/Cosmics/ComissioningHI-PromptReco-v1/RECO#00ad285a-0d7b-11e1-9b6c-003048caaace')]": false, "isContainer:[('didName', '/Cosmics/ComissioningHI-PromptReco-v1/RECO#14ad7dfe-0acf-11e1-8347-003048caaace')]": false, "isContainer:[('didName', '/Cosmics/ComissioningHI-PromptReco-v1/RECO#1bb7b9b6-0d95-11e1-9b6c-003048caaace')]": false, @@ -1499,4 +1512,4 @@ "isContainer:[('didName', '/MinimumBias/ComissioningHI-v1/RAW#e2ed5d7a-0c09-11e1-b764-003048caaace')]": false, "isContainer:[('didName', '/MinimumBias/ComissioningHI-v1/RAW#f29b82f0-0c50-11e1-b764-003048caaace')]": false, "isContainer:[('didName', '/MinimumBias/ComissioningHI-v1/RAW')]": true -} \ No newline at end of file +} diff --git a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupData_t.py b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupData_t.py index 62a7084ccf..a85e37e69b 100644 --- a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupData_t.py +++ b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupData_t.py @@ -19,10 +19,12 @@ class MSPileupTest(unittest.TestCase): def setUp(self): """setup unit test class""" + self.validRSEs = ['rse1'] msConfig = {'reqmgr2Url': 'http://localhost', 'rucioAccount': 'wmcore_mspileup', 'rucioUrl': 'http://cms-rucio-int.cern.ch', 'rucioAuthUrl': 'https://cms-rucio-auth-int.cern.ch', + 'validRSEs': self.validRSEs, 'mongoDB': 'msPileupDB', 'mongoDBCollection': 'msPileupDBCollection', 'mongoDBServer': 'mongodb://localhost', @@ -74,10 +76,10 @@ def testDocs(self): skeys = ['_id'] pname = '/skldjflksdjf/skldfjslkdjf/PREMIX' now = int(time.mktime(time.gmtime())) - expectedRSEs = [] + expectedRSEs = self.validRSEs fullReplicas = 1 pileupSize = 1 - ruleList = [] + ruleIds = [] campaigns = [] containerFraction = 0.0 replicationGrouping = "ALL" @@ -97,7 +99,7 @@ def testDocs(self): 'deactivatedOn': now, 'active': True, 'pileupSize': pileupSize, - 'ruleList': ruleList} + 'ruleIds': ruleIds} out = self.mgr.createPileup(pdict) self.assertEqual(len(out), 0) diff --git a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupObj_t.py b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupObj_t.py index 1148ebfbea..2aaffebd22 100644 --- a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupObj_t.py +++ b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupObj_t.py @@ -39,7 +39,7 @@ def testMSPileupObj(self): 'replicationGrouping': "ALL", 'active': True, 'pileupSize': 0, - 'ruleList': []} + 'ruleIds': []} obj = MSPileupObj(data, validRSEs=expectedRSEs) for key in ['insertTime', 'lastUpdateTime', 'activatedOn', 'deactivatedOn']: self.assertNotEqual(obj.data[key], 0) @@ -90,7 +90,7 @@ def testWrongMSPileupObj(self): 'replicationGrouping': "", 'active': True, 'pileupSize': 0, - 'ruleList': []} + 'ruleIds': []} try: MSPileupObj(data) self.assertIsNone(1, msg="MSPileupObj should not be created") diff --git 
a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupReport_t.py b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupReport_t.py new file mode 100644 index 0000000000..1408194280 --- /dev/null +++ b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupReport_t.py @@ -0,0 +1,50 @@ +""" +File : MSPileupReport_t.py +Author : Valentin Kuznetsov +Description: Unit tests for MicroService/MSPileup/MSPileupReport.py module +""" + +# system modules +import unittest + +# WMCore modules +from WMCore.MicroService.MSPileup.MSPileupReport import MSPileupReport + + +class MSPileupReportTest(unittest.TestCase): + """Unit test for MSPileupReport module""" + + def testMSPileupReport(self): + """ + Test MSPileup report functionality + """ + mgr = MSPileupReport(autoCleanup=True) + entry = 'test' + task = 'task' + uuid = '123' + mgr.addEntry(task, uuid, entry) + + # check documents functionality + docs = mgr.getDocuments() + self.assertEqual(len(docs), 1) + self.assertEqual(docs[0]['entry'], entry) + self.assertEqual(docs[0]['task'], task) + self.assertEqual(docs[0]['uuid'], uuid) + + # check dictionary functionality + rdict = mgr.getReportByUuid() + self.assertEqual(uuid in rdict, True) + self.assertEqual(len(rdict[uuid]), 1) + self.assertEqual(entry in rdict[uuid][0], True) + self.assertEqual(uuid in rdict[uuid][0], True) + self.assertEqual(task in rdict[uuid][0], True) + + # test purge functionality + mgr = MSPileupReport(autoExpire=-1, autoCleanup=True) + mgr.addEntry(task, uuid, entry) + docs = mgr.getDocuments() + self.assertEqual(len(docs), 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupTasks_t.py b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupTasks_t.py new file mode 100644 index 0000000000..39cc2c8591 --- /dev/null +++ b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupTasks_t.py @@ -0,0 +1,233 @@ +""" +File : MSPileupTasks_t.py +Author : Valentin Kuznetsov +Description: Unit tests for MicroService/MSPileup/MSPileupTasks.py module +""" + +# system modules +import os +import logging +import unittest + +# rucio modules +from rucio.client import Client + +# WMCore modules +from WMQuality.Emulators.RucioClient.MockRucioApi import MockRucioApi +from WMQuality.Emulators.EmulatedUnitTestCase import EmulatedUnitTestCase +from WMCore.MicroService.MSPileup.MSPileupTasks import MSPileupTasks +from WMCore.MicroService.MSPileup.MSPileupData import MSPileupData +from WMCore.MicroService.Tools.Common import getMSLogger +from WMCore.Services.Rucio.Rucio import Rucio + + +class TestRucioClient(Client): + """Fake implementation for Rucio client""" + def __init__(self, account='test', logger=None, state='OK'): + self.account = account + if logger: + self.logger = logger + else: + self.logger = getMSLogger(False) + self.state = state + self.doc = {'id': '123', 'rse_expression': 'T2_XX_CERN', 'state': self.state} + + def list_replication_rules(self, kwargs): + """Imitate list_replication_rules Rucio client API""" + # we will mock rucio doc based on provided state + # the states are: OK, REPLICATING, STUCK, SUSPENDED, WAITING_APPROVAL, INJECT + doc = dict(self.doc) + if self.state != 'OK': + doc['locks_ok_cnt'] = 2 + doc['locks_replicating_cnt'] = 2 + doc['locks_stuck_cnt'] = 2 + for key, val in kwargs.items(): + doc[key] = val + docs = [doc] + self.logger.info("### TestRucioClient docs %s", docs) + if 'Exception' in self.state: + raise Exception(self.state) + return docs + + def delete_replication_rule(self, rid): + 
"""Immidate delete replication_rule Rucio client API""" + msg = f"delete rule ID {rid}" + self.logger.info(msg) + + def add_replication_rule(self, dids, copies, rses, **kwargs): + """Immitate add replication rule Rucio client API""" + msg = f"add replication rule for {dids}, copies {copies} rses {rses}" + self.logger.info(msg) + + def list_rses(self, rseExpression): + """Immitate get list_rses Rucio client API""" + for rse in ['rse1', 'rse2', 'T2_XX_CERN']: + yield {'rse': rse} + + def get_rse_usage(self, rse): + """Immitate get rse usage Rucio client API""" + # it is a generator which provides information about given RSE + doc = {'id': '123', 'source': 'unavailable', + 'used': 440245583794751, + 'free': None, + 'total': 440245583794751, 'files': 138519, + 'rse': rse} + yield doc + + +class MSPileupTasksTest(EmulatedUnitTestCase): + """Unit test for MSPileupTasks module""" + + def setUp(self): + """ + setup Unit tests + """ + # we will define log stream to capture everything that goes to the log stream + self.logger = logging.getLogger() + + # setup rucio client + self.rucioAccount = 'ms-pileup' + self.hostUrl = 'http://cms-rucio.cern.ch' + self.authUrl = 'https://cms-rucio-auth.cern.ch' + creds = {"client_cert": os.getenv("X509_USER_CERT", "Unknown"), + "client_key": os.getenv("X509_USER_KEY", "Unknown")} + configDict = {'rucio_host': self.hostUrl, 'auth_host': self.authUrl, + 'creds': creds, 'auth_type': 'x509'} + + # setup rucio wrapper + testRucioClient = TestRucioClient(logger=self.logger, state='OK') + self.rucioClient = Rucio(self.rucioAccount, configDict=configDict, client=testRucioClient) + + expectedRSEs = ['rse1', 'rse2'] + + # setup pileup data manager + msConfig = {'reqmgr2Url': 'http://localhost', + 'validRSEs': expectedRSEs, + 'rucioAccount': 'wmcore_mspileup', + 'rucioUrl': 'http://cms-rucio-int.cern.ch', + 'rucioAuthUrl': 'https://cms-rucio-auth-int.cern.ch', + 'mongoDB': 'msPileupDB', + 'mongoDBCollection': 'msPileupDBCollection', + 'mongoDBServer': 'mongodb://localhost', + 'mongoDBReplicaSet': '', + 'mongoDBUser': None, + 'mongoDBPassword': None, + 'mockMongoDB': True} + self.mgr = MSPileupData(msConfig, skipRucio=True) + + # setup our pileup data + self.pname = '/primary/processed/PREMIX' + pname = self.pname + fullReplicas = 3 + campaigns = ['c1', 'c2'] + data = { + 'pileupName': pname, + 'pileupType': 'classic', + 'expectedRSEs': expectedRSEs, + 'currentRSEs': expectedRSEs, + 'fullReplicas': fullReplicas, + 'campaigns': campaigns, + 'containerFraction': 0.0, + 'replicationGrouping': "ALL", + 'active': True, + 'pileupSize': 0, + 'ruleIds': ['rse1']} + self.data = data + + self.mgr.createPileup(data) + + # add more docs similar in nature but with different size + data['pileupName'] = pname.replace('processed', 'processed-2') + self.mgr.createPileup(data) + data['pileupName'] = pname.replace('processed', 'processed-3') + self.mgr.createPileup(data) + + def testMSPileupTasks(self): + """ + Unit test for MSPileupTasks + """ + self.logger.info("---------- CHECK for state=OK -----------") + + obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient) + obj.monitoringTask() + + # we added three pileup documents and should have update at least one of them + # in our report, so we check for update pileup message in report + report = obj.getReport() + found = False + for doc in report.getDocuments(): + if 'update pileup' in doc['entry']: + found = True + self.assertEqual(found, True) + + # at this step the T2_XX_CERN should be added to currentRSEs as it is 
provided + # by TestRucioClient class via list_replication_rules + spec = {'pileupName': self.pname} + results = self.mgr.getPileup(spec) + self.assertEqual(len(results), 1) + doc = results[0] + self.assertEqual('T2_XX_CERN' in doc['currentRSEs'], True) + obj.activeTask() + obj.inactiveTask() + + # get report documents and log them accordingly + report = obj.getReport() + for uuid, entries in report.getReportByUuid().items(): + msg = f"-------- task {uuid} --------" + self.logger.info(msg) + for item in entries: + self.logger.info(item) + + # now we can test a non-OK state in Rucio + self.logger.info("---------- CHECK for state=STUCK -----------") + self.rucioClient = TestRucioClient(logger=self.logger, state='STUCK') + obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient) + obj.monitoringTask() + # at this step the T2_XX_CERN should NOT be added to currentRSEs + spec = {'pileupName': self.pname} + results = self.mgr.getPileup(spec) + self.assertEqual(len(results), 1) + doc = results[0] + self.assertEqual('T2_XX_CERN' in doc['currentRSEs'], False) + obj.activeTask() + obj.inactiveTask() + + # now we can test how our code will behave with rucio exceptions + self.logger.info("---------- CHECK for state=CustomException -----------") + + # we use CustomException for state to check how our code will + # handle Rucio API exceptions + self.rucioClient = TestRucioClient(logger=self.logger, state='CustomException') + obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient) + obj.monitoringTask() + + def testMSPileupTasksWithMockApi(self): + """ + Unit test for MSPileupTasks with RucioMockApi + """ + # we may take some mock data from + # https://github.com/dmwm/WMCore/blob/master/test/data/Mock/RucioMockData.json + # e.g. the /MinimumBias/ComissioningHI-v1/RAW dataset + pname = '/MinimumBias/ComissioningHI-v1/RAW' + data = dict(self.data) + data['pileupName'] = pname + self.mgr.createPileup(data) + + # now create mock rucio client + rucioClient = MockRucioApi(self.rucioAccount, hostUrl=self.hostUrl, authUrl=self.authUrl) + obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, rucioClient) + obj.monitoringTask() + obj.activeTask() + obj.inactiveTask() + + # we added a new pileup document and should have an update pileup message in the report + report = obj.getReport() + found = False + for doc in report.getDocuments(): + if 'update pileup' in doc['entry']: + found = True + self.assertEqual(found, True) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileup_t.py b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileup_t.py index 86ce0c1fc1..66360a5080 100644 --- a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileup_t.py +++ b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileup_t.py @@ -35,7 +35,6 @@ def setUp(self): 'mockMongoDB': True} self.mgr = MSPileup(msConfig) - self.pname = '/lksjdflksdjf/kljsdklfjsldfj/PREMIX' expectedRSEs = self.validRSEs fullReplicas = 0 @@ -51,7 +50,7 @@ def setUp(self): 'replicationGrouping': "ALL", 'active': True, 'pileupSize': 0, - 'ruleList': []} + 'ruleIds': []} obj = MSPileupObj(data, validRSEs=self.validRSEs) for key in ['insertTime', 'lastUpdateTime', 'activatedOn', 'deactivatedOn']:
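
Reviewer note: the snippet below is a minimal usage sketch and not part of the patch. It only shows how the new MSPileupReport aggregates the per-task entries that MSPileupTasks.getReport() exposes, using APIs introduced above (addEntry, getDocuments, getReportByUuid) plus makeUUID from WMCore.Services.UUIDLib; it assumes a WMCore installation on the Python path, and the entry strings are illustrative values mirroring the messages produced by the monitoring task.

# minimal sketch, assuming WMCore is importable; mirrors how MSPileupTasks
# records per-document entries keyed by a UUID
from WMCore.MicroService.MSPileup.MSPileupReport import MSPileupReport
from WMCore.Services.UUIDLib import makeUUID

report = MSPileupReport(autoExpire=3600, autoCleanup=True)
uuid = makeUUID()  # MSPileupTasks generates one UUID per processed pileup document
report.addEntry('monitoring', uuid, 'starts')
report.addEntry('monitoring', uuid, 'update currentRSEs with T2_XX_CERN')
report.addEntry('monitoring', uuid, 'ends, elapsed time 0.12 (sec)')

# flat list of report documents, each with gmtime/timestamp/uuid/task/entry keys
for doc in report.getDocuments():
    print(doc['timestamp'], doc['task'], doc['entry'])

# grouped view keyed by uuid, as consumed in the unit tests above
for key, records in report.getReportByUuid().items():
    print(key, records)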