Skip to content

Commit

Permalink
Unit test for MSPileupTasks
Browse files Browse the repository at this point in the history
  • Loading branch information
vkuznet committed Feb 28, 2023
1 parent db3ef83 commit f9c2e57
Show file tree
Hide file tree
Showing 5 changed files with 291 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ class MSPileupTest(unittest.TestCase):

def setUp(self):
"""setup unit test class"""
self.validRSEs = ['rse1']
msConfig = {'reqmgr2Url': 'http://localhost',
'rucioAccount': 'wmcore_mspileup',
'rucioUrl': 'http://cms-rucio-int.cern.ch',
'rucioAuthUrl': 'https://cms-rucio-auth-int.cern.ch',
'validRSEs': self.validRSEs,
'mongoDB': 'msPileupDB',
'mongoDBCollection': 'msPileupDBCollection',
'mongoDBServer': 'mongodb://localhost',
Expand Down Expand Up @@ -74,10 +76,10 @@ def testDocs(self):
skeys = ['_id']
pname = '/skldjflksdjf/skldfjslkdjf/PREMIX'
now = int(time.mktime(time.gmtime()))
expectedRSEs = []
expectedRSEs = self.validRSEs
fullReplicas = 1
pileupSize = 1
ruleList = []
ruleIds = []
campaigns = []
containerFraction = 0.0
replicationGrouping = "ALL"
Expand All @@ -97,7 +99,7 @@ def testDocs(self):
'deactivatedOn': now,
'active': True,
'pileupSize': pileupSize,
'ruleList': ruleList}
'ruleIds': ruleIds}

out = self.mgr.createPileup(pdict)
self.assertEqual(len(out), 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def testMSPileupObj(self):
'replicationGrouping': "ALL",
'active': True,
'pileupSize': 0,
'ruleList': []}
'ruleIds': []}
obj = MSPileupObj(data, validRSEs=expectedRSEs)
for key in ['insertTime', 'lastUpdateTime', 'activatedOn', 'deactivatedOn']:
self.assertNotEqual(obj.data[key], 0)
Expand Down Expand Up @@ -90,7 +90,7 @@ def testWrongMSPileupObj(self):
'replicationGrouping': "",
'active': True,
'pileupSize': 0,
'ruleList': []}
'ruleIds': []}
try:
MSPileupObj(data)
self.assertIsNone(1, msg="MSPileupObj should not be created")
Expand Down
50 changes: 50 additions & 0 deletions test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupReport_t.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""
File : MSPileupTasks_t.py
Author : Valentin Kuznetsov <vkuznet AT gmail dot com>
Description: Unit tests for MicorService/MSPileup/MSPileupReport.py module
"""

# system modules
import unittest

# WMCore modules
from WMCore.MicroService.MSPileup.MSPileupReport import MSPileupReport


class MSPileupReportTest(unittest.TestCase):
"""Unit test for MSPileupTasks module"""

def testMSPileupReport(self):
"""
Test MSPileup report functionality
"""
mgr = MSPileupReport(autoCleanup=True)
entry = 'test'
task = 'tast'
uuid = '123'
mgr.addEntry(task, uuid, entry)

# check documents functionality
docs = mgr.getDocuments()
self.assertEqual(len(docs), 1)
self.assertEqual(docs[0]['entry'], entry)
self.assertEqual(docs[0]['task'], task)
self.assertEqual(docs[0]['uuid'], uuid)

# check dictionary funtionality
rdict = mgr.getReportByUuid()
self.assertEqual(uuid in rdict, True)
self.assertEqual(len(rdict[uuid]), 1)
self.assertEqual(entry in rdict[uuid][0], True)
self.assertEqual(uuid in rdict[uuid][0], True)
self.assertEqual(task in rdict[uuid][0], True)

# test purge functionality
mgr = MSPileupReport(autoExpire=-1, autoCleanup=True)
mgr.addEntry(task, uuid, entry)
docs = mgr.getDocuments()
self.assertEqual(len(docs), 0)


if __name__ == '__main__':
unittest.main()
233 changes: 233 additions & 0 deletions test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileupTasks_t.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
"""
File : MSPileupTasks_t.py
Author : Valentin Kuznetsov <vkuznet AT gmail dot com>
Description: Unit tests for MicorService/MSPileup/MSPileupTasks.py module
"""

# system modules
import os
import logging
import unittest

# rucio modules
from rucio.client import Client

# WMCore modules
from WMQuality.Emulators.RucioClient.MockRucioApi import MockRucioApi
from WMQuality.Emulators.EmulatedUnitTestCase import EmulatedUnitTestCase
from WMCore.MicroService.MSPileup.MSPileupTasks import MSPileupTasks
from WMCore.MicroService.MSPileup.MSPileupData import MSPileupData
from WMCore.MicroService.Tools.Common import getMSLogger
from WMCore.Services.Rucio.Rucio import Rucio


class TestRucioClient(Client):
"""Fake implementation for Rucio client"""
def __init__(self, account='test', logger=None, state='OK'):
self.account = 'test'
if logger:
self.logger = logger
else:
self.logger = getMSLogger(False)
self.state = state
self.doc = {'id': '123', 'rse_expression': 'T2_XX_CERN', 'state': self.state}

def list_replication_rules(self, kwargs):
"""Immitate list_replication_rules Rucio client API"""
# we will mock rucio doc based on provided state
# the states are: OK, REPLICATING, STUCK, SUSPENDED, WAITING_APPROVAL, INJECT
doc = dict(self.doc)
if self.state != 'OK':
doc['locks_ok_cnt'] = 2
doc['locks_replicating_cnt'] = 2
doc['locks_stuck_cnt'] = 2
for key, val in kwargs.items():
doc[key] = val
docs = [doc]
self.logger.info("### TestRucioClient docs %s", docs)
if 'Exception' in self.state:
raise Exception(self.state)
return docs

def delete_replication_rule(self, rid):
"""Immidate delete replication_rule Rucio client API"""
msg = f"delete rule ID {rid}"
self.logger.info(msg)

def add_replication_rule(self, dids, copies, rses, **kwargs):
"""Immitate add replication rule Rucio client API"""
msg = f"add replication rule for {dids}, copies {copies} rses {rses}"
self.logger.info(msg)

def list_rses(self, rseExpression):
"""Immitate get list_rses Rucio client API"""
for rse in ['rse1', 'rse2', 'T2_XX_CERN']:
yield {'rse': rse}

def get_rse_usage(self, rse):
"""Immitate get rse usage Rucio client API"""
# it is a generator which provides information about given RSE
doc = {'id': '123', 'source': 'unavailable',
'used': 440245583794751,
'free': None,
'total': 440245583794751, 'files': 138519,
'rse': rse}
yield doc


class MSPileupTasksTest(EmulatedUnitTestCase):
"""Unit test for MSPileupTasks module"""

def setUp(self):
"""
setup Unit tests
"""
# we will define log stream to capture everything that goes to the log stream
self.logger = logging.getLogger()

# setup rucio client
self.rucioAccount = 'ms-pileup'
self.hostUrl = 'http://cms-rucio.cern.ch'
self.authUrl = 'https://cms-rucio-auth.cern.ch'
creds = {"client_cert": os.getenv("X509_USER_CERT", "Unknown"),
"client_key": os.getenv("X509_USER_KEY", "Unknown")}
configDict = {'rucio_host': self.hostUrl, 'auth_host': self.authUrl,
'creds': creds, 'auth_type': 'x509'}

# setup rucio wrapper
testRucioClient = TestRucioClient(logger=self.logger, state='OK')
self.rucioClient = Rucio(self.rucioAccount, configDict=configDict, client=testRucioClient)

expectedRSEs = ['rse1', 'rse2']

# setup pileup data manager
msConfig = {'reqmgr2Url': 'http://localhost',
'validRSEs': expectedRSEs,
'rucioAccount': 'wmcore_mspileup',
'rucioUrl': 'http://cms-rucio-int.cern.ch',
'rucioAuthUrl': 'https://cms-rucio-auth-int.cern.ch',
'mongoDB': 'msPileupDB',
'mongoDBCollection': 'msPileupDBCollection',
'mongoDBServer': 'mongodb://localhost',
'mongoDBReplicaSet': '',
'mongoDBUser': None,
'mongoDBPassword': None,
'mockMongoDB': True}
self.mgr = MSPileupData(msConfig, skipRucio=True)

# setup our pileup data
self.pname = '/primary/processed/PREMIX'
pname = self.pname
fullReplicas = 3
campaigns = ['c1', 'c2']
data = {
'pileupName': pname,
'pileupType': 'classic',
'expectedRSEs': expectedRSEs,
'currentRSEs': expectedRSEs,
'fullReplicas': fullReplicas,
'campaigns': campaigns,
'containerFraction': 0.0,
'replicationGrouping': "ALL",
'active': True,
'pileupSize': 0,
'ruleIds': ['rse1']}
self.data = data

self.mgr.createPileup(data)

# add more docs similar in nature but with different size
data['pileupName'] = pname.replace('processed', 'processed-2')
self.mgr.createPileup(data)
data['pileupName'] = pname.replace('processed', 'processed-3')
self.mgr.createPileup(data)

def testMSPileupTasks(self):
"""
Unit test for MSPileupTasks
"""
self.logger.info("---------- CHECK for state=OK -----------")

obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient)
obj.monitoringTask()

# we added three pileup documents and should have update at least one of them
# in our report, so we check for update pileup message in report
report = obj.getReport()
found = False
for doc in report.getDocuments():
if 'update pileup' in doc['entry']:
found = True
self.assertEqual(found, True)

# at this step the T2_XX_CERN should be added to currentRSEs as it is provided
# by TestRucioClient class via list_replication_rules
spec = {'pileupName': self.pname}
results = self.mgr.getPileup(spec)
self.assertEqual(len(results), 1)
doc = results[0]
self.assertEqual('T2_XX_CERN' in doc['currentRSEs'], True)
obj.activeTask()
obj.inactiveTask()

# get report documents and log them accordingly
report = obj.getReport()
for uuid, entries in report.getReportByUuid().items():
msg = f"-------- task {uuid} --------"
self.logger.info(msg)
for item in entries:
self.logger.info(item)

# now we can test non OK state in Rucio
self.logger.info("---------- CHECK for state=STUCK -----------")
self.rucioClient = TestRucioClient(logger=self.logger, state='STUCK')
obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient)
obj.monitoringTask()
# at this step the T2_XX_CERN should NOT be added to currentRSEs
spec = {'pileupName': self.pname}
results = self.mgr.getPileup(spec)
self.assertEqual(len(results), 1)
doc = results[0]
self.assertEqual('T2_XX_CERN' in doc['currentRSEs'], False)
obj.activeTask()
obj.inactiveTask()

# now we can test how our code will behave with rucio exceptions
self.logger.info("---------- CHECK for state=CustomException -----------")

# we use CustomException for state to check how our code will
# handle Rucio API exceptions
self.rucioClient = TestRucioClient(logger=self.logger, state='CustomException')
obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, self.rucioClient)
obj.monitoringTask()

def testMSPileupTasksWithMockApi(self):
"""
Unit test for MSPileupTasks with RucioMockApi
"""
# we may take some mock data from
# https://github.com/dmwm/WMCore/blob/master/test/data/Mock/RucioMockData.json
# e.g. /MinimumBias/ComissioningHI-v1/RAW' dataset
pname = '/MinimumBias/ComissioningHI-v1/RAW'
data = dict(self.data)
data['pileupName'] = pname
self.mgr.createPileup(data)

# now create mock rucio client
rucioClient = MockRucioApi(self.rucioAccount, hostUrl=self.hostUrl, authUrl=self.authUrl)
obj = MSPileupTasks(self.mgr, self.logger, self.rucioAccount, rucioClient)
obj.monitoringTask()
obj.activeTask()
obj.inactiveTask()

# we added new pileup document and should have update pileup message in report
report = obj.getReport()
found = False
for doc in report.getDocuments():
if 'update pileup' in doc['entry']:
found = True
self.assertEqual(found, True)


if __name__ == '__main__':
unittest.main()
3 changes: 1 addition & 2 deletions test/python/WMCore_t/MicroService_t/MSPileup_t/MSPileup_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def setUp(self):
'mockMongoDB': True}
self.mgr = MSPileup(msConfig)


self.pname = '/lksjdflksdjf/kljsdklfjsldfj/PREMIX'
expectedRSEs = self.validRSEs
fullReplicas = 0
Expand All @@ -51,7 +50,7 @@ def setUp(self):
'replicationGrouping': "ALL",
'active': True,
'pileupSize': 0,
'ruleList': []}
'ruleIds': []}

obj = MSPileupObj(data, validRSEs=self.validRSEs)
for key in ['insertTime', 'lastUpdateTime', 'activatedOn', 'deactivatedOn']:
Expand Down

0 comments on commit f9c2e57

Please sign in to comment.