diff --git a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupData_t.py b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupData_t.py index 62a7084ccf..a85e37e69b 100644 --- a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupData_t.py +++ b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupData_t.py @@ -19,10 +19,12 @@ class MSPileupTest(unittest.TestCase): def setUp(self): """setup unit test class""" + self.validRSEs = ['rse1'] msConfig = {'reqmgr2Url': 'http://localhost', 'rucioAccount': 'wmcore_mspileup', 'rucioUrl': 'http://cms-rucio-int.cern.ch', 'rucioAuthUrl': 'https://cms-rucio-auth-int.cern.ch', + 'validRSEs': self.validRSEs, 'mongoDB': 'msPileupDB', 'mongoDBCollection': 'msPileupDBCollection', 'mongoDBServer': 'mongodb://localhost', @@ -74,10 +76,10 @@ def testDocs(self): skeys = ['_id'] pname = '/skldjflksdjf/skldfjslkdjf/PREMIX' now = int(time.mktime(time.gmtime())) - expectedRSEs = [] + expectedRSEs = self.validRSEs fullReplicas = 1 pileupSize = 1 - ruleList = [] + ruleIds = [] campaigns = [] containerFraction = 0.0 replicationGrouping = "ALL" @@ -97,7 +99,7 @@ def testDocs(self): 'deactivatedOn': now, 'active': True, 'pileupSize': pileupSize, - 'ruleList': ruleList} + 'ruleIds': ruleIds} out = self.mgr.createPileup(pdict) self.assertEqual(len(out), 0) diff --git a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupObj_t.py b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupObj_t.py index 1148ebfbea..2aaffebd22 100644 --- a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupObj_t.py +++ b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupObj_t.py @@ -39,7 +39,7 @@ def testMSPileupObj(self): 'replicationGrouping': "ALL", 'active': True, 'pileupSize': 0, - 'ruleList': []} + 'ruleIds': []} obj = MSPileupObj(data, validRSEs=expectedRSEs) for key in ['insertTime', 'lastUpdateTime', 'activatedOn', 'deactivatedOn']: self.assertNotEqual(obj.data[key], 0) @@ -90,7 +90,7 @@ def testWrongMSPileupObj(self): 'replicationGrouping': "", 'active': True, 'pileupSize': 0, - 'ruleList': []} + 'ruleIds': []} try: MSPileupObj(data) self.assertIsNone(1, msg="MSPileupObj should not be created") diff --git a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupReport_t.py b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupReport_t.py new file mode 100644 index 0000000000..1408194280 --- /dev/null +++ b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupReport_t.py @@ -0,0 +1,50 @@ +""" +File : MSPileupTasks_t.py +Author : Valentin Kuznetsov +Description: Unit tests for MicorService/MSPileup/MSPileupReport.py module +""" + +# system modules +import unittest + +# WMCore modules +from WMCore.MicroService.MSPileup.MSPileupReport import MSPileupReport + + +class MSPileupReportTest(unittest.TestCase): + """Unit test for MSPileupTasks module""" + + def testMSPileupReport(self): + """ + Test MSPileup report functionality + """ + mgr = MSPileupReport(autoCleanup=True) + entry = 'test' + task = 'tast' + uuid = '123' + mgr.addEntry(task, uuid, entry) + + # check documents functionality + docs = mgr.getDocuments() + self.assertEqual(len(docs), 1) + self.assertEqual(docs[0]['entry'], entry) + self.assertEqual(docs[0]['task'], task) + self.assertEqual(docs[0]['uuid'], uuid) + + # check dictionary funtionality + rdict = mgr.getReportByUuid() + self.assertEqual(uuid in rdict, True) + self.assertEqual(len(rdict[uuid]), 1) + self.assertEqual(entry in rdict[uuid][0], True) + self.assertEqual(uuid in rdict[uuid][0], True) + self.assertEqual(task in rdict[uuid][0], True) + + # test purge functionality + mgr = MSPileupReport(autoExpire=-1, autoCleanup=True) + mgr.addEntry(task, uuid, entry) + docs = mgr.getDocuments() + self.assertEqual(len(docs), 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupTasks_t.py b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupTasks_t.py new file mode 100644 index 0000000000..39cc2c8591 --- /dev/null +++ b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupTasks_t.py @@ -0,0 +1,233 @@ +""" +File : MSPileupTasks_t.py +Author : Valentin Kuznetsov +Description: Unit tests for MicorService/MSPileup/MSPileupTasks.py module +""" + +# system modules +import os +import logging +import unittest + +# rucio modules +from rucio.client import Client + +# WMCore modules +from WMQuality.Emulators.RucioClient.MockRucioApi import MockRucioApi +from WMQuality.Emulators.EmulatedUnitTestCase import EmulatedUnitTestCase +from WMCore.MicroService.MSPileup.MSPileupTasks import MSPileupTasks +from WMCore.MicroService.MSPileup.MSPileupData import MSPileupData +from WMCore.MicroService.Tools.Common import getMSLogger +from WMCore.Services.Rucio.Rucio import Rucio + + +class TestRucioClient(Client): + """Fake implementation for Rucio client""" + def __init__(self, account='test', logger=None, state='OK'): + self.account = 'test' + if logger: + self.logger = logger + else: + self.logger = getMSLogger(False) + self.state = state + self.doc = {'id': '123', 'rse_expression': 'T2_XX_CERN', 'state': self.state} + + def list_replication_rules(self, kwargs): + """Immitate list_replication_rules Rucio client API""" + # we will mock rucio doc based on provided state + # the states are: OK, REPLICATING, STUCK, SUSPENDED, WAITING_APPROVAL, INJECT + doc = dict(self.doc) + if self.state != 'OK': + doc['locks_ok_cnt'] = 2 + doc['locks_replicating_cnt'] = 2 + doc['locks_stuck_cnt'] = 2 + for key, val in kwargs.items(): + doc[key] = val + docs = [doc] + self.logger.info("### TestRucioClient docs %s", docs) + if 'Exception' in self.state: + raise Exception(self.state) + return docs + + def delete_replication_rule(self, rid): + """Immidate delete replication_rule Rucio client API""" + msg = f"delete rule ID {rid}" + self.logger.info(msg) + + def add_replication_rule(self, dids, copies, rses, **kwargs): + """Immitate add replication rule Rucio client API""" + msg = f"add replication rule for {dids}, copies {copies} rses {rses}" + self.logger.info(msg) + + def list_rses(self, rseExpression): + """Immitate get list_rses Rucio client API""" + for rse in ['rse1', 'rse2', 'T2_XX_CERN']: + yield {'rse': rse} + + def get_rse_usage(self, rse): + """Immitate get rse usage Rucio client API""" + # it is a generator which provides information about given RSE + doc = {'id': '123', 'source': 'unavailable', + 'used': 440245583794751, + 'free': None, + 'total': 440245583794751, 'files': 138519, + 'rse': rse} + yield doc + + +class MSPileupTasksTest(EmulatedUnitTestCase): + """Unit test for MSPileupTasks module""" + + def setUp(self): + """ + setup Unit tests + """ + # we will define log stream to capture everything that goes to the log stream + self.logger = logging.getLogger() + + # setup rucio client + self.rucioAccount = 'ms-pileup' + self.hostUrl = 'http://cms-rucio.cern.ch' + self.authUrl = 'https://cms-rucio-auth.cern.ch' + creds = {"client_cert": os.getenv("X509_USER_CERT", "Unknown"), + "client_key": os.getenv("X509_USER_KEY", "Unknown")} + configDict = {'rucio_host': self.hostUrl, 'auth_host': self.authUrl, + 'creds': creds, 'auth_type': 'x509'} + + # setup rucio wrapper + testRucioClient = TestRucioClient(logger=self.logger, state='OK') + self.rucioClient = Rucio(self.rucioAccount, configDict=configDict, client=testRucioClient) + + expectedRSEs = ['rse1', 'rse2'] + + # setup pileup data manager + msConfig = {'reqmgr2Url': 'http://localhost', + 'validRSEs': expectedRSEs, + 'rucioAccount': 'wmcore_mspileup', + 'rucioUrl': 'http://cms-rucio-int.cern.ch', + 'rucioAuthUrl': 'https://cms-rucio-auth-int.cern.ch', + 'mongoDB': 'msPileupDB', + 'mongoDBCollection': 'msPileupDBCollection', + 'mongoDBServer': 'mongodb://localhost', + 'mongoDBReplicaSet': '', + 'mongoDBUser': None, + 'mongoDBPassword': None, + 'mockMongoDB': True} + self.mgr = MSPileupData(msConfig, skipRucio=True) + + # setup our pileup data + self.pname = '/primary/processed/PREMIX' + pname = self.pname + fullReplicas = 3 + campaigns = ['c1', 'c2'] + data = { + 'pileupName': pname, + 'pileupType': 'classic', + 'expectedRSEs': expectedRSEs, + 'currentRSEs': expectedRSEs, + 'fullReplicas': fullReplicas, + 'campaigns': campaigns, + 'containerFraction': 0.0, + 'replicationGrouping': "ALL", + 'active': True, + 'pileupSize': 0, + 'ruleIds': ['rse1']} + self.data = data + + self.mgr.createPileup(data) + + # add more docs similar in nature but with different size + data['pileupName'] = pname.replace('processed', 'processed-2') + self.mgr.createPileup(data) + data['pileupName'] = pname.replace('processed', 'processed-3') + self.mgr.createPileup(data) + + def testMSPileupTasks(self): + """ + Unit test for MSPileupTasks + """ + self.logger.info("---------- CHECK for state=OK -----------") + + obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient) + obj.monitoringTask() + + # we added three pileup documents and should have update at least one of them + # in our report, so we check for update pileup message in report + report = obj.getReport() + found = False + for doc in report.getDocuments(): + if 'update pileup' in doc['entry']: + found = True + self.assertEqual(found, True) + + # at this step the T2_XX_CERN should be added to currentRSEs as it is provided + # by TestRucioClient class via list_replication_rules + spec = {'pileupName': self.pname} + results = self.mgr.getPileup(spec) + self.assertEqual(len(results), 1) + doc = results[0] + self.assertEqual('T2_XX_CERN' in doc['currentRSEs'], True) + obj.activeTask() + obj.inactiveTask() + + # get report documents and log them accordingly + report = obj.getReport() + for uuid, entries in report.getReportByUuid().items(): + msg = f"-------- task {uuid} --------" + self.logger.info(msg) + for item in entries: + self.logger.info(item) + + # now we can test non OK state in Rucio + self.logger.info("---------- CHECK for state=STUCK -----------") + self.rucioClient = TestRucioClient(logger=self.logger, state='STUCK') + obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient) + obj.monitoringTask() + # at this step the T2_XX_CERN should NOT be added to currentRSEs + spec = {'pileupName': self.pname} + results = self.mgr.getPileup(spec) + self.assertEqual(len(results), 1) + doc = results[0] + self.assertEqual('T2_XX_CERN' in doc['currentRSEs'], False) + obj.activeTask() + obj.inactiveTask() + + # now we can test how our code will behave with rucio exceptions + self.logger.info("---------- CHECK for state=CustomException -----------") + + # we use CustomException for state to check how our code will + # handle Rucio API exceptions + self.rucioClient = TestRucioClient(logger=self.logger, state='CustomException') + obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient) + obj.monitoringTask() + + def testMSPileupTasksWithMockApi(self): + """ + Unit test for MSPileupTasks with RucioMockApi + """ + # we may take some mock data from + # https://github.com/dmwm/WMCore/blob/master/test/data/Mock/RucioMockData.json + # e.g. /MinimumBias/ComissioningHI-v1/RAW' dataset + pname = '/MinimumBias/ComissioningHI-v1/RAW' + data = dict(self.data) + data['pileupName'] = pname + self.mgr.createPileup(data) + + # now create mock rucio client + rucioClient = MockRucioApi(self.rucioAccount, hostUrl=self.hostUrl, authUrl=self.authUrl) + obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, rucioClient) + obj.monitoringTask() + obj.activeTask() + obj.inactiveTask() + + # we added new pileup document and should have update pileup message in report + report = obj.getReport() + found = False + for doc in report.getDocuments(): + if 'update pileup' in doc['entry']: + found = True + self.assertEqual(found, True) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileup_t.py b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileup_t.py index 86ce0c1fc1..66360a5080 100644 --- a/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileup_t.py +++ b/test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileup_t.py @@ -35,7 +35,6 @@ def setUp(self): 'mockMongoDB': True} self.mgr = MSPileup(msConfig) - self.pname = '/lksjdflksdjf/kljsdklfjsldfj/PREMIX' expectedRSEs = self.validRSEs fullReplicas = 0 @@ -51,7 +50,7 @@ def setUp(self): 'replicationGrouping': "ALL", 'active': True, 'pileupSize': 0, - 'ruleList': []} + 'ruleIds': []} obj = MSPileupObj(data, validRSEs=self.validRSEs) for key in ['insertTime', 'lastUpdateTime', 'activatedOn', 'deactivatedOn']: