diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..89d2ab4 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,32 @@ +#Official Ubuntu Image +FROM ubuntu:20.04 +#Install necessary packages +RUN apt-get update && \ + DEBIAN_FRONTEND=noninteractive \ + apt-get install -y \ + openjdk-8-jdk \ + software-properties-common \ + python3-pip \ + python3-venv \ + python3-virtualenv \ + zip \ + unzip \ + acl +#Create the User +RUN useradd -m -s /bin/bash analytics +RUN mkdir -p /opt/sparkjobs/ml-analytics-service/logs/observation/evidence +RUN mkdir -p /opt/sparkjobs/ml-analytics-service/logs/observation/status +RUN mkdir -p /opt/sparkjobs/ml-analytics-service/logs/project +RUN mkdir -p /opt/sparkjobs/ml-analytics-service/logs/project/evidence +RUN mkdir -p /opt/sparkjobs/ml-analytics-service/logs/survey +RUN mkdir -p /opt/sparkjobs/ml-analytics-service/logs/survey/evidence && chown -R analytics:analytics /opt/sparkjobs/ml-analytics-service +RUN chmod +rwx -R /opt/sparkjobs/ml-analytics-service/ +COPY . /opt/sparkjobs/ml-analytics-service/ +COPY faust.sh /opt/sparkjobs/faust_as_service/faust.sh +RUN chown -R analytics:analytics /opt/sparkjobs/ml-analytics-service +USER analytics +WORKDIR /opt/sparkjobs/ml-analytics-service +RUN virtualenv spark_venv +RUN /opt/sparkjobs/ml-analytics-service/spark_venv/bin/pip install --upgrade -r /opt/sparkjobs/ml-analytics-service/requirements.txt +COPY start-services.sh . +CMD ./start-services.sh diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 0000000..348ec1c --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,42 @@ +node('build-slave') { + try { + String ANSI_GREEN = "\u001B[32m" + String ANSI_NORMAL = "\u001B[0m" + String ANSI_BOLD = "\u001B[1m" + String ANSI_RED = "\u001B[31m" + String ANSI_YELLOW = "\u001B[33m" + ansiColor('xterm') { + timestamps { + stage('Checkout') { + if (!env.hub_org) { + println(ANSI_BOLD + ANSI_RED + "Uh Oh! Please set a Jenkins environment variable named hub_org with value as registery/sunbidrded" + ANSI_NORMAL) + error 'Please resolve the errors and rerun..' + } else + println(ANSI_BOLD + ANSI_GREEN + "Found environment variable named hub_org with value as: " + hub_org + ANSI_NORMAL) + } + // cleanWs() + checkout scm + commit_hash = sh(script: 'git rev-parse --short HEAD', returnStdout: true).trim() + build_tag = sh(script: "echo " + params.github_release_tag.split('/')[-1] + "_" + commit_hash + "_" + env.BUILD_NUMBER, returnStdout: true).trim() + echo "build_tag: " + build_tag + } + stage('Build') { + env.NODE_ENV = "build" + print "Environment will be : ${env.NODE_ENV}" + sh('git submodule update --init') + sh('git submodule update --init --recursive --remote') + sh('chmod 777 build.sh') + sh("./build.sh ${build_tag} ${env.NODE_NAME} ${hub_org}") + } + + stage('ArchiveArtifacts') { + archiveArtifacts "metadata.json" + currentBuild.description = "${build_tag}" + } + } + } + catch (err) { + currentBuild.result = "FAILURE" + throw err + } +} diff --git a/build.sh b/build.sh new file mode 100644 index 0000000..092ee5c --- /dev/null +++ b/build.sh @@ -0,0 +1,9 @@ +#!/bin/bash +# Build script +set -eo pipefail +build_tag=$1 +name=ml-analytics-service +node=$2 +org=$3 +docker build -f ./Dockerfile --label commitHash=$(git rev-parse --short HEAD) -t ${org}/${name}:${build_tag} . 
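+# Emit build metadata (image name, tag, build node) consumed by the Jenkins ArchiveArtifacts stage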
+echo {\"image_name\" : \"${name}\", \"image_tag\" : \"${build_tag}\", \"node_name\" : \"$node\"} > metadata.json diff --git a/faust.sh b/faust.sh new file mode 100755 index 0000000..2628a7b --- /dev/null +++ b/faust.sh @@ -0,0 +1,4 @@ +#!/bin/sh +export LANG=C.UTF-8 +export LC_ALL=C.UTF-8 +/opt/sparkjobs/ml-analytics-service/spark_venv/bin/python /opt/sparkjobs/ml-analytics-service/$1.py --workdir /opt/sparkjobs/ml-analytics-service/$2 worker -l info diff --git a/observations/observation_realtime_streaming.py b/observations/observation_realtime_streaming.py new file mode 100755 index 0000000..a82f4c8 --- /dev/null +++ b/observations/observation_realtime_streaming.py @@ -0,0 +1,1080 @@ +# ----------------------------------------------------------------- +# Name : observation_realtime_streaming.py +# Author : Prashant +# Description : Program to read data from one kafka topic and +# produce it to another kafka topic +# ----------------------------------------------------------------- + +import faust +import time, re +import logging +import os, json +import datetime +import requests +from bson.objectid import ObjectId +from kafka import KafkaConsumer, KafkaProducer +from configparser import ConfigParser,ExtendedInterpolation +from logging.handlers import TimedRotatingFileHandler, RotatingFileHandler +from pydruid.client import * +from pydruid.db import connect +from pydruid.query import QueryBuilder +from pydruid.utils.aggregators import * +from pydruid.utils.filters import Dimension +from urllib.parse import urlparse + +config_path = os.path.split(os.path.dirname(os.path.abspath(__file__))) +config = ConfigParser(interpolation=ExtendedInterpolation()) +config.read(config_path[0] + "/config.ini") + +# date formating +current_date = datetime.date.today() +formatted_current_date = current_date.strftime("%d-%B-%Y") +number_of_days_logs_kept = current_date - datetime.timedelta(days=7) +number_of_days_logs_kept = number_of_days_logs_kept.strftime("%d-%B-%Y") + +# file path for log +file_path_for_output_and_debug_log = config.get('LOGS', 'observation_streaming_success_error') +file_name_for_output_log = f"{file_path_for_output_and_debug_log}{formatted_current_date}-output.log" +file_name_for_debug_log = f"{file_path_for_output_and_debug_log}{formatted_current_date}-debug.log" + +# Remove old log entries +files_with_date_pattern = [file +for file in os.listdir(file_path_for_output_and_debug_log) +if re.match(r"\d{2}-\w+-\d{4}-*", +file)] + +for file_name in files_with_date_pattern: + file_path = os.path.join(file_path_for_output_and_debug_log, file_name) + if os.path.isfile(file_path): + file_date = file_name.split('.')[0] + date = file_date.split('-')[0] + '-' + file_date.split('-')[1] + '-' + file_date.split('-')[2] + if date < number_of_days_logs_kept: + os.remove(file_path) + + +formatter = logging.Formatter('%(asctime)s - %(levelname)s') + +# Handler for output and debug Log +output_logHandler = RotatingFileHandler(f"{file_name_for_output_log}") +output_logHandler.setFormatter(formatter) + +debug_logHandler = RotatingFileHandler(f"{file_name_for_debug_log}") +debug_logHandler.setFormatter(formatter) + +# Add the successLoger +successLogger = logging.getLogger('success log') +successLogger.setLevel(logging.DEBUG) +successBackuphandler = TimedRotatingFileHandler(f"{file_name_for_output_log}", when="w0",backupCount=1) +successLogger.addHandler(output_logHandler) +successLogger.addHandler(successBackuphandler) + +# Add the Errorloger +errorLogger = logging.getLogger('error log') 
+errorLogger.setLevel(logging.ERROR) +errorBackuphandler = TimedRotatingFileHandler(f"{file_name_for_output_log}",when="w0",backupCount=1) +errorLogger.addHandler(output_logHandler) +errorLogger.addHandler(errorBackuphandler) + +# Add the Infologer +infoLogger = logging.getLogger('info log') +infoLogger.setLevel(logging.INFO) +debug_logBackuphandler = TimedRotatingFileHandler(f"{file_name_for_debug_log}",when="w0",backupCount=1) +infoLogger.addHandler(debug_logHandler) +infoLogger.addHandler(debug_logBackuphandler) + +domArr = [] + +kafka_url = config.get("KAFKA", "url") +#consume the message from kafka topic +app = faust.App( + 'ml_observation_faust', + broker='kafka://'+kafka_url, + value_serializer='raw', + web_port=7001, + broker_max_poll_records=500 +) +rawTopicName = app.topic(config.get("KAFKA", "observation_raw_topic")) +producer = KafkaProducer(bootstrap_servers=[kafka_url]) + +# # Define function to check if observation submission Id exists in Druid +def check_observation_submission_id_existance(observationId,column_name,table_name): + try: + # Establish connection to Druid + url = config.get("DRUID","sql_url") + url = str(url) + parsed_url = urlparse(url) + + host = parsed_url.hostname + port = int(parsed_url.port) + path = parsed_url.path + scheme = parsed_url.scheme + + conn = connect(host=host, port=port, path=path, scheme=scheme) + cur = conn.cursor() + response = check_datasource_existence(table_name) + if response == True: + # Query to check existence of observation submission Id in Druid table + query = f"SELECT COUNT(*) FROM \"{table_name}\" WHERE \"{column_name}\" = '{observationId}'" + cur.execute(query) + result = cur.fetchone() + count = result[0] + infoLogger.info(f"Found {count} entires in {table_name}") + # if count == 0 means observation_submission_id not exits in the datasource + # if count > 0 means observation_submission_id exits in datasource + if count == 0: + return True + else: + return False + else: + # Since the table doesn't exist, return True to allow data insertion initially + return True + except Exception as e: + # Log any errors that occur during Druid query execution + errorLogger.error(f"Error checking observation_submission_id existence in Druid: {e}") + +def check_datasource_existence(datasource_name): + try : + host = config.get('DRUID', 'datasource_url') + response = requests.get(host) + if response.status_code == 200: + datasources = response.json() + if datasource_name in datasources : + return True + else : + return False + except requests.RequestException as e: + errorLogger.error(f"Error fetching datasources: {e}") + + +def flatten_json(y): + out = {} + + def flatten(x, name=''): + # If the Nested key-value pair is of dict type + if isinstance(x, dict): + for a in x: + flatten(x[a], name + a + '-') + + # If the Nested key-value pair is of list type + elif isinstance(x, list): + if not x: # Check if the list is empty + out[name[:-1]] = "null" + else: + for i, a in enumerate(x): + flatten(a, name + str(i) + '-') + + # If the Nested key-value pair is of other types + else: + # Replace None, empty string, or empty list with "null" + if x is None or x == '' or x == []: + out[name[:-1]] = "null" + else: + out[name[:-1]] = x + + flatten(y) + return out + +def orgName(val): + orgarr = [] + if val is not None: + for org in val: + orgObj = {} + if org["isSchool"] == False: + orgObj['orgId'] = org['organisationId'] + orgObj['orgName'] = org["orgName"] + orgarr.append(orgObj) + return orgarr + +try: + #initialising the values + class node: + 
#Construction of Node with component,status and children
+        def __init__(self, type=None, externalId=None, name=None, children=None):
+            self.type = type
+            self.externalId = externalId
+            self.name = name
+            if children is None:
+                self.children = []
+            else:
+                self.children = children
+
+
+    #Construction of tree through recursion
+    class implementation:
+        def buildnode(self, ob, parent, ansCriteriaId):
+            node1= node()
+            node1.type=ob['type']
+            node1.externalId=ob['externalId']
+            node1.name=ob['name']
+            node1.parent = parent
+            node1.children=[]
+
+            if (node1.type == 'criteria') and (node1.externalId == ansCriteriaId ):
+                criteriaObj = {}
+                criteriaObj['type'] = node1.type
+                criteriaObj['externalId'] = str(node1.externalId)
+                criteriaObj['name'] = node1.name
+                criteriaObj['parent'] = parent
+                domArr.append(criteriaObj)
+
+            try:
+                for children in ob['children']:
+                    parent = ob['name']
+                    node1.children.append(self.buildnode(children,parent,ansCriteriaId))
+            except KeyError:
+                if ob['criteria']:
+                    for cri in ob['criteria']:
+                        if str(cri['criteriaId']) == ansCriteriaId :
+                            criObj = {}
+                            criObj['type'] = 'criteria'
+                            criObj['externalId'] = str(cri['criteriaId'])
+                            criObj['name']=''
+                            criObj['parent']=ob['name']
+                            domArr.append(criObj)
+            val = len(domArr)
+            arr = domArr[0:val]
+            return arr
+except Exception as e:
+    errorLogger.error(e, exc_info=True)
+
+try:
+    def obj_creation(obSub):
+        # Debug log for observation submission ID
+        infoLogger.info(f"Started to process kafka event for the observation Submission Id : {obSub['_id']}. For Observation Question report")
+        observationSubmissionId = str(obSub['_id'])
+        if check_observation_submission_id_existance(observationSubmissionId,"observationSubmissionId","sl-observation"):
+            infoLogger.info(f"No data duplication for the Submission ID : {observationSubmissionId} in sl-observation")
+            if obSub['status'] == 'completed':
+                if 'isAPrivateProgram' in obSub :
+                    completedDate = None
+                    try:
+                        completedDate = obSub['completedDate']
+                    except KeyError:
+                        pass
+                    createdAt = obSub['createdAt']
+                    updatedAt = obSub['updatedAt']
+                    evidencesArr = [ v for v in obSub['evidences'].values() ]
+                    evidence_sub_count = 0
+                    entityId = obSub['entityId']
+
+                    userSubType = None
+                    try :
+                        if 'userRoleInformation' in obSub:
+                            userSubType = obSub["userRoleInformation"]["role"]
+                    except KeyError:
+                        userSubType = ''
+
+                    rootOrgId = None
+                    boardName = None
+                    user_type = None
+                    try:
+                        if obSub["userProfile"] :
+                            if "rootOrgId" in obSub["userProfile"] and obSub["userProfile"]["rootOrgId"]:
+                                rootOrgId = obSub["userProfile"]["rootOrgId"]
+                            if "framework" in obSub["userProfile"] and obSub["userProfile"]["framework"]:
+                                if "board" in obSub["userProfile"]["framework"] and len(obSub["userProfile"]["framework"]["board"]) > 0:
+                                    boardName = ",".join(obSub["userProfile"]["framework"]["board"])
+                            try:
+                                temp_userType = set([types["type"] for types in obSub["userProfile"]["profileUserTypes"]])
+                                user_type = ", ".join(temp_userType)
+                            except KeyError:
+                                pass
+
+                    except KeyError :
+                        pass
+                    obsAppName = None
+                    try :
+                        obsAppName = obSub["appInformation"]["appName"].lower()
+                    except KeyError :
+                        obsAppName = ''
+                    userRolesArrUnique = []
+                    roleObj = {}
+                    roleObj["roleTitle"] = userSubType
+                    roleObj["userBoardName"] = boardName
+                    roleObj["userType"] = user_type
+                    userRolesArrUnique.append(roleObj)
+                    try:
+                        orgArr = orgName(obSub["userProfile"]["organisations"])
+                        if len(orgArr) >0:
+                            for org in orgArr:
+                                for obj in userRolesArrUnique:
+                                    obj["organisationId"] = org["orgId"]
+                                    obj["organisationName"] = org["orgName"]
+ except KeyError: + pass + + if 'answers' in obSub.keys() : + answersArr = [ v for v in obSub['answers'].values()] + for ans in answersArr: + try: + if len(ans['fileName']): + evidence_sub_count = evidence_sub_count + len(ans['fileName']) + except KeyError: + evidence_sub_count = 0 + for ans in answersArr: + def sequenceNumber(externalId, answer, answerSection, solutionObj): + try: + for num in range(len(solutionObj['questionSequenceByEcm']['OB']['S1'])): + if solutionObj['questionSequenceByEcm']['OB']['S1'][num] == externalId: + return num + 1 + except KeyError: + return '' + + def creatingObj(answer, quesexternalId, ans_val, instNumber, responseLabel, usrRolFn): + observationSubQuestionsObj = {} + observationSubQuestionsObj['observationSubmissionId'] = str(obSub['_id']) + observationSubQuestionsObj['appName'] = obsAppName + try: + if obSub["isRubricDriven"] == True and obSub["criteriaLevelReport"] == True: + observationSubQuestionsObj['solutionType'] = "observation_with_rubric" + elif obSub["isRubricDriven"] == True and obSub["criteriaLevelReport"] == False: + observationSubQuestionsObj['solutionType'] = "observation_with_rubric_no_criteria_level_report" + else: + observationSubQuestionsObj['solutionType'] = "observation_with_out_rubric" + except KeyError: + observationSubQuestionsObj['solutionType'] = "observation_with_out_rubric" + observationSubQuestionsObj['entity'] = str(obSub['entityId']) + observationSubQuestionsObj['entityExternalId'] = obSub['entityExternalId'] + # observationSubQuestionsObj['entityName'] = obSub['entityInformation']['name'] + + entityType =obSub['entityType'] + observationSubQuestionsObj[entityType] = str(obSub['entityId']) + observed_entities = {} + try: + for values in observed_entities: + observationSubQuestionsObj[f'{values["type"]}Name'] = values['name'] + observationSubQuestionsObj[f'{values["type"]}ExternalId'] = values['code'] + observationSubQuestionsObj[f'{values["type"]}'] = values['id'] + except KeyError: + pass + + observationSubQuestionsObj['createdBy'] = obSub['createdBy'] + + try: + observationSubQuestionsObj['isAPrivateProgram'] = obSub['isAPrivateProgram'] + except KeyError: + observationSubQuestionsObj['isAPrivateProgram'] = True + + try: + observationSubQuestionsObj['programExternalId'] = obSub['programExternalId'] + except KeyError : + observationSubQuestionsObj['programExternalId'] = '' + + try: + observationSubQuestionsObj['programId'] = str(obSub['programId']) + except KeyError : + observationSubQuestionsObj['programId'] = '' + + try: + observationSubQuestionsObj['programName'] = obSub['programInfo']['name'] + observationSubQuestionsObj['programDescription'] = obSub['programInfo']['description'] + except KeyError : + observationSubQuestionsObj['programName'] = '' + observationSubQuestionsObj['programDescription'] = '' + observationSubQuestionsObj['solutionExternalId'] = obSub['solutionExternalId'] + observationSubQuestionsObj['solutionId'] = str(obSub['solutionId']) + observationSubQuestionsObj['observationId'] = str(obSub['observationId']) + for critQues in obSub['criteria']: + if critQues['_id'] == answer['criteriaId'] : + observationSubQuestionsObj['criteriaExternalId'] = critQues['externalId'] + observationSubQuestionsObj['criteriaName'] = critQues['name'] + observationSubQuestionsObj['criteriaDescription'] = critQues['description'] + observationSubQuestionsObj['section'] = '' + solutionObj = {} + try : + if 'solutionInfo' in obSub.keys(): + solutionObj = obSub['solutionInfo'] + observationSubQuestionsObj['solutionName'] = 
str(solutionObj.get('name','')) + observationSubQuestionsObj['scoringSystem'] = str(solutionObj.get('scoringSystem','')) + observationSubQuestionsObj['solutionDescription'] = str(solutionObj.get('description','')) + observationSubQuestionsObj['questionSequenceByEcm'] = sequenceNumber(quesexternalId,answer,observationSubQuestionsObj['section'],obSub['solutionInfo']) + except KeyError: + observationSubQuestionsObj['solutionName'] = '' + observationSubQuestionsObj['scoringSystem'] = '' + observationSubQuestionsObj['solutionDescription'] = '' + observationSubQuestionsObj['questionSequenceByEcm'] = '' + + solutionObj = obSub['solutionInfo'] + try: + if solutionObj['scoringSystem'] == 'pointsBasedScoring': + try: + observationSubQuestionsObj['totalScore'] = obSub['pointsBasedMaxScore'] + except KeyError : + observationSubQuestionsObj['totalScore'] = '' + try: + observationSubQuestionsObj['scoreAchieved'] = obSub['pointsBasedScoreAchieved'] + except KeyError : + observationSubQuestionsObj['scoreAchieved'] = '' + try: + observationSubQuestionsObj['totalpercentage'] = obSub['pointsBasedPercentageScore'] + except KeyError : + observationSubQuestionsObj['totalpercentage'] = '' + + try: + observationSubQuestionsObj['maxScore'] = answer['maxScore'] + except KeyError : + observationSubQuestionsObj['maxScore'] = '' + + try: + observationSubQuestionsObj['minScore'] = answer['scoreAchieved'] + except KeyError : + observationSubQuestionsObj['minScore'] = '' + + try: + observationSubQuestionsObj['percentageScore'] = answer['percentageScore'] + except KeyError : + observationSubQuestionsObj['percentageScore'] = '' + + try: + observationSubQuestionsObj['pointsBasedScoreInParent'] = answer['pointsBasedScoreInParent'] + except KeyError : + observationSubQuestionsObj['pointsBasedScoreInParent'] = '' + except KeyError: + observationSubQuestionsObj['totalScore'] = '' + observationSubQuestionsObj['scoreAchieved'] = '' + observationSubQuestionsObj['totalpercentage'] = '' + observationSubQuestionsObj['maxScore'] = '' + observationSubQuestionsObj['minScore'] = '' + observationSubQuestionsObj['percentageScore'] = '' + observationSubQuestionsObj['pointsBasedScoreInParent'] = '' + + observationSubQuestionsObj['entityType'] = obSub['entityType'] + if 'observationInformation' in obSub : + if 'name' in obSub['observationInformation']: + observationSubQuestionsObj['observationName'] = obSub['observationInformation']['name'] + else : + observationSubQuestionsObj['observationName'] = '' + else : + observationSubQuestionsObj['observationName'] = '' + + observationSubQuestionsObj['questionId'] = str(answer['qid']) + observationSubQuestionsObj['questionAnswer'] = ans_val + observationSubQuestionsObj['questionResponseType'] = answer['responseType'] + if answer['responseType'] == 'number': + if responseLabel: + observationSubQuestionsObj['questionResponseLabelNumber'] = responseLabel + else: + observationSubQuestionsObj['questionResponseLabelNumber'] = 0 + try: + if responseLabel: + if answer['responseType'] == 'text': + observationSubQuestionsObj['questionResponseLabel'] = "'"+ re.sub("\n|\"","",responseLabel) +"'" + else : + observationSubQuestionsObj['questionResponseLabel'] = responseLabel + else: + observationSubQuestionsObj['questionResponseLabel'] = '' + except KeyError : + observationSubQuestionsObj['questionResponseLabel'] = '' + observationSubQuestionsObj['questionExternalId'] = quesexternalId + observationSubQuestionsObj['questionName'] = answer['question'][0] + observationSubQuestionsObj['questionECM'] = 
answer['evidenceMethod'] + observationSubQuestionsObj['criteriaId'] = str(answer['criteriaId']) + observationSubQuestionsObj['completedDate'] = completedDate + observationSubQuestionsObj['createdAt'] = createdAt + observationSubQuestionsObj['updatedAt'] = updatedAt + if answer['remarks'] : + observationSubQuestionsObj['remarks'] = "'"+ re.sub("\n|\"","",answer['remarks']) +"'" + else : + observationSubQuestionsObj['remarks'] = '' + if len(answer['fileName']): + multipleFiles = None + fileCnt = 1 + for filedetail in answer['fileName']: + if fileCnt == 1: + multipleFiles = filedetail['sourcePath'] + fileCnt = fileCnt + 1 + else: + multipleFiles = multipleFiles + ' , ' + filedetail['sourcePath'] + observationSubQuestionsObj['evidences'] = multipleFiles + observationSubQuestionsObj['evidenceCount'] = str(len(answer['fileName'])) + observationSubQuestionsObj['totalEvidences'] = evidence_sub_count + # to fetch the parent question of matrix + if ans['responseType']=='matrix': + observationSubQuestionsObj['instanceParentQuestion'] = ans['question'][0] + observationSubQuestionsObj['instanceParentId'] = ans['qid'] + observationSubQuestionsObj['instanceParentResponsetype'] =ans['responseType'] + observationSubQuestionsObj['instanceParentCriteriaId'] =ans['criteriaId'] + #here ans[criteriaId] == criteria['criteriaId] + for critQuesInst in obSub['criteria']: + if critQuesInst['_id'] == ans['criteriaId']: + observationSubQuestionsObj['instanceParentCriteriaExternalId'] = critQuesInst['externalId'] + observationSubQuestionsObj['instanceParentCriteriaExternalId'] = critQuesInst['name'] + observationSubQuestionsObj['instanceParentSection'] = '' + observationSubQuestionsObj['instanceId'] = instNumber + observationSubQuestionsObj['instanceParentExternalId'] = quesexternalId + observationSubQuestionsObj['instanceParentEcmSequence']= sequenceNumber(observationSubQuestionsObj['instanceParentExternalId'], answer, + observationSubQuestionsObj['instanceParentSection'], obSub['solutionInfo']) + else: + observationSubQuestionsObj['instanceParentQuestion'] = '' + observationSubQuestionsObj['instanceParentId'] = '' + observationSubQuestionsObj['instanceParentResponsetype'] ='' + observationSubQuestionsObj['instanceId'] = instNumber + observationSubQuestionsObj['instanceParentExternalId'] = '' + observationSubQuestionsObj['instanceParentEcmSequence'] = '' + observationSubQuestionsObj['channel'] = rootOrgId + observationSubQuestionsObj['parentChannel'] = "SHIKSHALOKAM" + ### Assessment Domain Logic - Start ### + domainArr = [] + for domain in obSub['themes']: + parent = None + builder = None + parent = domain['name'] + builder = implementation() + domObj = {} + try : + domObj['name'] = domain['name'] + domObj['type'] = domain['type'] + domObj['externalId']=str(domain['externalId']) + except KeyError: + domObj['name'] = '' + domObj['type'] = '' + domObj['externalId']='' + + try: + if domain['criteria']: + domObj['theme']=builder.buildnode(domain, parent, str(answer['criteriaId'])) + except KeyError: + domObj['theme'] = builder.buildnode(domain, parent, str(answer['criteriaId'])) + + domainArr.append(domObj) + domArr.clear() + + for dom in domainArr: + if dom['theme']: + for obj in dom['theme']: + try: + if obj['type'] == 'criteria': + if (str(obj['externalId']) == str(answer['criteriaId'])): + for criteria in obSub['criteria'] : + if str(criteria["_id"]) == str(answer['criteriaId']) : + obj['name'] = criteria['name'] + obj['score'] = criteria['score'] + try: + obj['score_achieved'] = criteria['scoreAchieved'] + except 
KeyError : + obj['score_achieved'] = '' + obj['description'] = criteria['description'] + try: + levelArray = [] + levelArray = criteria['rubric']['levels'].values() + for labelValue in levelArray: + if (str((criteria['score'])) == labelValue['level']): + obj['label'] = labelValue['label'] + except Exception: + obj['label'] = '' + + try: + prj_id = [] + title = [] + goal = [] + externalId =[] + + try: + for prj in criteria['improvement-projects']: + try: + prj_id.append(str(prj['_id'])) + except: + prj_id.append("") + try: + title.append(prj['title']) + except: + title.append("") + try: + goal.append(prj['goal']) + except: + goal.append("") + try: + externalId.append(prj['externalId']) + except: + externalId.append("") + except: + prj_id = [] + title = [] + goal = [] + externalId =[] + + try: + obj['imp_project_id'] = prj_id + except KeyError: + obj['imp_project_id'] = [] + try: + obj['imp_project_title'] = title + except KeyError: + obj['imp_project_title'] = [] + try : + obj['imp_project_goal'] = goal + except KeyError: + obj['imp_project_goal'] = [] + try: + obj['imp_project_externalId'] = externalId + except KeyError: + obj['imp_project_externalId'] = [] + except KeyError: + pass + if type(obj['externalId']) != str: + for cri in obSub['criteria']: + obj['externalId'] = cri['externalId'] + obj['name']=cri['name'] + obj['score']=cri['score'] + obj['score_achieved'] = criteria['scoreAchieved'] + obj['description'] = cri['description'] + try: + levelArray = [] + levelArray = cri['rubric']['levels'].values() + for labelValue in levelArray: + if (str((cri['score'])) == labelValue['level']): + obj['label'] = labelValue['label'] + except Exception: + obj['label'] = '' + except KeyError: + pass + for themes in domainArr: + for st in themes["theme"]: + if (st["type"] == "criteria") and (observationSubQuestionsObj['criteriaId'] == str(st["externalId"])): + observationSubQuestionsObj['domainName'] = themes['name'] + observationSubQuestionsObj['domainExternalId'] = themes['externalId'] + try : + for submTheme in obSub["themes"]: + if submTheme["externalId"] == themes['externalId'] : + observationSubQuestionsObj['domainLevel'] = submTheme["pointsBasedLevel"] + observationSubQuestionsObj['domainScore'] = submTheme["scoreAchieved"] + except KeyError : + observationSubQuestionsObj['domainLevel'] = '' + observationSubQuestionsObj['domainScore'] = '' + try : + for theme in obSub['themes']: + observationSubQuestionsObj['childName'] = theme['name'] + observationSubQuestionsObj['ancestorName'] = theme['parent'] + observationSubQuestionsObj['childType'] = theme['type'] + observationSubQuestionsObj['childExternalid'] = theme['externalId'] + except KeyError : + observationSubQuestionsObj['childName'] = '' + observationSubQuestionsObj['ancestorName'] = '' + observationSubQuestionsObj['childType'] = '' + observationSubQuestionsObj['childExternalid'] = '' + + try: + observationSubQuestionsObj['level'] = theme['score'] + except KeyError: + observationSubQuestionsObj['level'] = '' + + try: + observationSubQuestionsObj['criteriaScore'] = theme['score_achieved'] + except KeyError: + observationSubQuestionsObj['criteriaScore'] = '' + + try: + observationSubQuestionsObj['label'] = theme['label'] + except KeyError: + observationSubQuestionsObj['label'] = '' + + try: + if (len(theme['imp_project_id']) >=0): + for i in range(len(theme['imp_project_id'])): + observationSubQuestionsObj['imp_project_id'] = theme['imp_project_id'][i] + observationSubQuestionsObj['imp_project_title'] = theme['imp_project_title'][i] + 
observationSubQuestionsObj['imp_project_goal'] = theme['imp_project_goal'][i] + observationSubQuestionsObj['imp_project_externalId'] = theme['imp_project_externalId'][i] + except KeyError: + observationSubQuestionsObj['imp_project_id'] = "" + observationSubQuestionsObj['imp_project_title'] = "" + observationSubQuestionsObj['imp_project_goal'] = "" + observationSubQuestionsObj['imp_project_externalId'] = "" + + if usrRolFn : + observationSubQuestionsObj = {**usrRolFn, **observationSubQuestionsObj} + observationSubQuestionsObj["submissionNumber"] = obSub["submissionNumber"] + observationSubQuestionsObj["submissionTitle"] = obSub["title"] + try: + observationSubQuestionsObj["criteriaLevelReport"] = str(obSub["criteriaLevelReport"]) + except KeyError : + observationSubQuestionsObj["criteriaLevelReport"] = False + + try: + observationSubQuestionsObj["isRubricDriven"] = obSub["isRubricDriven"] + except KeyError : + observationSubQuestionsObj["isRubricDriven"] = False + + flatten_userprofile = flatten_json(obSub['userProfile']) + new_dict = {} + for key in flatten_userprofile: + string_without_integer = re.sub(r'\d+', '', key) + updated_string = string_without_integer.replace("--", "-") + # Check if the value associated with the key is not None + if flatten_userprofile[key] is not None: + if updated_string in new_dict: + # Perform addition only if both values are not None + if new_dict[updated_string] is not None: + new_dict[updated_string] += "," + str(flatten_userprofile[key]) + else: + new_dict[updated_string] = str(flatten_userprofile[key]) + else: + new_dict[updated_string] = str(flatten_userprofile[key]) + + observationSubQuestionsObj['userProfile'] = str(new_dict) + return observationSubQuestionsObj + + def fetchingQuestiondetails(ansFn, instNumber): + if (len(ansFn['options']) == 0) or (('options' in ansFn.keys()) == False): + try: + if(len(userRolesArrUnique)) > 0: + for usrRol in userRolesArrUnique : + finalObj = {} + finalObj = creatingObj( + ansFn,ansFn['externalId'], + ansFn['value'],instNumber, + ansFn['value'], + usrRol + ) + if finalObj["completedDate"]: + print(f"at line 730 == {finalObj}") + producer.send( + (config.get("KAFKA", "observation_druid_topic")), + json.dumps(finalObj).encode('utf-8') + ) + producer.flush() + infoLogger.info(f"Data for observationId ({finalObj['observationId']}) and questionId ({finalObj['questionId']}) inserted into sl-observation datasource") + else : + finalObj = {} + finalObj = creatingObj( + ansFn,ansFn['externalId'], + ansFn['value'], + instNumber, + ansFn['value'], + None + ) + if finalObj["completedDate"]: + print(f"at line 747 == {finalObj}") + producer.send( + (config.get("KAFKA", "observation_druid_topic")), + json.dumps(finalObj).encode('utf-8') + ) + producer.flush() + infoLogger.info(f"Data for observationId ({finalObj['observationId']}) and questionId ({finalObj['questionId']}) inserted into sl-observation datasource") + except KeyError: + pass + else: + labelIndex = 0 + for quesOpt in ansFn['options']: + try: + if type(ansFn['value']) == str or type(ansFn['value']) == int: + if quesOpt['value'] == ansFn['value'] : + if(len(userRolesArrUnique)) > 0: + for usrRol in userRolesArrUnique : + finalObj = {} + finalObj = creatingObj( + ansFn, + ansFn['externalId'], + ansFn['value'], + instNumber, + quesOpt['label'], + usrRol + ) + if finalObj["completedDate"]: + print(f"at line 774 == {finalObj}") + producer.send( + (config.get("KAFKA", "observation_druid_topic")), + json.dumps(finalObj).encode('utf-8') + ) + producer.flush() + 
infoLogger.info(f"Data for observationId ({finalObj['observationId']}) and questionId ({finalObj['questionId']}) inserted into sl-observation datasource") + else : + finalObj = {} + finalObj = creatingObj( + ansFn,ansFn['externalId'], + ansFn['value'], + instNumber, + quesOpt['label'], + None + ) + if finalObj["completedDate"]: + print(f"at line 791 == {finalObj}") + producer.send( + (config.get("KAFKA", "observation_druid_topic")), + json.dumps(finalObj).encode('utf-8') + ) + producer.flush() + infoLogger.info(f"Data for observationId ({finalObj['observationId']}) and questionId ({finalObj['questionId']}) inserted into sl-observation datasource") + + elif type(ansFn['value']) == list: + for ansArr in ansFn['value']: + if quesOpt['value'] == ansArr: + if(len(userRolesArrUnique)) > 0: + for usrRol in userRolesArrUnique : + finalObj = {} + finalObj = creatingObj( + ansFn, + ansFn['externalId'], + ansArr, + instNumber, + quesOpt['label'], + usrRol + ) + if finalObj["completedDate"]: + print(f"at line 814 == {finalObj}") + producer.send( + (config.get("KAFKA", "observation_druid_topic")), + json.dumps(finalObj).encode('utf-8') + ) + producer.flush() + infoLogger.info(f"Data for observationId ({finalObj['observationId']}) and questionId ({finalObj['questionId']}) inserted into sl-observation datasource") + else : + finalObj = {} + finalObj = creatingObj( + ansFn, + ansFn['externalId'], + ansArr, + instNumber, + quesOpt['label'], + None + ) + if finalObj["completedDate"]: + print(f"at line 832 == {finalObj}") + producer.send( + (config.get("KAFKA", "observation_druid_topic")), + json.dumps(finalObj).encode('utf-8') + ) + producer.flush() + infoLogger.info(f"Data for observationId ({finalObj['observationId']}) and questionId ({finalObj['questionId']}) inserted into sl-observation datasource") + labelIndex = labelIndex + 1 + except KeyError: + pass + try: + if ( + ans['responseType'] == 'text' or ans['responseType'] == 'radio' or + ans['responseType'] == 'multiselect' or ans['responseType'] == 'slider' or + ans['responseType'] == 'number' or ans['responseType'] == 'date'): + inst_cnt = '' + fetchingQuestiondetails(ans,inst_cnt) + elif ans['responseType'] == 'matrix' and len(ans['value']) > 0: + inst_cnt = 0 + for instances in ans['value']: + inst_cnt = inst_cnt + 1 + if type(instances) == list : + for instance in instances: + fetchingQuestiondetails(instance, inst_cnt) + else : + for instance in instances.values(): + fetchingQuestiondetails(instance, inst_cnt) + except KeyError: + pass + else: + infoLogger.info(f"Observation Submission is not in completed status" ) + else: + infoLogger.info(f"observation_Submission_id {observationSubmissionId} is already exists in the sl-observation datasource.") + infoLogger.info(f"Completed processing kafka event for the Observation Submission Id : {obSub['_id']}. For Observation Question report ") +except Exception as e: + errorLogger.error(e, exc_info=True) + +# Main data extraction function +try: + def main_data_extraction(obSub): + '''Function to process observation submission data before sending it to Kafka topics''' + try: + infoLogger.info(f"Starting to process kafka event for the observation Submission Id : {obSub['_id']}. 
For Observation Status report") + # Initialize dictionary for storing observation submission data + observationSubQuestionsObj = {} + observation_status = {} + + # Extract various attributes from observation submission object + observationSubQuestionsObj['observationId'] = str(obSub.get('observationId', '')) + observationSubQuestionsObj['observationName'] = str(obSub.get('observationInformation', {}).get('name', '')) + observationSubQuestionsObj['observationSubmissionId'] = obSub.get('_id', '') + try: + observationSubQuestionsObj['createdBy'] = obSub['createdBy'] + except KeyError: + observationSubQuestionsObj['createdBy'] = '' + observationSubQuestionsObj['entity'] = str(obSub['entityId']) + observationSubQuestionsObj['entityExternalId'] = obSub['entityExternalId'] + observationSubQuestionsObj['entityType'] =obSub['entityType'] + observationSubQuestionsObj["solutionId"] = obSub["solutionId"], + observationSubQuestionsObj["solutionExternalId"] = obSub["solutionExternalId"] + try : + if 'solutionInfo' in obSub.keys(): + solutionObj = obSub['solutionInfo'] + observationSubQuestionsObj['solutionName'] = str(solutionObj.get('name','')) + except KeyError: + observationSubQuestionsObj['solutionName'] = '' + try: + if obSub["isRubricDriven"] == True and obSub["criteriaLevelReport"] == True: + observationSubQuestionsObj['solutionType'] = "observation_with_rubric" + elif obSub["isRubricDriven"] == True and obSub["criteriaLevelReport"] == False: + observationSubQuestionsObj['solutionType'] = "observation_with_rubric_no_criteria_level_report" + else: + observationSubQuestionsObj['solutionType'] = "observation_with_out_rubric" + except KeyError: + observationSubQuestionsObj['solutionType'] = "observation_with_out_rubric" + + try: + observationSubQuestionsObj['completedDate'] = obSub['completedDate'] + except KeyError: + observationSubQuestionsObj['completedDate'] = obSub['createdAt'] + # Check if 'isAPrivateProgram' key exists + try: + observationSubQuestionsObj['isAPrivateProgram'] = obSub['isAPrivateProgram'] + except KeyError: + observationSubQuestionsObj['isAPrivateProgram'] = True + # user profile creation + flatten_userprofile = flatten_json(obSub['userProfile']) + new_dict = {} + for key in flatten_userprofile: + string_without_integer = re.sub(r'\d+', '', key) + updated_string = string_without_integer.replace("--", "-") + # Check if the value associated with the key is not None + if flatten_userprofile[key] is not None: + if updated_string in new_dict: + # Perform addition only if both values are not None + if new_dict[updated_string] is not None: + new_dict[updated_string] += "," + str(flatten_userprofile[key]) + else: + new_dict[updated_string] = str(flatten_userprofile[key]) + else: + new_dict[updated_string] = str(flatten_userprofile[key]) + + observationSubQuestionsObj['userProfile'] = str(new_dict) + + # Before attempting to access the list, check if it is non-empty + profile_user_types = obSub.get('userProfile', {}).get('profileUserTypes', []) + if profile_user_types: + # Access the first element of the list if it exists + user_type = profile_user_types[0].get('type', None) + else: + # Handle the case when the list is empty + user_type = None + observationSubQuestionsObj['userType'] = user_type + + observationSubQuestionsObj['solutionExternalId'] = obSub.get('solutionExternalId', '') + observationSubQuestionsObj['solutionId'] = obSub.get('solutionId', '') + + for location in obSub.get('userProfile', {}).get('userLocations', []): + name = location.get('name') + type_ = 
location.get('type') + if name and type_: + observationSubQuestionsObj[type_] = name + + + orgArr = orgName(obSub.get('userProfile', {}).get('organisations',None)) + if orgArr: + # observationSubQuestionsObj['schoolId'] = orgArr[0].get("organisation_id") + observationSubQuestionsObj['organisationName'] = orgArr[0].get("organisation_name") + else: + # observationSubQuestionsObj['schoolId'] = None + observationSubQuestionsObj['organisationName'] = None + + # Insert data to sl-observation-meta druid datasource if status is anything + _id = observationSubQuestionsObj.get('observationSubmissionId', None) + try: + if _id: + if check_observation_submission_id_existance(_id,"observationSubmissionId","sl-observation-meta"): + infoLogger.info(f"No data duplection for the Submission ID : {_id} in sl-observation-meta datasource") + # Upload observation submission data to Druid topic + print(f"at line 971 == {observationSubQuestionsObj}") + producer.send((config.get("KAFKA", "observation_meta_druid_topic")), json.dumps(observationSubQuestionsObj).encode('utf-8')) + producer.flush() + infoLogger.info(f"Data with submission_id {_id} is being inserted into the sl-observation-meta datasource.") + else: + infoLogger.info(f"Data with submission_id {_id} is already exists in the sl-observation-meta datasource.") + except Exception as e : + # Log any errors that occur during data ingestion + errorLogger.error("====== An error was found during data ingestion in the sl-observation-meta datasource ======") + errorLogger.error(e,exc_info=True) + + + # Insert data to sl-observation-status-started druid datasource if status is started + if obSub['status'] == 'started': + observation_status['observationSubmissionId'] = obSub['_id'] + try: + observation_status['startedAt'] = obSub['completedDate'] + except KeyError: + observation_status['startedAt'] = '' + _id = observation_status.get('observationSubmissionId', None) + try : + if _id: + if check_observation_submission_id_existance(_id,"observationSubmissionId","sl-observation-status-started"): + infoLogger.info(f"No data duplection for the Submission ID : {_id} in sl-observation-status-started datasource") + # Upload observation status data to Druid topic + print(f"at line 996 == {observation_status}") + producer.send((config.get("KAFKA", "observation_started_druid_topic")), json.dumps(observation_status).encode('utf-8')) + producer.flush() + infoLogger.info(f"Data with submission_id {_id} is being inserted into the sl-observation-status-started datasource.") + else: + infoLogger.info(f"Data with submission_id {_id} is already exists in the sl-observation-status-started datasource.") + except Exception as e : + # Log any errors that occur during data ingestion + errorLogger.error("====== An error was found during data ingestion in the sl-observation-status-started datasource ======") + errorLogger.error(e,exc_info=True) + + + # Insert data to sl-observation-status-started druid datasource if status is inprogress + elif obSub['status'] == 'inprogress': + observation_status['observationSubmissionId'] = obSub['_id'] + observation_status['inprogressAt'] = obSub['completedDate'] + _id = observation_status.get('observationSubmissionId', None) + try : + if _id: + if check_observation_submission_id_existance(_id,"observationSubmissionId","sl-observation-status-inprogress"): + infoLogger.info(f"No data duplection for the Submission ID : {_id} in sl-observation-status-inprogress datasource") + # Upload observation status data to Druid topic + print(f"at line 996 == 
{observation_status}") + producer.send((config.get("KAFKA", "observation_inprogress_druid_topic")), json.dumps(observation_status).encode('utf-8')) + producer.flush() + infoLogger.info(f"Data with submission_id {_id} is being inserted into the sl-observation-status-inprogress datasource.") + else: + infoLogger.info(f"Data with submission_id {_id} is already exists in the sl-observation-status-inprogress datasource.") + except Exception as e : + # Log any errors that occur during data ingestion + errorLogger.error("====== An error was found during data ingestion in the sl-observation-status-inprogress datasource ======") + errorLogger.error(e,exc_info=True) + + + elif obSub['status'] == 'completed': + observation_status['observationSubmissionId'] = obSub['_id'] + observation_status['completedAt'] = obSub['completedDate'] + _id = observation_status.get('observationSubmissionId', None) + try : + if _id: + if check_observation_submission_id_existance(_id,"observationSubmissionId","sl-observation-status-completed"): + infoLogger.info(f"No data duplection for the Submission ID : {_id} in sl-observation-status-completed datasource") + # Upload observation status data to Druid topic + print(f"at line 996 == {observation_status}") + producer.send((config.get("KAFKA", "observation_completed_druid_topic")), json.dumps(observation_status).encode('utf-8')) + producer.flush() + infoLogger.info(f"Data with submission_id {_id} is being inserted into the sl-observation-status-completed datasource") + else: + infoLogger.info(f"Data with submission_id {_id} is already exists in the sl-observation-status-completed datasource") + except Exception as e : + # Log any errors that occur during data ingestion + errorLogger.error("====== An error was found during data ingestion in the sl-observation-status-inprogress datasource ======") + errorLogger.error(e,exc_info=True) + + infoLogger.info(f"Completed processing kafka event for the observation Submission Id : {obSub['_id']}. 
For observation Status report") + except Exception as e: + # Log any errors that occur during data extraction + errorLogger.error(e, exc_info=True) +except Exception as e: + # Log any errors that occur during data extraction + errorLogger.error(e, exc_info=True) + + +try: + @app.agent(rawTopicName) + async def surveyFaust(consumer): + '''Faust agent to consume messages from Kafka and process them''' + async for msg in consumer: + try: + msg_val = msg.decode('utf-8') + msg_data = json.loads(msg_val) + infoLogger.info("========== START OF OBSERVATION SUBMISSION EVENT PROCESSING ==========") + obj_creation(msg_data) + main_data_extraction(msg_data) + infoLogger.info("********** END OF OBSERVATION SUBMISSION EVENT PROCESSING **********") + except KeyError as ke: + # Log KeyError + errorLogger.error(f"KeyError occurred: {ke}") +except Exception as e: + # Log any other exceptions + errorLogger.error(f"Error in observationFaust function: {e}") + +if __name__ == '__main__': + app.main() + diff --git a/run.sh b/run.sh old mode 100644 new mode 100755 diff --git a/start-services.sh b/start-services.sh new file mode 100755 index 0000000..8b7603f --- /dev/null +++ b/start-services.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +#start survey_realtime_streaming +/opt/sparkjobs/faust_as_service/faust.sh survey/survey_realtime_streaming survey/ & + +#start observation_realtime_streaming +/opt/sparkjobs/faust_as_service/faust.sh observations/observation_realtime_streaming observations/ & + +#start observations +#/opt/sparkjobs/faust_as_service/faust.sh observations/py_observation_streaming observations/ & + +#start observation_evidence +#/opt/sparkjobs/faust_as_service/faust.sh observations/py_observation_evidence_streaming observations/ & + +#start survey +#/opt/sparkjobs/faust_as_service/faust.sh survey/py_survey_streaming survey/ & + +#start survey_evidence +#/opt/sparkjobs/faust_as_service/faust.sh survey/py_survey_evidence_streaming survey/ & + +wait -n + +exit $? 
diff --git a/survey/survey_realtime_streaming.py b/survey/survey_realtime_streaming.py new file mode 100755 index 0000000..c408633 --- /dev/null +++ b/survey/survey_realtime_streaming.py @@ -0,0 +1,776 @@ +# ----------------------------------------------------------------- +# Name : survey_realtime_streaming.py +# Author : Prashanth, Vivek +# Description : Program to read data from one kafka topic and +# produce it to another kafka topic +# ----------------------------------------------------------------- + +# Import necessary libraries +import sys, os, json, re +import datetime +from datetime import date +import kafka +import faust +import logging +import time, re +import requests +from kafka import KafkaConsumer, KafkaProducer +from kafka.admin import KafkaAdminClient, NewTopic +from configparser import ConfigParser,ExtendedInterpolation +from logging.handlers import TimedRotatingFileHandler, RotatingFileHandler +from pydruid.client import * +from pydruid.db import connect +from pydruid.query import QueryBuilder +from pydruid.utils.aggregators import * +from pydruid.utils.filters import Dimension +from urllib.parse import urlparse + +config_path = os.path.split(os.path.dirname(os.path.abspath(__file__))) +config = ConfigParser(interpolation=ExtendedInterpolation()) +config.read(config_path[0] + "/config.ini") + + +# date formating +current_date = datetime.date.today() +formatted_current_date = current_date.strftime("%d-%B-%Y") +number_of_days_logs_kept = current_date - datetime.timedelta(days=7) +number_of_days_logs_kept = number_of_days_logs_kept.strftime("%d-%B-%Y") + +# file path for log +file_path_for_output_and_debug_log = config.get('LOGS', 'survey_streaming_success_error') +file_name_for_output_log = f"{file_path_for_output_and_debug_log}{formatted_current_date}-output.log" +file_name_for_debug_log = f"{file_path_for_output_and_debug_log}{formatted_current_date}-debug.log" + +# Remove old log entries +files_with_date_pattern = [file +for file in os.listdir(file_path_for_output_and_debug_log) +if re.match(r"\d{2}-\w+-\d{4}-*", +file)] + +for file_name in files_with_date_pattern: + file_path = os.path.join(file_path_for_output_and_debug_log, file_name) + if os.path.isfile(file_path): + file_date = file_name.split('.')[0] + date = file_date.split('-')[0] + '-' + file_date.split('-')[1] + '-' + file_date.split('-')[2] + if date < number_of_days_logs_kept: + os.remove(file_path) + +# Add loggers +formatter = logging.Formatter('%(asctime)s - %(levelname)s') + +# handler for output and debug Log +output_logHandler = RotatingFileHandler(f"{file_name_for_output_log}") +output_logHandler.setFormatter(formatter) + +debug_logHandler = RotatingFileHandler(f"{file_name_for_debug_log}") +debug_logHandler.setFormatter(formatter) + +# Add the successLoger +successLogger = logging.getLogger('success log') +successLogger.setLevel(logging.DEBUG) +successBackuphandler = TimedRotatingFileHandler(f"{file_name_for_output_log}", when="w0",backupCount=1) +successLogger.addHandler(output_logHandler) +successLogger.addHandler(successBackuphandler) + +# Add the Errorloger +errorLogger = logging.getLogger('error log') +errorLogger.setLevel(logging.ERROR) +errorBackuphandler = TimedRotatingFileHandler(f"{file_name_for_output_log}",when="w0",backupCount=1) +errorLogger.addHandler(output_logHandler) +errorLogger.addHandler(errorBackuphandler) + +# Add the Infologer +infoLogger = logging.getLogger('info log') +infoLogger.setLevel(logging.INFO) +debug_logBackuphandler = 
TimedRotatingFileHandler(f"{file_name_for_debug_log}",when="w0",backupCount=1) +infoLogger.addHandler(debug_logHandler) +infoLogger.addHandler(debug_logBackuphandler) + + +# Initialize Kafka producer and Faust app +try: + kafka_url = (config.get("KAFKA", "url")) + app = faust.App( + 'ml_survey_faust', + broker='kafka://'+kafka_url, + value_serializer='raw', + web_port=7003, + broker_max_poll_records=500 + ) + rawTopicName = app.topic(config.get("KAFKA", "survey_raw_topic")) + producer = KafkaProducer(bootstrap_servers=[config.get("KAFKA", "url")]) + +except Exception as e: + errorLogger.error(e, exc_info=True) + +# Function to extract user data +def userDataCollector(val): + '''Finds the Profile type, locations and framework(board) of an user''' + if val is not None: + dataobj = {} + # Get user Sub type + if val["userRoleInformation"]: + try: + dataobj["user_subtype"] = val["userRoleInformation"]["role"] + except KeyError: + pass + # Get user type + if val["userProfile"]["profileUserTypes"]: + try: + temp_userType = set([types["type"] for types in val["userProfile"]["profileUserTypes"]]) + dataobj["user_type"] = ", ".join(temp_userType) + except KeyError: + pass + # Get locations + if val["userProfile"]["userLocations"]: + for loc in val["userProfile"]["userLocations"]: + dataobj[f'{loc["type"]}_code'] = loc["code"] + dataobj[f'{loc["type"]}_name'] = loc["name"] + dataobj[f'{loc["type"]}_externalId'] = loc["id"] + # Get board + if "framework" in val["userProfile"] and val["userProfile"]["framework"]: + if "board" in val["userProfile"]["framework"] and len(val["userProfile"]["framework"]["board"]) > 0: + boardName = ",".join(val["userProfile"]["framework"]["board"]) + dataobj["board_name"] = boardName + return dataobj + +# Function to create organization data +def orgCreator(val): + '''Finds the data for organisation''' + orgarr = [] + if val is not None: + for org in val: + orgObj = {} + if org["isSchool"] == False: + orgObj['organisationId'] = org['organisationId'] + orgObj['organisationName'] = org["orgName"] + orgarr.append(orgObj) + return orgarr + +# # Define function to check if survey submission Id exists in Druid +def check_survey_submission_id_existance(key,column_name,table_name): + try: + # Establish connection to Druid + url = config.get("DRUID","sql_url") + url = str(url) + parsed_url = urlparse(url) + + host = parsed_url.hostname + port = int(parsed_url.port) + path = parsed_url.path + scheme = parsed_url.scheme + + conn = connect(host=host, port=port, path=path, scheme=scheme) + cur = conn.cursor() + response = check_datasource_existence(table_name) + if response == True: + # Query to check existence of survey submission Id in Druid table + query = f"SELECT COUNT(*) FROM \"{table_name}\" WHERE \"{column_name}\" = '{key}'" + cur.execute(query) + result = cur.fetchone() + count = result[0] + infoLogger.info(f"Found {count} entires in {table_name}") + if count == 0: + return True + else: + return False + else: + # Since the table doesn't exist, return True to allow data insertion initially + return True + except Exception as e: + # Log any errors that occur during Druid query execution + errorLogger.error(f"Error checking survey_submission_id existence in Druid: {e}") + +def check_datasource_existence(datasource_name): + host = config.get('DRUID', 'datasource_url') + try : + response = requests.get(host) + if response.status_code == 200: + datasources = response.json() + if datasource_name in datasources : + return True + else : + return False + except requests.RequestException 
as e: + errorLogger.error(f"Error fetching datasources: {e}") +def flatten_json(y): + out = {} + + def flatten(x, name=''): + # If the Nested key-value pair is of dict type + if isinstance(x, dict): + for a in x: + flatten(x[a], name + a + '-') + + # If the Nested key-value pair is of list type + elif isinstance(x, list): + if not x: # Check if the list is empty + out[name[:-1]] = "null" + else: + for i, a in enumerate(x): + flatten(a, name + str(i) + '-') + + # If the Nested key-value pair is of other types + else: + # Replace None, empty string, or empty list with "null" + if x is None or x == '' or x == []: + out[name[:-1]] = "null" + else: + out[name[:-1]] = x + + flatten(y) + return out + +# Worker class to send data to Kafka +class FinalWorker: + '''Class that takes necessary inputs and sends the correct object into Kafka''' + def __init__(self, answer, quesexternalId, ans_val, instNumber, responseLabel, orgarr, createObj): + self.answer = answer + self.quesexternalId = quesexternalId + self.ans_val = ans_val + self.instNum = instNumber + self.responseLabel = responseLabel + self.orgArr = orgarr + self.creatingObj = createObj + + def run(self): + if len(self.orgArr) >0: + for org in range(len(self.orgArr)): + finalObj = {} + finalObj = self.creatingObj(self.answer,self.quesexternalId,self.ans_val,self.instNum,self.responseLabel) + finalObj.update(self.orgArr[org]) + survey_id = finalObj["surveyId"] + question_id = finalObj["questionId"] + producer.send((config.get("KAFKA", "survey_druid_topic")), json.dumps(finalObj).encode('utf-8')) + producer.flush() + infoLogger.info(f"Data for surveyId ({survey_id}) and questionId ({question_id}) inserted into sl-survey datasource") + else: + finalObj = {} + finalObj = self.creatingObj(self.answer,self.quesexternalId,self.ans_val,self.instNum,self.responseLabel) + survey_id = finalObj["surveyId"] + question_id = finalObj["questionId"] + producer.send((config.get("KAFKA", "survey_druid_topic")), json.dumps(finalObj).encode('utf-8')) + producer.flush() + infoLogger.info(f"Data for surveyId ({survey_id}) and questionId ({question_id}) inserted into sl-survey datasource") + +try: + def obj_creation(obSub): + '''Function to process survey submission data before sending it to Kafka''' + try: + # Debug log for survey submission ID + infoLogger.info(f"Started to process kafka event for the Survey Submission Id : {obSub['_id']}. 
For Survey Question report") + surveySubmissionId = str(obSub['_id']) + if check_survey_submission_id_existance(surveySubmissionId,"surveySubmissionId","sl-survey"): + infoLogger.info(f"No data duplection for the Submission ID : {surveySubmissionId} in sl-survey ") + if obSub['status'] == 'completed': + if 'isAPrivateProgram' in obSub : + surveySubQuestionsArr = [] + completedDate = str(obSub['completedDate']) + createdAt = str(obSub['createdAt']) + updatedAt = str(obSub['updatedAt']) + evidencesArr = [v for v in obSub['evidences'].values()] + evidence_sub_count = 0 + rootOrgId = None + + # Extract root organization ID from user profile if available + try: + if obSub["userProfile"]: + if "rootOrgId" in obSub["userProfile"] and obSub["userProfile"]["rootOrgId"]: + rootOrgId = obSub["userProfile"]["rootOrgId"] + except KeyError: + pass + if 'answers' in obSub.keys() : + answersArr = [v for v in obSub['answers'].values()] + for ans in answersArr: + try: + if len(ans['fileName']): + evidence_sub_count = evidence_sub_count + len(ans['fileName']) + except KeyError: + pass + for ans in answersArr: + def sequenceNumber(externalId,answer): + if 'solutions' in obSub.keys(): + solutionsArr = [v for v in obSub['solutions'].values()] + for solu in solutionsArr: + section = [k for k in solu['sections'].keys()] + # parsing through questionSequencebyecm to get the sequence number + try: + for num in range( + len(solu['questionSequenceByEcm'][answer['evidenceMethod']][section[0]]) + ): + if solu['questionSequenceByEcm'][answer['evidenceMethod']][section[0]][num] == externalId: + return num + 1 + except KeyError: + pass + + # Function to create object for each answer + def creatingObj(answer,quesexternalId,ans_val,instNumber,responseLabel): + surveySubQuestionsObj = {} + + # Extracting various attributes from submission object + # try: + # surveySubQuestionsObj['appName'] = obSub["appInformation"]["appName"].lower() + # except KeyError : + # surveySubQuestionsObj['appName'] = config.get("ML_APP_NAME", "survey_app") + + surveySubQuestionsObj['surveySubmissionId'] = str(obSub['_id']) + surveySubQuestionsObj['createdBy'] = obSub['createdBy'] + + # Check if 'isAPrivateProgram' key exists + try: + surveySubQuestionsObj['isAPrivateProgram'] = obSub['isAPrivateProgram'] + except KeyError: + surveySubQuestionsObj['isAPrivateProgram'] = True + + # Extract program related information + # try: + # surveySubQuestionsObj['programExternalId'] = obSub['programExternalId'] + # except KeyError : + # surveySubQuestionsObj['programExternalId'] = None + # try: + # surveySubQuestionsObj['programId'] = str(obSub['programId']) + # except KeyError : + # surveySubQuestionsObj['programId'] = None + # try: + # if 'programInfo' in obSub: + # surveySubQuestionsObj['programName'] = obSub['programInfo']['name'] + # else: + # surveySubQuestionsObj['programName'] = '' + # except KeyError: + # surveySubQuestionsObj['programName'] = '' + + # Extract solution related information + surveySubQuestionsObj['solutionExternalId'] = obSub['solutionExternalId'] + surveySubQuestionsObj['surveyId'] = str(obSub['surveyId']) + surveySubQuestionsObj['solutionId'] = str(obSub["solutionId"]) + try: + if 'solutionInfo' in obSub: + surveySubQuestionsObj['solutionName'] = obSub['solutionInfo']['name'] + else: + surveySubQuestionsObj['solutionName'] = '' + except KeyError: + surveySubQuestionsObj['solutionName'] = '' + + # Extract section information + # try: + # section = [k for k in obSub['solutionInfo']['sections'].keys()] + # 
surveySubQuestionsObj['section'] = section[0] + # except KeyError: + # surveySubQuestionsObj['section'] = '' + + # Get sequence number for the question + # surveySubQuestionsObj['questionSequenceByEcm'] = sequenceNumber(quesexternalId, answer) + + # Extract scoring related information + # try: + # if obSub['solutionInformation']['scoringSystem'] == 'pointsBasedScoring': + # # try: + # # surveySubQuestionsObj['totalScore'] = obSub['pointsBasedMaxScore'] + # # except KeyError : + # # surveySubQuestionsObj['totalScore'] = '' + # try: + # surveySubQuestionsObj['scoreAchieved'] = obSub['pointsBasedScoreAchieved'] + # except KeyError : + # surveySubQuestionsObj['scoreAchieved'] = '' + # try: + # surveySubQuestionsObj['totalpercentage'] = obSub['pointsBasedPercentageScore'] + # except KeyError : + # surveySubQuestionsObj['totalpercentage'] = '' + # try: + # surveySubQuestionsObj['maxScore'] = answer['maxScore'] + # except KeyError : + # surveySubQuestionsObj['maxScore'] = '' + # try: + # surveySubQuestionsObj['minScore'] = answer['scoreAchieved'] + # except KeyError : + # surveySubQuestionsObj['minScore'] = '' + # try: + # surveySubQuestionsObj['percentageScore'] = answer['percentageScore'] + # except KeyError : + # surveySubQuestionsObj['percentageScore'] = '' + # try: + # surveySubQuestionsObj['pointsBasedScoreInParent'] = answer['pointsBasedScoreInParent'] + # except KeyError : + # surveySubQuestionsObj['pointsBasedScoreInParent'] = '' + # except KeyError: + # surveySubQuestionsObj['totalScore'] = '' + # surveySubQuestionsObj['scoreAchieved'] = '' + # surveySubQuestionsObj['totalpercentage'] = '' + # surveySubQuestionsObj['maxScore'] = '' + # surveySubQuestionsObj['minScore'] = '' + # surveySubQuestionsObj['percentageScore'] = '' + # surveySubQuestionsObj['pointsBasedScoreInParent'] = '' + + # Extract survey name + if 'surveyInformation' in obSub : + if 'name' in obSub['surveyInformation']: + surveySubQuestionsObj['surveyName'] = obSub['surveyInformation']['name'] + else: + surveySubQuestionsObj['surveyName'] = '' + + # Extract question related information + surveySubQuestionsObj['questionId'] = str(answer['qid']) + surveySubQuestionsObj['questionAnswer'] = ans_val + surveySubQuestionsObj['questionResponseType'] = answer['responseType'] + + # Extract response label for number response type + if answer['responseType'] == 'number': + if responseLabel: + surveySubQuestionsObj['questionResponseLabelNumber'] = responseLabel + else: + surveySubQuestionsObj['questionResponseLabelNumber'] = 0 + else: + surveySubQuestionsObj['questionResponseLabelNumber'] = 0 + + # Extract response label for other response types + try: + if responseLabel: + if answer['responseType'] == 'text': + surveySubQuestionsObj['questionResponseLabel'] = "'"+ re.sub("\n|\"","",responseLabel) +"'" + else: + surveySubQuestionsObj['questionResponseLabel'] = responseLabel + else: + surveySubQuestionsObj['questionResponseLabel'] = '' + except KeyError : + surveySubQuestionsObj['questionResponseLabel'] = '' + + # Extract question details + surveySubQuestionsObj['questionExternalId'] = quesexternalId + surveySubQuestionsObj['questionName'] = answer['question'][0] + surveySubQuestionsObj['questionECM'] = answer['evidenceMethod'] + surveySubQuestionsObj['criteriaId'] = str(answer['criteriaId']) + + # Extract criteria details + try: + if 'criteria' in obSub.keys(): + for criteria in obSub['criteria']: + surveySubQuestionsObj['criteriaExternalId'] = criteria['externalId'] + surveySubQuestionsObj['criteriaName'] = criteria['name'] + else: + 
surveySubQuestionsObj['criteriaExternalId'] = '' + surveySubQuestionsObj['criteriaName'] = '' + + except KeyError: + surveySubQuestionsObj['criteriaExternalId'] = '' + surveySubQuestionsObj['criteriaName'] = '' + + # Extract completion dates + surveySubQuestionsObj['completedDate'] = completedDate + surveySubQuestionsObj['createdAt'] = createdAt + surveySubQuestionsObj['updatedAt'] = updatedAt + + # Extract remarks and evidence details + if answer['remarks'] : + surveySubQuestionsObj['remarks'] = "'"+ re.sub("\n|\"","",answer['remarks']) +"'" + else : + surveySubQuestionsObj['remarks'] = None + if len(answer['fileName']): + multipleFiles = None + fileCnt = 1 + for filedetail in answer['fileName']: + if fileCnt == 1: + multipleFiles = filedetail['sourcePath'] + fileCnt = fileCnt + 1 + else: + multipleFiles = multipleFiles + ' , ' + filedetail['sourcePath'] + surveySubQuestionsObj['evidences'] = multipleFiles + surveySubQuestionsObj['evidenceCount'] = len(answer['fileName']) + else: + surveySubQuestionsObj['evidences'] = '' + surveySubQuestionsObj['evidenceCount'] = 0 + surveySubQuestionsObj['totalEvidences'] = evidence_sub_count + + # Extract parent question details for matrix response type + # if ans['responseType']=='matrix': + # surveySubQuestionsObj['instanceParentQuestion'] = ans['question'][0] + # surveySubQuestionsObj['instanceParentId'] = ans['qid'] + # surveySubQuestionsObj['instanceParentResponsetype'] =ans['responseType'] + # surveySubQuestionsObj['instanceParentCriteriaId'] =ans['criteriaId'] + # surveySubQuestionsObj['instanceParentCriteriaExternalId'] = ans['criteriaId'] + # surveySubQuestionsObj['instanceParentCriteriaName'] = None + # surveySubQuestionsObj['instanceId'] = instNumber + # surveySubQuestionsObj['instanceParentExternalId'] = quesexternalId + # surveySubQuestionsObj['instanceParentEcmSequence']= sequenceNumber( + # surveySubQuestionsObj['instanceParentExternalId'], answer + # ) + # else: + # surveySubQuestionsObj['instanceParentQuestion'] = '' + # surveySubQuestionsObj['instanceParentId'] = '' + # surveySubQuestionsObj['instanceParentResponsetype'] ='' + # surveySubQuestionsObj['instanceId'] = instNumber + # surveySubQuestionsObj['instanceParentExternalId'] = '' + # surveySubQuestionsObj['instanceParentEcmSequence'] = '' + + # Extract channel and parent channel + # surveySubQuestionsObj['channel'] = rootOrgId + # surveySubQuestionsObj['parent_channel'] = "SHIKSHALOKAM" + # user profile creation + flatten_userprofile = flatten_json(obSub['userProfile']) + new_dict = {} + for key in flatten_userprofile: + string_without_integer = re.sub(r'\d+', '', key) + updated_string = string_without_integer.replace("--", "-") + # Check if the value associated with the key is not None + if flatten_userprofile[key] is not None: + if updated_string in new_dict: + # Perform addition only if both values are not None + if new_dict[updated_string] is not None: + new_dict[updated_string] += "," + str(flatten_userprofile[key]) + else: + new_dict[updated_string] = str(flatten_userprofile[key]) + else: + new_dict[updated_string] = str(flatten_userprofile[key]) + + surveySubQuestionsObj['userProfile'] = str(new_dict) + # Update object with additional user data + # Commented the bellow line as we don't need userRoleInso in KB + # surveySubQuestionsObj.update(userDataCollector(obSub)) + return surveySubQuestionsObj + + # Function to fetch question details + def fetchingQuestiondetails(ansFn,instNumber): + try: + # if (len(ansFn['options']) == 0) or (('options' in ansFn.keys()) == False): + 
if ('options' not in ansFn.keys()) or (len(ansFn['options']) == 0): + try: + orgArr = orgCreator(obSub["userProfile"]["organisations"]) + final_worker = FinalWorker(ansFn,ansFn['externalId'], ansFn['value'], instNumber, ansFn['value'], orgArr, creatingObj) + final_worker.run() + except KeyError : + pass + else: + labelIndex = 0 + for quesOpt in ansFn['options']: + try: + if type(ansFn['value']) == str or type(ansFn['value']) == int: + if quesOpt['value'] == ansFn['value'] : + orgArr = orgCreator(obSub["userProfile"]["organisations"]) + final_worker = FinalWorker(ansFn,ansFn['externalId'], ansFn['value'], instNumber, quesOpt['label'], orgArr, creatingObj) + final_worker.run() + elif type(ansFn['value']) == list: + for ansArr in ansFn['value']: + if quesOpt['value'] == ansArr: + orgArr = orgCreator(obSub["userProfile"]["organisations"]) + final_worker = FinalWorker(ansFn,ansFn['externalId'], ansArr, instNumber, quesOpt['label'], orgArr, creatingObj) + final_worker.run() + except KeyError: + pass + except KeyError: + pass + + # Check response type and call function to fetch question details + if ( + ans['responseType'] == 'text' or ans['responseType'] == 'radio' or + ans['responseType'] == 'multiselect' or ans['responseType'] == 'slider' or + ans['responseType'] == 'number' or ans['responseType'] == 'date' + ): + inst_cnt = '' + fetchingQuestiondetails(ans, inst_cnt) + elif ans['responseType'] == 'matrix' and len(ans['value']) > 0: + inst_cnt = 0 + for instances in ans['value']: + inst_cnt = inst_cnt + 1 + for instance in instances.values(): + fetchingQuestiondetails(instance,inst_cnt) + else: + infoLogger.info("Survey Submission is not in completed status") + else: + infoLogger.info(f"survey_Submission_id {surveySubmissionId} already exists in the sl-survey datasource.") + + infoLogger.info(f"Completed processing kafka event for the Survey Submission Id : {obSub['_id']}. For Survey Question report") + + except Exception as e: + # Log any errors that occur during processing + errorLogger.error(e, exc_info=True) +except Exception as e: + # Log any errors that occur during processing + errorLogger.error(e, exc_info=True) + +# Main data extraction function +try: + def main_data_extraction(obSub): + '''Function to process survey submission data before sending it to Kafka topics''' + try: + infoLogger.info(f"Starting to process kafka event for the Survey Submission Id : {obSub['_id']}.
For Survey Status report") + # Initialize dictionary for storing survey submission data + surveySubQuestionsObj = {} + survey_status = {} + + # Extract various attributes from survey submission object + surveySubQuestionsObj['surveyId'] = str(obSub.get('surveyId', '')) + surveySubQuestionsObj['surveyName'] = str(obSub.get('surveyInformation', {}).get('name', '')) + surveySubQuestionsObj['surveySubmissionId'] = obSub.get('_id', '') + try: + if 'solutionInfo' in obSub: + surveySubQuestionsObj['solutionName'] = obSub['solutionInfo']['name'] + else: + surveySubQuestionsObj['solutionName'] = '' + except KeyError: + surveySubQuestionsObj['solutionName'] = '' + + surveySubQuestionsObj['createdBy'] = obSub['createdBy'] + surveySubQuestionsObj['completedDate'] = obSub['completedDate'] + # Check if 'isAPrivateProgram' key exists + try: + surveySubQuestionsObj['isAPrivateProgram'] = obSub['isAPrivateProgram'] + except KeyError: + surveySubQuestionsObj['isAPrivateProgram'] = True + # user profile creation + flatten_userprofile = flatten_json(obSub['userProfile']) + new_dict = {} + for key in flatten_userprofile: + string_without_integer = re.sub(r'\d+', '', key) + updated_string = string_without_integer.replace("--", "-") + # Check if the value associated with the key is not None + if flatten_userprofile[key] is not None: + if updated_string in new_dict: + # Perform addition only if both values are not None + if new_dict[updated_string] is not None: + new_dict[updated_string] += "," + str(flatten_userprofile[key]) + else: + new_dict[updated_string] = str(flatten_userprofile[key]) + else: + new_dict[updated_string] = str(flatten_userprofile[key]) + + surveySubQuestionsObj['userProfile'] = str(new_dict) + + # Before attempting to access the list, check if it is non-empty + # profile_user_types = obSub.get('userProfile', {}).get('profileUserTypes', []) + # if profile_user_types: + # # Access the first element of the list if it exists + # user_type = profile_user_types[0].get('type', None) + # else: + # # Handle the case when the list is empty + # user_type = None + # surveySubQuestionsObj['user_type'] = user_type + + surveySubQuestionsObj['solutionExternalId'] = obSub.get('solutionExternalId', '') + surveySubQuestionsObj['solutionId'] = obSub.get('solutionId', '') + + # for location in obSub.get('userProfile', {}).get('userLocations', []): + # name = location.get('name') + # type_ = location.get('type') + # if name and type_: + # surveySubQuestionsObj[type_] = name + + # surveySubQuestionsObj['board_name'] = obSub.get('userProfile', {}).get('framework', {}).get('board', [''])[0] + + orgArr = orgCreator(obSub.get('userProfile', {}).get('organisations',None)) + if orgArr: + # surveySubQuestionsObj['schoolId'] = orgArr[0].get("organisation_id") + surveySubQuestionsObj['organisationName'] = orgArr[0].get("organisationName") + else: + # surveySubQuestionsObj['schoolId'] = None + surveySubQuestionsObj['organisationName'] = None + + # Insert data to sl-survey-meta druid datasource if status is anything + _id = surveySubQuestionsObj.get('surveySubmissionId', None) + try: + if _id: + if check_survey_submission_id_existance(_id,"surveySubmissionId","sl-survey-meta"): + infoLogger.info(f"No data duplection for the Submission ID : {_id} in sl-survey-meta datasource") + # Upload survey submission data to Druid topic + producer.send((config.get("KAFKA", "survey_meta_druid_topic")), json.dumps(surveySubQuestionsObj).encode('utf-8')) + producer.flush() + infoLogger.info(f"Data with submission_id {_id} is being inserted 
into the sl-survey-meta datasource.") + else: + infoLogger.info(f"Data with submission_id {_id} already exists in the sl-survey-meta datasource.") + except Exception as e : + # Log any errors that occur during data ingestion + errorLogger.error("====== An error was found during data ingestion in the sl-survey-meta datasource ======") + errorLogger.error(e,exc_info=True) + + + # Insert data to sl-survey-status-started druid datasource if status is started + if obSub['status'] == 'started': + survey_status['surveySubmissionId'] = obSub['_id'] + survey_status['startedAt'] = obSub['completedDate'] + _id = survey_status.get('surveySubmissionId', None) + try : + if _id: + if check_survey_submission_id_existance(_id,"surveySubmissionId","sl-survey-status-started"): + infoLogger.info(f"No data duplication for the Submission ID : {_id} in sl-survey-status-started datasource") + # Upload survey status data to Druid topic + producer.send((config.get("KAFKA", "survey_started_druid_topic")), json.dumps(survey_status).encode('utf-8')) + producer.flush() + infoLogger.info(f"Data with submission_id {_id} is being inserted into the sl-survey-status-started datasource.") + else: + infoLogger.info(f"Data with submission_id {_id} already exists in the sl-survey-status-started datasource.") + except Exception as e : + # Log any errors that occur during data ingestion + errorLogger.error("====== An error was found during data ingestion in the sl-survey-status-started datasource ======") + errorLogger.error(e,exc_info=True) + + + # Insert data to sl-survey-status-inprogress druid datasource if status is inprogress + elif obSub['status'] == 'inprogress': + survey_status['surveySubmissionId'] = obSub['_id'] + survey_status['inprogressAt'] = obSub['completedDate'] + _id = survey_status.get('surveySubmissionId', None) + try : + if _id: + if check_survey_submission_id_existance(_id,"surveySubmissionId","sl-survey-status-inprogress"): + infoLogger.info(f"No data duplication for the Submission ID : {_id} in sl-survey-status-inprogress datasource") + # Upload survey status data to Druid topic + producer.send((config.get("KAFKA", "survey_inprogress_druid_topic")), json.dumps(survey_status).encode('utf-8')) + producer.flush() + infoLogger.info(f"Data with submission_id {_id} is being inserted into the sl-survey-status-inprogress datasource.") + else: + infoLogger.info(f"Data with submission_id {_id} already exists in the sl-survey-status-inprogress datasource.") + except Exception as e : + # Log any errors that occur during data ingestion + errorLogger.error("====== An error was found during data ingestion in the sl-survey-status-inprogress datasource ======") + errorLogger.error(e,exc_info=True) + + + elif obSub['status'] == 'completed': + survey_status['surveySubmissionId'] = obSub['_id'] + survey_status['completedAt'] = obSub['completedDate'] + _id = survey_status.get('surveySubmissionId', None) + try : + if _id: + if check_survey_submission_id_existance(_id,"surveySubmissionId","sl-survey-status-completed"): + infoLogger.info(f"No data duplication for the Submission ID : {_id} in sl-survey-status-completed datasource") + # Upload survey status data to Druid topic + producer.send((config.get("KAFKA", "survey_completed_druid_topic")), json.dumps(survey_status).encode('utf-8')) + producer.flush() + infoLogger.info(f"Data with submission_id {_id} is being inserted into the sl-survey-status-completed datasource") + else: + infoLogger.info(f"Data with submission_id {_id} already exists in the
sl-survey-status-completed datasource") + except Exception as e : + # Log any errors that occur during data ingestion + errorLogger.error("====== An error was found during data ingestion in the sl-survey-status-inprogress datasource ======") + errorLogger.error(e,exc_info=True) + + infoLogger.info(f"Completed processing kafka event for the Survey Submission Id : {obSub['_id']}. For Survey Status report") + except Exception as e: + # Log any errors that occur during data extraction + errorLogger.error(e, exc_info=True) +except Exception as e: + # Log any errors that occur during data extraction + errorLogger.error(e, exc_info=True) + + +try: + @app.agent(rawTopicName) + async def surveyFaust(consumer): + '''Faust agent to consume messages from Kafka and process them''' + async for msg in consumer: + try: + msg_val = msg.decode('utf-8') + msg_data = json.loads(msg_val) + + infoLogger.info("========== START OF SURVEY SUBMISSION EVENT PROCESSING ==========") + obj_creation(msg_data) + main_data_extraction(msg_data) + infoLogger.info("********** END OF SURVEY SUBMISSION EVENT PROCESSING **********") + except KeyError as ke: + # Log KeyError + errorLogger.error(f"KeyError occurred: {ke}") +except Exception as e: + # Log any other exceptions + errorLogger.error(f"Error in surveyFaust function: {e}") + + +if __name__ == '__main__': + app.main() diff --git a/urgent_data_metrics/constants.py b/urgent_data_metrics/constants.py new file mode 100644 index 0000000..d4c5d0b --- /dev/null +++ b/urgent_data_metrics/constants.py @@ -0,0 +1 @@ +location_search = "api/data/v1/location/search" \ No newline at end of file diff --git a/urgent_data_metrics/imp_project_metrics.py b/urgent_data_metrics/imp_project_metrics.py index fdd6c7b..2fe6f85 100644 --- a/urgent_data_metrics/imp_project_metrics.py +++ b/urgent_data_metrics/imp_project_metrics.py @@ -6,7 +6,7 @@ # entity information # ----------------------------------------------------------------- -import json, sys, re, time +import json, sys, re, time , constants from configparser import ConfigParser,ExtendedInterpolation from pymongo import MongoClient from bson.objectid import ObjectId @@ -25,16 +25,18 @@ import logging import logging.handlers from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler -import glob +import glob , requests config_path = os.path.split(os.path.dirname(os.path.abspath(__file__))) config = ConfigParser(interpolation=ExtendedInterpolation()) config.read(config_path[0] + "/config.ini") -sys.path.append(config.get("COMMON", "cloud_module_path")) -from cloud import MultiCloud -cloud_init = MultiCloud() +root_path = config_path[0] +sys.path.append(root_path) + +from cloud_storage.cloud import MultiCloud +cloud_init = MultiCloud() # Date formating current_date = datetime.date.today() @@ -137,7 +139,7 @@ def melt(df: DataFrame,id_vars: Iterable[str], value_vars: Iterable[str], sc = spark.sparkContext -clientProd = MongoClient(config.get('MONGO', 'mongo_url')) +clientProd = MongoClient(config.get('MONGO', 'url')) db = clientProd[config.get('MONGO', 'database_name')] projectsCollec = db[config.get('MONGO', 'projects_collection')] @@ -149,7 +151,8 @@ def melt(df: DataFrame,id_vars: Iterable[str], value_vars: Iterable[str], "status": 1, "attachments":1, "tasks": {"attachments":1,"_id": {"$toString": "$_id"}}, - "userProfile": 1 + "userProfile": 1, + "userRoleInformation" : {"district":1,"state": 1}, } }] ) @@ -186,10 +189,62 @@ def melt(df: DataFrame,id_vars: Iterable[str], value_vars: Iterable[str], ]),True) ) ]) - ) + ), 
+ StructField("userRoleInformation", StructType([ + StructField("district", StringType(), True), + StructField("state", StringType(), True) + ]), True), ]) +def searchEntities(url,ids_list): + try: + returnData = {} + apiSuccessFlag = False + headers = { + 'Authorization': config.get('API_HEADERS', 'authorization_access_token'), + 'content-Type': 'application/json' + } + # prepare api body + payload = json.dumps({ + "request": { + "filters": { + "id": ids_list + } + } + }) + response = requests.request("POST", url, headers=headers, data=payload) + delta_ids = [] + entity_name_mapping = {} + + if response.status_code == 200: + # convert the response to dictionary + response = response.json() + + data = response['result']['response'] + + entity_name_mapping = {} + # prepare entity name - id mapping + for index in data: + entity_name_mapping[index['id']] = index['name'] + + # fetch the ids from the mapping + ids_from_api = list(entity_name_mapping.keys()) + + # check with the input data to make sure there are no missing data from loc search + delta_ids = list(set(ids_list) - set(ids_from_api)) + apiSuccessFlag = True + else : + delta_ids = ids_list + returnData['mapping'] = entity_name_mapping + returnData['apiSuccessFlag'] = apiSuccessFlag + returnData['delta'] = delta_ids + return returnData + + except Exception as e: + errorLogger.error(e,exc_info=True) + projects_df = spark.createDataFrame(projects_cursorMongo,projects_schema) + projects_df = projects_df.withColumn( "project_evidence_status", F.when( @@ -215,59 +270,101 @@ def melt(df: DataFrame,id_vars: Iterable[str], value_vars: Iterable[str], projects_df = projects_df.withColumn( "exploded_userLocations",F.explode_outer(projects_df["userProfile"]["userLocations"]) ) + entities_df = melt(projects_df, - id_vars=["_id","exploded_userLocations.name","exploded_userLocations.type","exploded_userLocations.id"], + id_vars=["_id","exploded_userLocations.name","exploded_userLocations.type","exploded_userLocations.id","userRoleInformation.district","userRoleInformation.state"], value_vars=["exploded_userLocations.code"] - ).select("_id","name","value","type","id").dropDuplicates() -entities_df = entities_df.withColumn("variable",F.concat(F.col("type"),F.lit("_externalId"))) -entities_df = entities_df.withColumn("variable1",F.concat(F.col("type"),F.lit("_name"))) -entities_df = entities_df.withColumn("variable2",F.concat(F.col("type"),F.lit("_code"))) - -entities_df_id=entities_df.groupBy("_id").pivot("variable").agg(first("id")) + ).select("_id","name","value","type","id","district","state").dropDuplicates() -entities_df_name=entities_df.groupBy("_id").pivot("variable1").agg(first("name")) +projects_df = projects_df.join(entities_df,projects_df["_id"]==entities_df["_id"],how='left')\ + .drop(entities_df["_id"]) +projects_df = projects_df.filter(F.col("status") != "null") -entities_df_value=entities_df.groupBy("_id").pivot("variable2").agg(first("value")) +entities_df.unpersist() -entities_df_med=entities_df_id.join(entities_df_name,["_id"],how='outer') -entities_df_res=entities_df_med.join(entities_df_value,["_id"],how='outer') -entities_df_res=entities_df_res.drop('null') -projects_df = projects_df.join(entities_df_res,projects_df["_id"]==entities_df_res["_id"],how='left')\ - .drop(entities_df_res["_id"]) -projects_df = projects_df.filter(F.col("status") != "null") -entities_df.unpersist() projects_df_final = projects_df.select( projects_df["_id"].alias("project_id"), projects_df["status"], projects_df["evidence_status"], - 
projects_df["school_name"], - projects_df["school_externalId"], - projects_df["school_code"], - projects_df["block_name"], - projects_df["block_externalId"], - projects_df["block_code"], - projects_df["state_name"], - projects_df["state_externalId"], - projects_df["state_code"], - projects_df["district_name"], - projects_df["district_externalId"], - projects_df["district_code"] + projects_df["district"], + projects_df["state"], ) +# DataFrame for user locations values of State and Districts only +userLocations_df = melt(projects_df, + id_vars=["_id","exploded_userLocations.name","exploded_userLocations.type","exploded_userLocations.id"], + value_vars=["exploded_userLocations.code"] + ).select("_id","id","name","value","type").filter((col("type") == "state") | (col("type") == "district")).dropDuplicates() + +# Fetch only Latest Data of Locations from the DF +userLocations_df = userLocations_df.groupBy("id").agg( + first("_id", ignorenulls=True).alias("projectId"), + first("name", ignorenulls=True).alias("name"), + first("value", ignorenulls=True).alias("value"), + first("type", ignorenulls=True).alias("type") +) projects_df_final = projects_df_final.dropDuplicates() -district_final_df = projects_df_final.groupBy("state_name","district_name").agg(countDistinct(F.col("project_id")).alias("Total_Micro_Improvement_Projects"),countDistinct(when(F.col("status") == "started",True),F.col("project_id")).alias("Total_Micro_Improvement_Started"),countDistinct(when(F.col("status") == "inProgress",True),F.col("project_id")).alias("Total_Micro_Improvement_InProgress"),countDistinct(when(F.col("status") == "submitted",True),F.col("project_id")).alias("Total_Micro_Improvement_Submitted"),countDistinct(when((F.col("evidence_status") == True)&(F.col("status") == "submitted"),True),F.col("project_id")).alias("Total_Micro_Improvement_Submitted_With_Evidence")).sort("state_name","district_name") +district_final_df = projects_df_final.groupBy("state","district")\ + .agg(countDistinct(F.col("project_id")).alias("Total_Micro_Improvement_Projects"),countDistinct(when(F.col("status") == "started",True)\ + ,F.col("project_id")).alias("Total_Micro_Improvement_Started"),countDistinct(when(F.col("status") == "inProgress",True),\ + F.col("project_id")).alias("Total_Micro_Improvement_InProgress"),countDistinct(when(F.col("status") == "submitted",True),\ + F.col("project_id")).alias("Total_Micro_Improvement_Submitted"),\ + countDistinct(when((F.col("evidence_status") == True)&(F.col("status") == "submitted"),True),\ + F.col("project_id")).alias("Total_Micro_Improvement_Submitted_With_Evidence")).sort("state","district") + +# select only district ids from the Dataframe +district_to_list = projects_df_final.select("district").rdd.flatMap(lambda x: x).collect() +# select only state ids from the Dataframe +state_to_list = projects_df_final.select("state").rdd.flatMap(lambda x: x).collect() + +# merge the list of district and state ids , remove the duplicates +ids_list = list(set(district_to_list)) + list(set(state_to_list)) -state_final_df = projects_df_final.groupBy("state_name").agg(countDistinct(F.col("project_id")).alias("Total_Micro_Improvement_Projects")).sort("state_name") +# remove the None values from the list +ids_list = [value for value in ids_list if value is not None] + +# call function to get the entity from location master +response = searchEntities(config.get("API_ENDPOINTS", "base_url") + str(constants.location_search) ,ids_list) + +data_tuples = [] #empty List for creating the DF + +# if Location search API 
is successful, get the mapping details from the API +if response['apiSuccessFlag']: + # Convert dictionary to list of tuples + data_tuples = list(response['mapping'].items()) + +# if any delta ids are found, fetch the details from the DF +if response['delta']: + delta_ids_from_response = userLocations_df.filter(col("id").isin(response['delta'])) + for row in delta_ids_from_response.collect(): + data_tuples.append((row['id'],row['name'])) + +# Define the schema for State details +state_schema = StructType([StructField("id", StringType(), True), StructField("state_name", StringType(), True)]) + +# Define the schema for District details +district_schema = StructType([StructField("id", StringType(), True), StructField("district_name", StringType(), True)]) + +# Create a DataFrame for State +state_id_mapping = spark.createDataFrame(data_tuples, schema=state_schema) + +# Create a DataFrame for District +district_id_mapping = spark.createDataFrame(data_tuples, schema=district_schema) + +# Join to get the State names from State ids +district_final_df = district_final_df.join(state_id_mapping, district_final_df["state"] == state_id_mapping["id"], "left") +# Join to get the District names from District ids +district_final_df = district_final_df.join(district_id_mapping, district_final_df["district"] == district_id_mapping["id"], "left") +# Select only relevant fields to prepare the final DF, sorted by state and district name +final_data_to_csv = district_final_df.select("state_name","district_name","Total_Micro_Improvement_Projects","Total_Micro_Improvement_Started","Total_Micro_Improvement_InProgress","Total_Micro_Improvement_Submitted","Total_Micro_Improvement_Submitted_With_Evidence").sort("state_name","district_name") # DF To file local_path = config.get("COMMON", "nvsk_imp_projects_data_local_path") blob_path = config.get("COMMON", "nvsk_imp_projects_data_blob_path") -district_final_df.coalesce(1).write.format("csv").option("header",True).mode("overwrite").save(local_path) -district_final_df.unpersist() - - +final_data_to_csv.coalesce(1).write.format("csv").option("header",True).mode("overwrite").save(local_path) +final_data_to_csv.unpersist() # Renaming a file path = local_path extension = 'csv' @@ -275,27 +372,16 @@ def melt(df: DataFrame,id_vars: Iterable[str], value_vars: Iterable[str], result = glob.glob(f'*.{extension}') os.rename(f'{path}' + f'{result[0]}', f'{path}' + 'data.csv') - # Uploading file to Cloud -cloud_init.upload_to_cloud(blob_Path = blob_path, local_Path = local_path, file_Name = 'data.csv') - -# DF To file - State -state_local_path = config.get("COMMON", "nvsk_imp_projects_state_data_local_path") -state_blob_path = config.get("COMMON", "nvsk_imp_projects_state_data_blob_path") -state_final_df.coalesce(1).write.format("csv").option("header",True).mode("overwrite").save(state_local_path) -state_final_df.unpersist() - - -# Renaming a file - State -state_path = state_local_path -extension = 'csv' -os.chdir(state_path) -state_result = glob.glob(f'*.{extension}') -os.rename(f'{state_path}' + f'{state_result[0]}', f'{state_path}' + 'state_data.csv') - - -# Uploading file to Cloud - State -cloud_init.upload_to_cloud(blob_Path = state_blob_path, local_Path = state_local_path, file_Name = 'state_data.csv') - -print("file got uploaded to AWS") -print("DONE") +fileList = ["data.csv"] + +uploadResponse = cloud_init.upload_to_cloud(filesList = fileList,folderPathName = "nvsk_imp_projects_data_blob_path", local_Path = local_path ) +successLogger.debug("cloud upload response : " + str(uploadResponse)) + +if
uploadResponse['success'] == False: + errorLogger.error("Cloud Upload Failed.", exc_info=True) + errorLogger.error("Cloud Upload Response : "+ str(uploadResponse), exc_info=True) + sys.exit(1) +print("Cloud upload successful.") +print("File uploaded to cloud storage.") +print("DONE") \ No newline at end of file
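
A note on the userProfile handling in observation_realtime_streaming.py: flatten_json() turns the nested profile into dash-separated keys, and the loop in creatingObj()/main_data_extraction() then strips list indices and merges repeated keys into comma-separated strings. The snippet below is only an illustration of that behaviour; the sample profile keys and values are invented, and it assumes the flatten_json() defined in this diff is available in scope.

# Invented sample profile, for illustration only
sample_profile = {
    "firstName": "Asha",
    "userLocations": [
        {"name": "State A", "type": "state"},
        {"name": "District B", "type": "district"},
    ],
}

flat = flatten_json(sample_profile)
# flatten_json() keeps the list index in the key:
# {'firstName': 'Asha',
#  'userLocations-0-name': 'State A', 'userLocations-0-type': 'state',
#  'userLocations-1-name': 'District B', 'userLocations-1-type': 'district'}

# The re.sub(r'\d+', '', key) / replace("--", "-") loop in the job then drops the
# indices and joins duplicate keys, producing the dict that is stringified into
# surveySubQuestionsObj['userProfile']:
# {'firstName': 'Asha',
#  'userLocations-name': 'State A,District B',
#  'userLocations-type': 'state,district'}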
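Every Druid write in the streaming job follows the same check-then-produce sequence: look up the submission id with check_survey_submission_id_existance(), and only then producer.send()/producer.flush() to the matching Kafka topic. The sketch below is a hypothetical wrapper that is not part of this change; it assumes the module-level producer, config, infoLogger and check_survey_submission_id_existance objects defined earlier in observation_realtime_streaming.py.

def send_if_new(record, id_field, datasource, topic_key):
    """Hypothetical helper: produce `record` to the Kafka topic configured under
    [KAFKA] `topic_key` unless `datasource` already holds a row with the same id."""
    _id = record.get(id_field)
    if not _id:
        return False
    # check_survey_submission_id_existance returns True when the id is NOT yet present in Druid
    if check_survey_submission_id_existance(_id, id_field, datasource):
        producer.send(config.get("KAFKA", topic_key), json.dumps(record).encode('utf-8'))
        producer.flush()
        infoLogger.info(f"Data with submission_id {_id} is being inserted into the {datasource} datasource")
        return True
    infoLogger.info(f"Data with submission_id {_id} already exists in the {datasource} datasource")
    return False

With a helper like this, each status branch would reduce to a single call, e.g. send_if_new(survey_status, "surveySubmissionId", "sl-survey-status-completed", "survey_completed_druid_topic").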
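For the new searchEntities() helper in urgent_data_metrics/imp_project_metrics.py, the snippet below illustrates the response shape the code expects from the location search endpoint and how the id-to-name mapping and the delta ids are derived. The sample ids and names are invented; only the result.response list of {id, name} objects is implied by the code.

ids_list = ["loc-1", "loc-2", "loc-3"]          # ids sent in the request filter

api_response = {                                 # assumed shape of a 200 response body
    "result": {
        "response": [
            {"id": "loc-1", "name": "State A"},
            {"id": "loc-2", "name": "District B"},
            # "loc-3" is not returned, so it becomes a delta id
        ]
    }
}

# id -> name mapping, as built inside searchEntities()
entity_name_mapping = {entity["id"]: entity["name"] for entity in api_response["result"]["response"]}

# ids the API did not resolve; the job later looks these up in userLocations_df
delta_ids = list(set(ids_list) - set(entity_name_mapping.keys()))

print(entity_name_mapping)   # {'loc-1': 'State A', 'loc-2': 'District B'}
print(delta_ids)             # ['loc-3']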