-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
118 lines (98 loc) · 4.64 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import argparse
import os
import sys
import json
import time
import requests
import csv
from configparser import ConfigParser, ExtendedInterpolation
config = ConfigParser(interpolation=ExtendedInterpolation())
config.read('common_config/config.ini')
#Below function is used to check the input file is in csv format or not
def valid_file(param):
base, ext = os.path.splitext(param)
if ext.lower() not in ('.csv'):
raise argparse.ArgumentTypeError('File must have a csv extension')
return param
# creating the parser arguments
parser = argparse.ArgumentParser()
parser.add_argument('--env', required=True, help='Specify the environment')
parser.add_argument('--inputCsvFile', '--inputCsvFile', type=valid_file)
argument = parser.parse_args()
environment = argument.env
inputCsvFile = csv.DictReader(open(argument.inputCsvFile, 'rt'))
# Below function is used for environment check
def envCheck(environment):
try:
config[environment]
return True
except Exception as e:
print(e)
return False
# Below function is used to give the termination msg
def terminatingMessage(msg):
print(msg)
sys.exit()
# Cross checking the environment if not valid it wil give error as in else part
if envCheck(environment):
print("=================== Environment set to " + str(environment) + " =====================")
else:
terminatingMessage(str(environment) + " is an invalid environment")
#This function is used to generate the access token
def generateAccessToken():
try:
loginapibody = {
'email': config.get(environment,'email'),
'password': config.get(environment,'Password')
}
responseKeyClockUser = requests.post(url=config.get(environment, 'UserDevBaseUrl') + config.get(environment, 'login'), data=loginapibody)
if responseKeyClockUser.status_code == 200:
print("-------->Access token gereated successfully<--------")
responseJson = responseKeyClockUser.json()
accessTokenUser = responseJson['result']['access_token']
return accessTokenUser
else:
print("Error in generating Access token")
except requests.exceptions.RequestException as e:
print("Exception during API request:", e)
terminatingMessage("An error occurred during Access Token generation.")
if __name__ == "__main__":
access_token = generateAccessToken()
# this function is used to delete the user for mentoruser side
def mentoruserdeletion(requestedID, access_token):
headerKeyClockUser = {'X-auth-token': "bearer " + access_token}
mentoreuserurl = config.get(environment, 'UserDevBaseUrl') + config.get(environment, 'mentoreduser') + str(requestedID)
print("-------->mentorED user deletion user API calling<--------")
responsementoruerdel = requests.post(url=mentoreuserurl, headers=headerKeyClockUser)
return responsementoruerdel
# this function is used to delete the user for mentoruser mentoring side
def mentormentoringdeletion(requestedID, access_token):
headerKeyClockUser = {'X-auth-token': "bearer " + access_token}
mentoreuserurl = config.get(environment, 'MentoringBaseUrl') + config.get(environment, 'mentoring') + str(requestedID)
print("------->mentorED mentoring deletion user API calling<-------")
responsementoruerdel = requests.post(url=mentoreuserurl, headers=headerKeyClockUser)
return responsementoruerdel
# creating empty list
output_data = []
#iterating the requestedID
for row in inputCsvFile:
requested_id = row['requestedID']
# here we are calling the 2 functions which we defined and storing in respecting variable's
mentor_user_response = mentoruserdeletion(requested_id, access_token)
mentor_mentoring_response = mentormentoringdeletion(requested_id, access_token)
# Store response data in output_data list
if mentor_mentoring_response.status_code and mentor_user_response.status_code == 200:
print("------>mentorED user deletion success <------")
print("------->mentorED mentoring deletion success< ------")
output_data.append([
requested_id,mentor_user_response.text ,mentor_mentoring_response.text ,
print("---------->Execution completed. Output saved in outPutFile<--------")
])
else:
print("------->ERROR : please cross check the UUID which you have provided<-------")
# Write output_data to output CSV file
output_csv_filename = 'outPutFile.csv'
with open(output_csv_filename, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(["requestedID", "mentorUserResponse", "mentorMentoringResponse"])
writer.writerows(output_data)