From cf0370f8604ed27c15eff954be725b16a9d9da82 Mon Sep 17 00:00:00 2001 From: yashashk Date: Wed, 15 Jan 2025 12:49:35 +0530 Subject: [PATCH] feat: #OBS-I321 : Added dataset metric service file --- .../DatasetMetricsController.ts | 21 ++++++++++--------- .../DatasetMetricsService.ts} | 16 +++++++------- 2 files changed, 19 insertions(+), 18 deletions(-) rename api-service/src/{controllers/DatasetMetrics/datasetMetricsHelper.ts => services/DatasetMetricsService.ts} (95%) diff --git a/api-service/src/controllers/DatasetMetrics/DatasetMetricsController.ts b/api-service/src/controllers/DatasetMetrics/DatasetMetricsController.ts index e26c41c3..1af73eea 100644 --- a/api-service/src/controllers/DatasetMetrics/DatasetMetricsController.ts +++ b/api-service/src/controllers/DatasetMetrics/DatasetMetricsController.ts @@ -2,28 +2,29 @@ import { Request, Response } from "express"; import * as _ from "lodash" import { ResponseHandler } from "../../helpers/ResponseHandler"; import dayjs from 'dayjs'; -import { handleConnectors, handleDataFreshness, handleDataLineage, handleDataObservability, handleDataQuality, handleDataVolume } from "./datasetMetricsHelper"; import logger from "../../logger"; import { schemaValidation } from "../../services/ValidationService"; import validationSchema from "./DatasetMetrics.json"; import { config } from "../../configs/Config"; import { datasetService } from "../../services/DatasetService"; +import { getConnectors, getDataFreshness, getDataLineage, getDataObservability, getDataQuality, getDataVolume } from "../../services/DatasetMetricsService"; const apiId = "api.dataset.metrics"; const datasetMetrics = async (req: Request, res: Response) => { const msgid = _.get(req, "body.params.msgid"); const requestBody = req.body; const dataset_id = _.get(req, "body.request.dataset_id"); + const startDateValue = _.get(req, "body.request.start_date") || config?.data_observability?.default_query_time_period; const { category, volume_by_days = 10 }: any = req.body.request; const defaultThreshold = (typeof config?.data_observability?.default_freshness_threshold === 'number' ? config?.data_observability?.default_freshness_threshold : 5) * 60 * 1000; // 5 minutes in milliseconds const dateFormat = 'YYYY-MM-DDTHH:mm:ss'; - const startDate = '2000-01-01T00:00:00+05:30'; - const endDate = dayjs().add(1, 'day').format(dateFormat); + const endDate = dayjs().format(dateFormat); + const startDate = dayjs(endDate).subtract(startDateValue, 'day').format(dateFormat); const intervals = `${startDate}/${endDate}`; const isValidSchema = schemaValidation(requestBody, validationSchema); const results = []; - + if (!isValidSchema?.isValid) { logger.error({ apiId, datasetId: dataset_id, msgid, requestBody, message: isValidSchema?.message, code: "DATA_OUT_INVALID_INPUT" }) return ResponseHandler.errorResponse({ message: isValidSchema?.message, statusCode: 400, errCode: "BAD_REQUEST", code: "DATA_OUT_INVALID_INPUT" }, req, res); @@ -37,32 +38,32 @@ const datasetMetrics = async (req: Request, res: Response) => { try { if (!category || category.includes("data_freshness")) { - const dataFreshnessResult = await handleDataFreshness(dataset_id, intervals, defaultThreshold); + const dataFreshnessResult = await getDataFreshness(dataset_id, intervals, defaultThreshold); results.push(dataFreshnessResult); } if (!category || category.includes("data_observability")) { - const dataObservabilityResult = await handleDataObservability(dataset_id, intervals); + const dataObservabilityResult = await getDataObservability(dataset_id, intervals); results.push(dataObservabilityResult); } if (!category || category.includes("data_volume")) { - const dataVolumeResult = await handleDataVolume(dataset_id, volume_by_days, dateFormat); + const dataVolumeResult = await getDataVolume(dataset_id, volume_by_days, dateFormat); results.push(dataVolumeResult); } if (!category || category.includes("data_lineage")) { - const dataLineageResult = await handleDataLineage(dataset_id, intervals); + const dataLineageResult = await getDataLineage(dataset_id, intervals); results.push(dataLineageResult); } if (!category || category.includes("connectors")) { - const connectorsResult = await handleConnectors(dataset_id, intervals); + const connectorsResult = await getConnectors(dataset_id, intervals); results.push(connectorsResult); } if (!category || category.includes("data_quality")) { - const connectorsResult = await handleDataQuality(dataset_id, intervals); + const connectorsResult = await getDataQuality(dataset_id, intervals); results.push(connectorsResult); } diff --git a/api-service/src/controllers/DatasetMetrics/datasetMetricsHelper.ts b/api-service/src/services/DatasetMetricsService.ts similarity index 95% rename from api-service/src/controllers/DatasetMetrics/datasetMetricsHelper.ts rename to api-service/src/services/DatasetMetricsService.ts index dd93ce78..2c879d64 100644 --- a/api-service/src/controllers/DatasetMetrics/datasetMetricsHelper.ts +++ b/api-service/src/services/DatasetMetricsService.ts @@ -1,14 +1,14 @@ import axios from "axios"; import dayjs from "dayjs"; import _ from "lodash"; -import { config } from "../../configs/Config"; -import { dataLineageSuccessQuery, generateConnectorQuery, generateDatasetQueryCallsQuery, generateDedupFailedQuery, generateDenormFailedQuery, generateTimeseriesQuery, generateTimeseriesQueryEventsPerHour, generateTotalQueryCallsQuery, generateTransformationFailedQuery, processingTimeQuery, totalEventsQuery, totalFailedEventsQuery } from "./queries"; +import { config } from "../configs/Config"; +import { dataLineageSuccessQuery, generateConnectorQuery, generateDatasetQueryCallsQuery, generateDedupFailedQuery, generateDenormFailedQuery, generateTimeseriesQuery, generateTimeseriesQueryEventsPerHour, generateTotalQueryCallsQuery, generateTransformationFailedQuery, processingTimeQuery, totalEventsQuery, totalFailedEventsQuery } from "../controllers/DatasetMetrics/queries"; const druidPort = _.get(config, "query_api.druid.port"); const druidHost = _.get(config, "query_api.druid.host"); const nativeQueryEndpoint = `${druidHost}:${druidPort}${config.query_api.druid.native_query_path}`; const prometheusEndpoint = `${config.query_api.prometheus.url}/api/v1/query_range`; -export const handleDataFreshness = async (dataset_id: string, intervals: string, defaultThreshold: number) => { +export const getDataFreshness = async (dataset_id: string, intervals: string, defaultThreshold: number) => { const queryPayload = processingTimeQuery(intervals, dataset_id); const druidResponse = await axios.post(nativeQueryEndpoint, queryPayload?.query); const avgProcessingTime = _.get(druidResponse, "data[0].average_processing_time", 0); @@ -34,7 +34,7 @@ export const handleDataFreshness = async (dataset_id: string, intervals: string, }; }; -export const handleDataObservability = async (dataset_id: string, intervals: string) => { +export const getDataObservability = async (dataset_id: string, intervals: string) => { const totalEventsPayload = totalEventsQuery(intervals, dataset_id); const totalFailedEventsPayload = totalFailedEventsQuery(intervals, dataset_id); const totalQueryCalls = generateTotalQueryCallsQuery(config?.data_observability?.data_out_query_time_period); @@ -89,7 +89,7 @@ export const handleDataObservability = async (dataset_id: string, intervals: str }; }; -export const handleDataVolume = async (dataset_id: string, volume_by_days: number, dateFormat: string) => { +export const getDataVolume = async (dataset_id: string, volume_by_days: number, dateFormat: string) => { const currentHourIntervals = dayjs().subtract(1, "hour").startOf("hour").toISOString() + "/" + dayjs().startOf("hour").toISOString(); const currentDayIntervals = dayjs().subtract(1, 'day').startOf('day').format(dateFormat) + '/' + dayjs().endOf('day').format(dateFormat); const currentWeekIntervals = dayjs().subtract(1, 'week').startOf('week').format(dateFormat) + '/' + dayjs().endOf('week').format(dateFormat); @@ -144,7 +144,7 @@ export const handleDataVolume = async (dataset_id: string, volume_by_days: numbe }; }; -export const handleDataLineage = async (dataset_id: string, intervals: string) => { +export const getDataLineage = async (dataset_id: string, intervals: string) => { const transformationSuccessPayload = dataLineageSuccessQuery(intervals, dataset_id, "transformer_status", "success"); const dedupSuccessPayload = dataLineageSuccessQuery(intervals, dataset_id, "dedup_status", "success"); const denormSuccessPayload = dataLineageSuccessQuery(intervals, dataset_id, "denorm_status", "success"); @@ -196,7 +196,7 @@ export const handleDataLineage = async (dataset_id: string, intervals: string) = }; -export const handleConnectors = async (dataset_id: string, intervals: string) => { +export const getConnectors = async (dataset_id: string, intervals: string) => { const connectorQueryPayload = generateConnectorQuery(intervals, dataset_id); const connectorResponse = await axios.post(nativeQueryEndpoint, connectorQueryPayload); const connectorsData = _.get(connectorResponse, "data[0].result", []); @@ -215,7 +215,7 @@ export const handleConnectors = async (dataset_id: string, intervals: string) => }; }; -export const handleDataQuality = async (dataset_id: string, intervals: string) => { +export const getDataQuality = async (dataset_id: string, intervals: string) => { const totalValidationPayload = dataLineageSuccessQuery(intervals, dataset_id, "ctx_dataset", dataset_id); const totalValidationFailedPayload = dataLineageSuccessQuery(intervals, dataset_id, "error_pdata_status", "failed"); const [totalValidationResponse, totalValidationFailedResponse,