diff --git a/api-service/package.json b/api-service/package.json index bf0a08fd..e80d65f7 100644 --- a/api-service/package.json +++ b/api-service/package.json @@ -42,6 +42,7 @@ "busboy": "^1.6.0", "compression": "^1.7.4", "dateformat": "2.0.0", + "dayjs": "^1.11.13", "express": "^5.0.0-beta.3", "http-errors": "^2.0.0", "http-status": "^1.5.3", diff --git a/api-service/src/configs/Config.ts b/api-service/src/configs/Config.ts index a2aa79a1..702cc13f 100644 --- a/api-service/src/configs/Config.ts +++ b/api-service/src/configs/Config.ts @@ -119,5 +119,10 @@ export const config = { "enable": process.env.otel_enable || "false", "collector_endpoint": process.env.otel_collector_endpoint || "http://localhost:4318" }, - "storage_types": process.env.storage_types || '{"lake_house":true,"realtime_store":true}' + "storage_types": process.env.storage_types || '{"lake_house":true,"realtime_store":true}', + "data_observability": { + "default_freshness_threshold": process.env.default_freshness_threshold ? parseInt(process.env.default_freshness_threshold) : 5, // in minutes + "data_out_query_time_period": process.env.data_out_query_time_period || "1d", + "default_query_time_period": process.env.default_query_time_period ? parseInt(process.env.default_query_time_period) : 7, // in days + } } diff --git a/api-service/src/controllers/DataExhaust/DataExhaustController.ts b/api-service/src/controllers/DataExhaust/DataExhaustController.ts index ceeb3358..858cf233 100644 --- a/api-service/src/controllers/DataExhaust/DataExhaustController.ts +++ b/api-service/src/controllers/DataExhaust/DataExhaustController.ts @@ -11,7 +11,7 @@ import { cloudProvider } from "../../services/CloudServices"; export const dataExhaust = async (req: Request, res: Response) => { const { params } = req; - const { datasetId } = params; + const { dataset_id } = params; const { type }: any = req.query; const momentFormat = "YYYY-MM-DD"; @@ -28,12 +28,12 @@ export const dataExhaust = async (req: Request, res: Response) => { return resData || {}; } - if (type && config.cloud_config.exclude_exhaust_types.includes(datasetId)) { + if (type && config.cloud_config.exclude_exhaust_types.includes(dataset_id)) { return ResponseHandler.errorResponse({ statusCode: 404, message: "Record not found", errCode: httpStatus["404_NAME"] }, req, res) } - const datasetRecord = await verifyDatasetExists(datasetId); + const datasetRecord = await verifyDatasetExists(dataset_id); if (datasetRecord === null) { - logger.error(`Dataset with ${datasetId} not found in live table`) + logger.error(`Dataset with ${dataset_id} not found in live table`) return ResponseHandler.errorResponse({ statusCode: 404, message: "Record not found", errCode: httpStatus["404_NAME"] }, req, res) } const dateRange = getDateRange(req); @@ -47,7 +47,7 @@ export const dataExhaust = async (req: Request, res: Response) => { return ResponseHandler.errorResponse({ statusCode: 400, message: `Invalid date range! 
make sure your range cannot be more than ${config.cloud_config.maxQueryDateRange} days`, errCode: "BAD_REQUEST" }, req, res) } - const resData: any = await getFromStorage(type, dateRange, datasetId); + const resData: any = await getFromStorage(type, dateRange, dataset_id); if (_.isEmpty(resData.files)) { logger.error("Date range provided does not have any backup files") return ResponseHandler.errorResponse({ statusCode: 404, message: "Date range provided does not have any backup files", errCode: "NOT_FOUND" }, req, res); diff --git a/api-service/src/controllers/DataIngestion/DataIngestionController.ts b/api-service/src/controllers/DataIngestion/DataIngestionController.ts index 5044bb43..39090dda 100644 --- a/api-service/src/controllers/DataIngestion/DataIngestionController.ts +++ b/api-service/src/controllers/DataIngestion/DataIngestionController.ts @@ -27,7 +27,7 @@ const apiId = "api.data.in"; const dataIn = async (req: Request, res: Response) => { const requestBody = req.body; - const datasetId = req.params.datasetId.trim(); + const datasetId = req.params.dataset_id.trim(); const isValidSchema = schemaValidation(requestBody, validationSchema) if (!isValidSchema?.isValid) { diff --git a/api-service/src/controllers/DataMetrics/DataMetricsController.ts b/api-service/src/controllers/DataMetrics/DataMetricsController.ts new file mode 100644 index 00000000..98314878 --- /dev/null +++ b/api-service/src/controllers/DataMetrics/DataMetricsController.ts @@ -0,0 +1,38 @@ +import { Request, Response } from "express"; +import _ from "lodash"; +import { executeNativeQuery } from "../../connections/druidConnection"; +import { ResponseHandler } from "../../helpers/ResponseHandler"; +import vaidationSchema from "./DataMetricsValidationSchema.json" +import { schemaValidation } from "../../services/ValidationService"; +import logger from "../../logger"; +import { obsrvError } from "../../types/ObsrvError"; +import axios from "axios"; +import { config } from "../../configs/Config"; + +const getBaseUrl = (url: string) => { + if (_.startsWith(url, "/prom")) return config.query_api.prometheus.url + _.replace(url, "/prom", "") +} + +const dataMetrics = async (req: Request, res: Response) => { + const isValidSchema = schemaValidation(req.body, vaidationSchema); + if (!isValidSchema?.isValid) { + logger.error({ message: isValidSchema?.message, code: "INVALID_QUERY" }) + throw obsrvError("", "INVALID_QUERY", isValidSchema.message, "BAD_REQUEST", 400) + } + const { query } = req.body || {}; + const endpoint = query.url; + if (_.startsWith(endpoint, "/prom")) { + query.url = getBaseUrl(endpoint) + const { url, method, headers = {}, body = {}, params = {}, ...rest } = query; + const apiResponse = await axios.request({ url, method, headers, params, data: body, ...rest }) + const data = _.get(apiResponse, "data"); + return res.json(data); + } + else { + const query = _.get(req, ["body", "query", "body", "query"]); + const response = await executeNativeQuery(query); + ResponseHandler.successResponse(req, res, { status: 200, data: _.get(response, "data") }); + } +} + +export default dataMetrics; \ No newline at end of file diff --git a/api-service/src/controllers/DatasetMetrics/DatasetMetricsValidationSchema.json b/api-service/src/controllers/DataMetrics/DataMetricsValidationSchema.json similarity index 100% rename from api-service/src/controllers/DatasetMetrics/DatasetMetricsValidationSchema.json rename to api-service/src/controllers/DataMetrics/DataMetricsValidationSchema.json diff --git 
a/api-service/src/controllers/DataOut/DataOutController.ts b/api-service/src/controllers/DataOut/DataOutController.ts index dd6f93b6..fa5eee51 100644 --- a/api-service/src/controllers/DataOut/DataOutController.ts +++ b/api-service/src/controllers/DataOut/DataOutController.ts @@ -9,7 +9,7 @@ import { executeNativeQuery, executeSqlQuery } from "../../connections/druidConn export const apiId = "api.data.out"; const dataOut = async (req: Request, res: Response) => { - const datasetId = req.params?.datasetId; + const datasetId = req.params?.dataset_id; const requestBody = req.body; const msgid = _.get(req, "body.params.msgid"); const isValidSchema = schemaValidation(requestBody, validationSchema); diff --git a/api-service/src/controllers/DatasetMetrics/DatasetMetrics.json b/api-service/src/controllers/DatasetMetrics/DatasetMetrics.json new file mode 100644 index 00000000..70f14029 --- /dev/null +++ b/api-service/src/controllers/DatasetMetrics/DatasetMetrics.json @@ -0,0 +1,61 @@ +{ + "type": "object", + "properties": { + "id": { + "type": "string", + "enum": [ + "api.dataset.metrics" + ] + }, + "ver": { + "type": "string" + }, + "ts": { + "type": "string" + }, + "params": { + "type": "object", + "properties": { + "msgid": { + "type": "string" + } + }, + "required": [ + "msgid" + ], + "additionalProperties": false + }, + "request": { + "type": "object", + "properties": { + "dataset_id": { + "type": "string" + }, + "category": { + "type": "array", + "items": { + "type": "string", + "enum": [ + "data_freshness", + "data_observability", + "data_volume", + "data_lineage", + "connectors", + "data_quality" + ] + }, + "minItems": 1 + }, + "query_time_period":{ + "type": "integer", + "minimum": 1 + } + }, + "required": [ + "category", + "dataset_id" + ] + } + }, + "required": ["id", "ver", "ts", "params", "request"] +} \ No newline at end of file diff --git a/api-service/src/controllers/DatasetMetrics/DatasetMetricsController.ts b/api-service/src/controllers/DatasetMetrics/DatasetMetricsController.ts index 903d393c..f84d0b43 100644 --- a/api-service/src/controllers/DatasetMetrics/DatasetMetricsController.ts +++ b/api-service/src/controllers/DatasetMetrics/DatasetMetricsController.ts @@ -1,38 +1,81 @@ import { Request, Response } from "express"; -import _ from "lodash"; -import { executeNativeQuery } from "../../connections/druidConnection"; +import * as _ from "lodash" import { ResponseHandler } from "../../helpers/ResponseHandler"; -import vaidationSchema from "./DatasetMetricsValidationSchema.json" -import { schemaValidation } from "../../services/ValidationService"; +import dayjs from 'dayjs'; import logger from "../../logger"; -import { obsrvError } from "../../types/ObsrvError"; -import axios from "axios"; +import { schemaValidation } from "../../services/ValidationService"; +import validationSchema from "./DatasetMetrics.json"; import { config } from "../../configs/Config"; +import { datasetService } from "../../services/DatasetService"; +import { getConnectors, getDataFreshness, getDataLineage, getDataObservability, getDataQuality, getDataVolume } from "../../services/DatasetMetricsService"; -const getBaseUrl = (url: string) => { - if (_.startsWith(url, "/prom")) return config.query_api.prometheus.url + _.replace(url, "/prom", "") -} - +const apiId = "api.dataset.metrics"; const datasetMetrics = async (req: Request, res: Response) => { - const isValidSchema = schemaValidation(req.body, vaidationSchema); + const msgid = _.get(req, "body.params.msgid"); + const requestBody = req.body; + const 
dataset_id = _.get(req, "body.request.dataset_id"); + const timePeriod = _.get(req, "body.request.query_time_period") || config?.data_observability?.default_query_time_period; + + const { category }: any = req.body.request; + const defaultThreshold = (typeof config?.data_observability?.default_freshness_threshold === 'number' ? config?.data_observability?.default_freshness_threshold : 5) * 60 * 1000; // 5 minutes in milliseconds + const dateFormat = 'YYYY-MM-DDTHH:mm:ss'; + const endDate = dayjs().add(1, 'day').format(dateFormat); + const startDate = dayjs(endDate).subtract(timePeriod, 'day').format(dateFormat); + const intervals = `${startDate}/${endDate}`; + const isValidSchema = schemaValidation(requestBody, validationSchema); + const results = []; + if (!isValidSchema?.isValid) { - logger.error({ message: isValidSchema?.message, code: "INVALID_QUERY" }) - throw obsrvError("", "INVALID_QUERY", isValidSchema.message, "BAD_REQUEST", 400) + logger.error({ apiId, datasetId: dataset_id, msgid, requestBody, message: isValidSchema?.message, code: "DATA_OUT_INVALID_INPUT" }) + return ResponseHandler.errorResponse({ message: isValidSchema?.message, statusCode: 400, errCode: "BAD_REQUEST", code: "DATA_OUT_INVALID_INPUT" }, req, res); } - const { query } = req.body || {}; - const endpoint = query.url; - if (_.startsWith(endpoint, "/prom")) { - query.url = getBaseUrl(endpoint) - const { url, method, headers = {}, body = {}, params = {}, ...rest } = query; - const apiResponse = await axios.request({ url, method, headers, params, data: body, ...rest }) - const data = _.get(apiResponse, "data"); - return res.json(data); + + const dataset = await datasetService.getDataset(dataset_id, ["id"], true) + if (!dataset) { + logger.error({ apiId, message: `Dataset with id ${dataset_id} not found in live table`, code: "DATASET_NOT_FOUND" }) + return ResponseHandler.errorResponse({ message: `Dataset with id ${dataset_id} not found in live table`, code: "DATASET_NOT_FOUND", statusCode: 404, errCode: "NOT_FOUND" }, req, res); } - else { - const query = _.get(req, ["body", "query", "body", "query"]); - const response = await executeNativeQuery(query); - ResponseHandler.successResponse(req, res, { status: 200, data: _.get(response, "data") }); + + try { + if (!category || category.includes("data_freshness")) { + const dataFreshnessResult = await getDataFreshness(dataset_id, intervals, defaultThreshold); + results.push(dataFreshnessResult); + } + + if (!category || category.includes("data_observability")) { + const dataObservabilityResult = await getDataObservability(dataset_id, intervals); + results.push(dataObservabilityResult); + } + + if (!category || category.includes("data_volume")) { + const dataVolumeResult = await getDataVolume(dataset_id, timePeriod, dateFormat); + results.push(dataVolumeResult); + } + + if (!category || category.includes("data_lineage")) { + const dataLineageResult = await getDataLineage(dataset_id, intervals); + results.push(dataLineageResult); + } + + if (!category || category.includes("connectors")) { + const connectorsResult = await getConnectors(dataset_id, intervals); + results.push(connectorsResult); + } + + if (!category || category.includes("data_quality")) { + const connectorsResult = await getDataQuality(dataset_id, intervals); + results.push(connectorsResult); + } + + logger.info({ apiId, msgid, requestBody, datasetId: dataset_id, message: "Metrics fetched successfully" }) + return ResponseHandler.successResponse(req, res, { status: 200, data: results }); + } + catch (error: 
any) { + logger.error({ apiId, msgid, requestBody: req?.body, datasetId: dataset_id, message: "Error while fetching metrics", code: "FAILED_TO_FETCH_METRICS", error }); + return ResponseHandler.errorResponse({ message: "Error while fetching metrics", statusCode: 500, errCode: "FAILED", code: "FAILED_TO_FETCH_METRICS" }, req, res); + } + } export default datasetMetrics; \ No newline at end of file diff --git a/api-service/src/controllers/DatasetMetrics/queries.ts b/api-service/src/controllers/DatasetMetrics/queries.ts new file mode 100644 index 00000000..ae02e1ac --- /dev/null +++ b/api-service/src/controllers/DatasetMetrics/queries.ts @@ -0,0 +1,402 @@ +import dayjs from "dayjs"; + +export const processingTimeQuery = (intervals: string, dataset_id: string) => ({ + query: { + queryType: "groupBy", + dataSource: "system-events", + intervals: intervals, + granularity: { + type: "all", + timeZone: "UTC" + }, + filter: { + type: "and", + fields: [ + { type: "selector", dimension: "ctx_module", value: "processing" }, + { type: "selector", dimension: "ctx_dataset", value: dataset_id }, + { type: "selector", dimension: "ctx_pdata_pid", value: "router" }, + { type: "selector", dimension: "error_code", value: null } + ] + }, + aggregations: [ + { type: "longSum", name: "processing_time", fieldName: "total_processing_time" }, + { type: "longSum", name: "count", fieldName: "count" } + ], + postAggregations: [ + { + type: "expression", + name: "average_processing_time", + expression: "case_searched((count > 0),(processing_time/count),0)" + } + ] + } +}); + +export const totalEventsQuery = (intervals: string, dataset_id: string) => ({ + queryType: "timeseries", + dataSource: { + type: "table", + name: "system-events" + }, + intervals: { + type: "intervals", + intervals: [intervals] + }, + filter: { + type: "equals", + column: "ctx_dataset", + matchValueType: "STRING", + matchValue: dataset_id + }, + granularity: { + type: "all", + timeZone: "UTC" + }, + aggregations: [ + { + type: "longSum", + name: "total_events_count", + fieldName: "count" + } + ] +}); + +export const totalFailedEventsQuery = (intervals: string, dataset_id: string) => ({ + queryType: "timeseries", + dataSource: { + type: "table", + name: "system-events" + }, + intervals: { + type: "intervals", + intervals: [intervals] + }, + filter: { + type: "equals", + column: "ctx_dataset", + matchValueType: "STRING", + matchValue: dataset_id + }, + granularity: { + type: "all", + timeZone: "UTC" + }, + aggregations: [ + { + type: "filtered", + aggregator: { + type: "longSum", + name: "total_failed_events", + fieldName: "count" + }, + filter: { + type: "and", + fields: [ + { + type: "equals", + column: "ctx_pdata_pid", + matchValueType: "STRING", + matchValue: "validator" + }, + { + type: "equals", + column: "error_pdata_status", + matchValueType: "STRING", + matchValue: "failed" + } + ] + }, + name: "total_failed_events" + } + ] +}); + +export const generateTimeseriesQuery = (intervals: string, dataset_id: string) => ({ + queryType: "timeseries", + dataSource: "system-events", + intervals: intervals, + granularity: { + type: "all", + timeZone: "UTC" + }, + filter: { + type: "and", + fields: [ + { type: "selector", dimension: "ctx_module", value: "processing" }, + { type: "selector", dimension: "ctx_dataset", value: dataset_id }, + { type: "selector", dimension: "ctx_pdata_pid", value: "router" }, + { type: "selector", dimension: "error_code", value: null } + ] + }, + aggregations: [ + { type: "longSum", name: "count", fieldName: "count" } + ] +}); 
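+ +// Used by getDataVolume for the hourly volume windows; the query body currently mirrors generateTimeseriesQuery above.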
+ +export const generateTimeseriesQueryEventsPerHour = (intervals: string, dataset_id: string) => ({ + queryType: "timeseries", + dataSource: "system-events", + intervals: intervals, + granularity: { + type: "all", + timeZone: "UTC" + }, + filter: { + type: "and", + fields: [ + { type: "selector", dimension: "ctx_module", value: "processing" }, + { type: "selector", dimension: "ctx_dataset", value: dataset_id }, + { type: "selector", dimension: "ctx_pdata_pid", value: "router" }, + { type: "selector", dimension: "error_code", value: null } + ] + }, + aggregations: [ + { type: "longSum", name: "count", fieldName: "count" } + ] +}); + +export const dataLineageSuccessQuery = (intervals: string, dataset_id: string, column: string, value: string) => ({ + queryType: "timeseries", + dataSource: { + type: "table", + name: "system-events" + }, + intervals: { + type: "intervals", + intervals: [intervals] + }, + filter: { + type: "and", + fields: [ + { + type: "equals", + column: column, + matchValueType: "STRING", + matchValue: value + }, + { + type: "equals", + column: "ctx_dataset", + matchValueType: "STRING", + matchValue: dataset_id + } + ] + }, + granularity: { + type: "all", + timeZone: "UTC" + }, + aggregations: [ + { + type: "longSum", + name: "count", + fieldName: "count" + } + ] +}); + +export const generateTransformationFailedQuery = (intervals: string, dataset_id: string) => ({ + queryType: "timeseries", + dataSource: { + type: "table", + name: "system-events" + }, + intervals: { + type: "intervals", + intervals: [intervals] + }, + filter: { + type: "equals", + column: "ctx_dataset", + matchValueType: "STRING", + matchValue: dataset_id + }, + granularity: { + type: "all", + timeZone: "UTC" + }, + aggregations: [ + { + type: "filtered", + aggregator: { + type: "longSum", + name: "count", + fieldName: "count" + }, + filter: { + type: "and", + fields: [ + { + type: "equals", + column: "ctx_pdata_id", + matchValueType: "STRING", + matchValue: "TransformerJob" + }, + { + type: "equals", + column: "error_pdata_status", + matchValueType: "STRING", + matchValue: "failed" + } + ] + }, + name: "count" + } + ] +}); + +export const generateDedupFailedQuery = (intervals: string, dataset_id: string) => ({ + queryType: "timeseries", + dataSource: { + type: "table", + name: "system-events" + }, + intervals: { + type: "intervals", + intervals: [intervals] + }, + filter: { + type: "equals", + column: "ctx_dataset", + matchValueType: "STRING", + matchValue: dataset_id + }, + granularity: { + type: "all", + timeZone: "UTC" + }, + aggregations: [ + { + type: "filtered", + aggregator: { + type: "longSum", + name: "count", + fieldName: "count" + }, + filter: { + type: "and", + fields: [ + { + type: "equals", + column: "ctx_pdata_pid", + matchValueType: "STRING", + matchValue: "dedup" + }, + { + type: "equals", + column: "error_type", + matchValueType: "STRING", + matchValue: "DedupFailed" + } + ] + }, + name: "count" + } + ] +}); + +export const generateDenormFailedQuery = (intervals: string, dataset_id: string) => ({ + queryType: "timeseries", + dataSource: { + type: "table", + name: "system-events" + }, + intervals: { + type: "intervals", + intervals: [intervals] + }, + filter: { + type: "equals", + column: "ctx_dataset", + matchValueType: "STRING", + matchValue: dataset_id + }, + granularity: { + type: "all", + timeZone: "UTC" + }, + aggregations: [ + { + type: "filtered", + aggregator: { + type: "longSum", + name: "count", + fieldName: "count" + }, + filter: { + type: "and", + fields: [ + { + type: 
"equals", + column: "ctx_pdata_pid", + matchValueType: "STRING", + matchValue: "denorm" + }, + { + type: "equals", + column: "error_type", + matchValueType: "STRING", + matchValue: "DenormDataNotFound" + } + ] + }, + name: "count" + } + ] +}); + +export const generateConnectorQuery = (intervals: string, dataset_id: string) => ({ + queryType: "topN", + dataSource: { + type: "table", + name: "system-events" + }, + dimension: { + type: "default", + dimension: "ctx_source_connector", + outputName: "name", + outputType: "STRING" + }, + metric: { + type: "dimension", + ordering: { + type: "lexicographic" + } + }, + threshold: 1001, + intervals: { + type: "intervals", + intervals: [intervals] + }, + filter: { + type: "equals", + column: "ctx_dataset", + matchValueType: "STRING", + matchValue: dataset_id + }, + granularity: { + type: "all", + timeZone: "UTC" + }, + aggregations: [ + { + type: "longSum", + name: "count", + fieldName: "count" + } + ] +}); + +export const generateTotalQueryCallsQuery = (time_period: string) => ({ + end: dayjs().unix(), + query: `sum(sum_over_time(node_total_api_calls{entity="data-out"}[${time_period}]))`, + step: `${time_period}`, + start: dayjs().subtract(1, 'day').unix() +}); + +export const generateDatasetQueryCallsQuery = (dataset: string, time_period: string) => ({ + end: dayjs().unix(), + step: `${time_period}`, + query: `sum(sum_over_time(node_total_api_calls{dataset_id="${dataset}",entity="data-out"}[${time_period}]))`, + start: dayjs().subtract(1, 'day').unix(), +}); \ No newline at end of file diff --git a/api-service/src/controllers/DatasetReset/DatasetReset.ts b/api-service/src/controllers/DatasetReset/DatasetReset.ts index 68b272f1..cc5aef3c 100644 --- a/api-service/src/controllers/DatasetReset/DatasetReset.ts +++ b/api-service/src/controllers/DatasetReset/DatasetReset.ts @@ -18,7 +18,7 @@ const validateRequest = async (req: Request) => { if (!isRequestValid.isValid) { throw obsrvError("", "DATASET_INVALID_INPUT", isRequestValid.message, "BAD_REQUEST", 400) } - const datasetId = _.get(req, ["params", "datasetId"]) + const datasetId = _.get(req, ["params", "dataset_id"]) const isDataSetExists = await datasetService.checkDatasetExists(datasetId); if (!isDataSetExists) { throw obsrvError(datasetId, "DATASET_NOT_FOUND", `Dataset not exists with id:${datasetId}`, httpStatus[httpStatus.NOT_FOUND], 404) @@ -28,7 +28,7 @@ const validateRequest = async (req: Request) => { const datasetReset = async (req: Request, res: Response) => { const category = _.get(req, ["body", "request", "category"]); - const datasetId = _.get(req, ["params"," datasetId"]); + const datasetId = _.get(req, ["params"," dataset_id"]); await validateRequest(req); if (category == "processing") { diff --git a/api-service/src/middlewares/userPermissions.json b/api-service/src/middlewares/userPermissions.json index eda444c4..88393ba8 100644 --- a/api-service/src/middlewares/userPermissions.json +++ b/api-service/src/middlewares/userPermissions.json @@ -13,7 +13,8 @@ "api.alert.silence.list", "api.alert.silence.get", "api.alert.notification.list", - "api.alert.notification.get" + "api.alert.notification.get", + "api.dataset.metrics" ], "restricted_dataset_api": [ "api.datasets.reset", diff --git a/api-service/src/routes/Router.ts b/api-service/src/routes/Router.ts index 1e39d942..60930f05 100644 --- a/api-service/src/routes/Router.ts +++ b/api-service/src/routes/Router.ts @@ -29,19 +29,20 @@ import ConnectorsRead from "../controllers/ConnectorsRead/ConnectorsRead"; import DatasetImport from 
"../controllers/DatasetImport/DatasetImport"; import { OperationType, telemetryAuditStart } from "../services/telemetry"; import telemetryActions from "../telemetry/telemetryActions"; -import datasetMetrics from "../controllers/DatasetMetrics/DatasetMetricsController"; import checkRBAC from "../middlewares/RBAC_middleware"; import connectorRegisterController from "../controllers/ConnectorRegister/ConnectorRegisterController"; +import dataMetrics from "../controllers/DataMetrics/DataMetricsController"; +import datasetMetrics from "../controllers/DatasetMetrics/DatasetMetricsController"; export const router = express.Router(); -router.post("/data/in/:datasetId", setDataToRequestObject("api.data.in"), onRequest({ entity: Entity.Data_in }), telemetryAuditStart({action: telemetryActions.createDataset, operationType: OperationType.CREATE}), checkRBAC.handler(), dataIn); -router.post("/data/query/:datasetId", setDataToRequestObject("api.data.out"), onRequest({ entity: Entity.Data_out }), checkRBAC.handler(), dataOut); +router.post("/data/in/:dataset_id", setDataToRequestObject("api.data.in"), onRequest({ entity: Entity.Data_in }), telemetryAuditStart({action: telemetryActions.createDataset, operationType: OperationType.CREATE}), checkRBAC.handler(), dataIn); +router.post("/data/query/:dataset_id", setDataToRequestObject("api.data.out"), onRequest({ entity: Entity.Data_out }), checkRBAC.handler(), dataOut); router.post("/datasets/create", setDataToRequestObject("api.datasets.create"), onRequest({ entity: Entity.Management }),telemetryAuditStart({action: telemetryActions.createDataset, operationType: OperationType.CREATE}), checkRBAC.handler(),DatasetCreate) router.patch("/datasets/update", setDataToRequestObject("api.datasets.update"), onRequest({ entity: Entity.Management }),telemetryAuditStart({action: telemetryActions.updateDataset, operationType: OperationType.UPDATE}), checkRBAC.handler(), DatasetUpdate) router.get("/datasets/read/:dataset_id", setDataToRequestObject("api.datasets.read"), onRequest({ entity: Entity.Management }), telemetryAuditStart({action: telemetryActions.readDataset, operationType: OperationType.GET}), checkRBAC.handler(), DatasetRead) router.post("/datasets/list", setDataToRequestObject("api.datasets.list"), onRequest({ entity: Entity.Management }), telemetryAuditStart({action: telemetryActions.listDatasets, operationType: OperationType.LIST}), checkRBAC.handler(), DatasetList) -router.get("/data/exhaust/:datasetId", setDataToRequestObject("api.data.exhaust"), onRequest({ entity: Entity.Management }), telemetryAuditStart({action: telemetryActions.datasetExhaust, operationType: OperationType.GET}), checkRBAC.handler(), dataExhaust); +router.get("/data/exhaust/:dataset_id", setDataToRequestObject("api.data.exhaust"), onRequest({ entity: Entity.Management }), telemetryAuditStart({action: telemetryActions.datasetExhaust, operationType: OperationType.GET}), checkRBAC.handler(), dataExhaust); router.post("/template/create", setDataToRequestObject("api.query.template.create"), checkRBAC.handler(), createQueryTemplate); router.get("/template/read/:templateId", setDataToRequestObject("api.query.template.read"), checkRBAC.handler(), readQueryTemplate); router.delete("/template/delete/:templateId", setDataToRequestObject("api.query.template.delete"), checkRBAC.handler(), deleteQueryTemplate); @@ -52,7 +53,7 @@ router.post("/template/query/:templateId", setDataToRequestObject("api.query.tem router.post("/files/generate-url", setDataToRequestObject("api.files.generate-url"), 
onRequest({ entity: Entity.Management }), checkRBAC.handler(), GenerateSignedURL); router.post("/datasets/status-transition", setDataToRequestObject("api.datasets.status-transition"), onRequest({ entity: Entity.Management }), telemetryAuditStart({action: telemetryActions.createTransformation, operationType: OperationType.CREATE}), checkRBAC.handler(), DatasetStatusTansition); router.post("/datasets/health", setDataToRequestObject("api.dataset.health"), onRequest({ entity: Entity.Management }), checkRBAC.handler(), datasetHealth); -router.post("/datasets/reset/:datasetId", setDataToRequestObject("api.dataset.reset"), onRequest({ entity: Entity.Management }), checkRBAC.handler(), datasetReset); +router.post("/datasets/reset/:dataset_id", setDataToRequestObject("api.dataset.reset"), onRequest({ entity: Entity.Management }), checkRBAC.handler(), datasetReset); router.post("/datasets/dataschema", setDataToRequestObject("api.datasets.dataschema"), onRequest({ entity: Entity.Management }), checkRBAC.handler(), DataSchemaGenerator); router.get("/datasets/export/:dataset_id", setDataToRequestObject("api.datasets.export"), onRequest({ entity: Entity.Management }), checkRBAC.handler(), DatasetExport); router.post("/datasets/copy", setDataToRequestObject("api.datasets.copy"), onRequest({ entity: Entity.Management }), telemetryAuditStart({action: telemetryActions.copyDataset, operationType: OperationType.CREATE}), checkRBAC.handler(), DatasetCopy); @@ -62,5 +63,5 @@ router.post("/datasets/import", setDataToRequestObject("api.datasets.import"), o router.post("/connector/register", setDataToRequestObject("api.connector.register"), onRequest({ entity: Entity.Management }), connectorRegisterController); //Wrapper Service router.post("/obsrv/data/sql-query", setDataToRequestObject("api.obsrv.data.sql-query"), onRequest({ entity: Entity.Data_out }), checkRBAC.handler(), sqlQuery); -router.post("/data/metrics", setDataToRequestObject("api.data.metrics"), onRequest({ entity: Entity.Data_out }), datasetMetrics) - +router.post("/data/metrics", setDataToRequestObject("api.data.metrics"), onRequest({ entity: Entity.Data_out }), dataMetrics) +router.post("/dataset/metrics", setDataToRequestObject("api.dataset.metrics"), onRequest({ entity: Entity.Management }), checkRBAC.handler(), datasetMetrics); diff --git a/api-service/src/services/DatasetMetricsService.ts b/api-service/src/services/DatasetMetricsService.ts new file mode 100644 index 00000000..2c879d64 --- /dev/null +++ b/api-service/src/services/DatasetMetricsService.ts @@ -0,0 +1,238 @@ +import axios from "axios"; +import dayjs from "dayjs"; +import _ from "lodash"; +import { config } from "../configs/Config"; +import { dataLineageSuccessQuery, generateConnectorQuery, generateDatasetQueryCallsQuery, generateDedupFailedQuery, generateDenormFailedQuery, generateTimeseriesQuery, generateTimeseriesQueryEventsPerHour, generateTotalQueryCallsQuery, generateTransformationFailedQuery, processingTimeQuery, totalEventsQuery, totalFailedEventsQuery } from "../controllers/DatasetMetrics/queries"; +const druidPort = _.get(config, "query_api.druid.port"); +const druidHost = _.get(config, "query_api.druid.host"); +const nativeQueryEndpoint = `${druidHost}:${druidPort}${config.query_api.druid.native_query_path}`; +const prometheusEndpoint = `${config.query_api.prometheus.url}/api/v1/query_range`; + +export const getDataFreshness = async (dataset_id: string, intervals: string, defaultThreshold: number) => { + const queryPayload = processingTimeQuery(intervals, dataset_id); + 
const druidResponse = await axios.post(nativeQueryEndpoint, queryPayload?.query); + const avgProcessingTime = _.get(druidResponse, "data[0].average_processing_time", 0); + const freshnessStatus = avgProcessingTime < defaultThreshold ? "Healthy" : "Unhealthy"; + + return { + category: "data_freshness", + status: freshnessStatus, + components: [ + { + type: "average_time_difference_in_min", + threshold: defaultThreshold / 60000, // convert to minutes + value: avgProcessingTime / 60000, + status: freshnessStatus + }, + { + type: "freshness_query_time_in_min", + threshold: 10, + value: avgProcessingTime / 60000, // convert to minutes + status: freshnessStatus + } + ] + }; +}; + +export const getDataObservability = async (dataset_id: string, intervals: string) => { + const totalEventsPayload = totalEventsQuery(intervals, dataset_id); + const totalFailedEventsPayload = totalFailedEventsQuery(intervals, dataset_id); + const totalQueryCalls = generateTotalQueryCallsQuery(config?.data_observability?.data_out_query_time_period); + const totalQueryCallsAtDatasetLevel = generateDatasetQueryCallsQuery(dataset_id, config?.data_observability?.data_out_query_time_period); + + const [totalEventsResponse, totalFailedEventsResponse, totalApiCallsResponse, totalCallsAtDatasetLevelResponse] = await Promise.all([ + axios.post(nativeQueryEndpoint, totalEventsPayload), + axios.post(nativeQueryEndpoint, totalFailedEventsPayload), + axios.request({ url: prometheusEndpoint, method: "GET", params: totalQueryCalls }), + axios.request({ url: prometheusEndpoint, method: "GET", params: totalQueryCallsAtDatasetLevel }) + ]); + + const totalApiCalls = _.map(_.get(totalApiCallsResponse, 'data.data.result'), payload => { + return _.floor(_.get(payload, 'values[0][1]'), 3) || 0 + }) + + const totalApiCallsAtDatasetLevel = _.map(_.get(totalCallsAtDatasetLevelResponse, 'data.data.result'), payload => { + return _.floor(_.get(payload, 'values[0][1]'), 3) || 0 + }) + + const importanceScore = (totalApiCallsAtDatasetLevel[0] / totalApiCalls[0]) * 100; + + const totalEventsCount = _.get(totalEventsResponse, "data[0].result.total_events_count") || 0; + const totalFailedEventsCount = _.get(totalFailedEventsResponse, "data[0].result.total_failed_events") || 0; + let failurePercentage = 0; + if (totalEventsCount > 0) { + failurePercentage = (totalFailedEventsCount / totalEventsCount) * 100; + } + const observabilityStatus = failurePercentage > 5 ? 
"Unhealthy" : "Healthy"; + + return { + category: "data_observability", + status: observabilityStatus, + components: [ + { + type: "data_observability_health", + status: observabilityStatus + }, + { + type: "failure_percentage", + value: failurePercentage + }, + { + type: "threshold_percentage", + value: 5 + }, + { + type: "importance_score", + value: importanceScore || 0 + } + ] + }; +}; + +export const getDataVolume = async (dataset_id: string, volume_by_days: number, dateFormat: string) => { + const currentHourIntervals = dayjs().subtract(1, "hour").startOf("hour").toISOString() + "/" + dayjs().startOf("hour").toISOString(); + const currentDayIntervals = dayjs().subtract(1, 'day').startOf('day').format(dateFormat) + '/' + dayjs().endOf('day').format(dateFormat); + const currentWeekIntervals = dayjs().subtract(1, 'week').startOf('week').format(dateFormat) + '/' + dayjs().endOf('week').format(dateFormat); + const previousHourIntervals = dayjs().subtract(2, "hour").startOf("hour").toISOString() + '/' + dayjs().startOf("hour").toISOString(); + const previousDayIntervals = dayjs().subtract(2, 'day').startOf('day').format(dateFormat) + '/' + dayjs().subtract(1, 'day').endOf('day').format(dateFormat); + const previousWeekIntervals = dayjs().subtract(2, 'week').startOf('week').format(dateFormat) + '/' + dayjs().subtract(1, 'week').endOf('week').format(dateFormat); + const nDaysIntervals = dayjs().subtract(volume_by_days, 'day').startOf('day').format(dateFormat) + '/' + dayjs().endOf('day').format(dateFormat); + + const currentHourPayload = generateTimeseriesQueryEventsPerHour(currentHourIntervals, dataset_id); + const currentDayPayload = generateTimeseriesQuery(currentDayIntervals, dataset_id); + const currentWeekPayload = generateTimeseriesQuery(currentWeekIntervals, dataset_id); + const previousHourPayload = generateTimeseriesQueryEventsPerHour(previousHourIntervals, dataset_id); + const previousDayPayload = generateTimeseriesQuery(previousDayIntervals, dataset_id); + const previousWeekPayload = generateTimeseriesQuery(previousWeekIntervals, dataset_id); + const nDaysPayload = generateTimeseriesQuery(nDaysIntervals, dataset_id); + const [ + currentHourResponse, currentDayResponse, currentWeekResponse, + previousHourResponse, previousDayResponse, previousWeekResponse, + nDaysResponse + ] = await Promise.all([ + axios.post(nativeQueryEndpoint, currentHourPayload), + axios.post(nativeQueryEndpoint, currentDayPayload), + axios.post(nativeQueryEndpoint, currentWeekPayload), + axios.post(nativeQueryEndpoint, previousHourPayload), + axios.post(nativeQueryEndpoint, previousDayPayload), + axios.post(nativeQueryEndpoint, previousWeekPayload), + axios.post(nativeQueryEndpoint, nDaysPayload) + ]); + const currentHourCount = _.get(currentHourResponse, "data[0].result.count") || 0; + const currentDayCount = _.get(currentDayResponse, "data[0].result.count") || 0; + const currentWeekCount = _.get(currentWeekResponse, "data[0].result.count") || 0; + const previousHourCount = _.get(previousHourResponse, "data[0].result.count") || 0; + const previousDayCount = _.get(previousDayResponse, "data[0].result.count") || 0; + const previousWeekCount = _.get(previousWeekResponse, "data[0].result.count") || 0; + const nDaysCount = _.get(nDaysResponse, "data[0].result.count") || 0; + + const volumePercentageByHour = previousHourCount > 0 ? ((currentHourCount - previousHourCount) / previousHourCount) * 100 : 0; + const volumePercentageByDay = previousDayCount > 0 ? 
((currentDayCount - previousDayCount) / previousDayCount) * 100 : 0; + const volumePercentageByWeek = previousWeekCount > 0 ? ((currentWeekCount - previousWeekCount) / previousWeekCount) * 100 : 0; + + return { + category: "data_volume", + components: [ + { type: "events_per_hour", value: currentHourCount }, + { type: "events_per_day", value: currentDayCount }, + { type: "events_per_n_day", value: nDaysCount }, + { type: "volume_percentage_by_hour", value: volumePercentageByHour }, + { type: "volume_percentage_by_day", value: volumePercentageByDay }, + { type: "volume_percentage_by_week", value: volumePercentageByWeek }, + { type: "growth_rate_percentage", value: volumePercentageByHour } + ] + }; +}; + +export const getDataLineage = async (dataset_id: string, intervals: string) => { + const transformationSuccessPayload = dataLineageSuccessQuery(intervals, dataset_id, "transformer_status", "success"); + const dedupSuccessPayload = dataLineageSuccessQuery(intervals, dataset_id, "dedup_status", "success"); + const denormSuccessPayload = dataLineageSuccessQuery(intervals, dataset_id, "denorm_status", "success"); + const totalValidationPayload = dataLineageSuccessQuery(intervals, dataset_id, "ctx_dataset", dataset_id); + const totalValidationFailedPayload = dataLineageSuccessQuery(intervals, dataset_id, "error_pdata_status", "failed"); + const transformationFailedPayload = generateTransformationFailedQuery(intervals, dataset_id); + const dedupFailedPayload = generateDedupFailedQuery(intervals, dataset_id); + const denormFailedPayload = generateDenormFailedQuery(intervals, dataset_id); + + const [ + transformationSuccessResponse, dedupSuccessResponse, denormSuccessResponse, + totalValidationResponse, totalValidationFailedResponse, transformationFailedResponse, dedupFailedResponse, denormFailedResponse + ] = await Promise.all([ + axios.post(nativeQueryEndpoint, transformationSuccessPayload), + axios.post(nativeQueryEndpoint, dedupSuccessPayload), + axios.post(nativeQueryEndpoint, denormSuccessPayload), + axios.post(nativeQueryEndpoint, totalValidationPayload), + axios.post(nativeQueryEndpoint, totalValidationFailedPayload), + axios.post(nativeQueryEndpoint, transformationFailedPayload), + axios.post(nativeQueryEndpoint, dedupFailedPayload), + axios.post(nativeQueryEndpoint, denormFailedPayload) + ]); + + // success at each level + const transformationSuccessCount = _.get(transformationSuccessResponse, "data[0].result.count") || 0; + const dedupSuccessCount = _.get(dedupSuccessResponse, "data[0].result.count") || 0; + const denormSuccessCount = _.get(denormSuccessResponse, "data[0].result.count") || 0; + const totalValidationCount = _.get(totalValidationResponse, "data[0].result.count") || 0; + const totalValidationFailedCount = _.get(totalValidationFailedResponse, "data[0].result.count") || 0; + const storageSuccessCount = totalValidationCount - totalValidationFailedCount; + + // failure at each level + const transformationFailedCount = _.get(transformationFailedResponse, "data[0].result.count") || 0; + const dedupFailedCount = _.get(dedupFailedResponse, "data[0].result.count") || 0; + const denormFailedCount = _.get(denormFailedResponse, "data[0].result.count") || 0; + return { + category: "data_lineage", + components: [ + { type: "transformation_success", value: transformationSuccessCount }, + { type: "dedup_success", value: dedupSuccessCount }, + { type: "denormalization_success", value: denormSuccessCount }, + { type: "total_success", value: storageSuccessCount }, + { type: "total_failed", 
value: totalValidationFailedCount }, + { type: "transformation_failed", value: transformationFailedCount }, + { type: "dedup_failed", value: dedupFailedCount }, + { type: "denorm_failed", value: denormFailedCount } + ] + }; +}; + + +export const getConnectors = async (dataset_id: string, intervals: string) => { + const connectorQueryPayload = generateConnectorQuery(intervals, dataset_id); + const connectorResponse = await axios.post(nativeQueryEndpoint, connectorQueryPayload); + const connectorsData = _.get(connectorResponse, "data[0].result", []); + const result = { + category: "connectors", + components: connectorsData.map((item: any) => ({ + id: item.name || "failed", + type: item.name === null ? "failed" : "success", + value: item.count + })) + }; + + // result already carries the category and component list, so return it directly rather than nesting it under another components key + return result; +}; + +export const getDataQuality = async (dataset_id: string, intervals: string) => { + const totalValidationPayload = dataLineageSuccessQuery(intervals, dataset_id, "ctx_dataset", dataset_id); + const totalValidationFailedPayload = dataLineageSuccessQuery(intervals, dataset_id, "error_pdata_status", "failed"); + const [totalValidationResponse, totalValidationFailedResponse, + ] = await Promise.all([ + axios.post(nativeQueryEndpoint, totalValidationPayload), + axios.post(nativeQueryEndpoint, totalValidationFailedPayload), + ]); + const totalValidationCount = _.get(totalValidationResponse, "data[0].result.count") || 0; + const totalValidationFailedCount = _.get(totalValidationFailedResponse, "data[0].result.count") || 0; + const storageSuccessCount = totalValidationCount - totalValidationFailedCount; + + return { + category: "data_quality", + components: [ + { type: "incidents_failed", value: totalValidationFailedCount }, + { type: "incidents_success", value: storageSuccessCount }, + { type: "total_incidents", value: totalValidationCount } + ] + }; +} \ No newline at end of file
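
For reference, a request to the new POST /dataset/metrics route would follow the DatasetMetrics.json schema above. The sketch below is illustrative only; msgid, ts, and dataset_id are placeholder values.

{
    "id": "api.dataset.metrics",
    "ver": "v1",
    "ts": "2024-09-10T10:00:00+05:30",
    "params": { "msgid": "4a7f14c3-d61e-4d4f-be78-181834eeff6d" },
    "request": {
        "dataset_id": "sample-dataset",
        "category": ["data_freshness", "data_volume"],
        "query_time_period": 7
    }
}

On success the controller responds with status 200 and a data array holding one entry per requested category, each shaped like the objects assembled by the DatasetMetricsService helpers, e.g. { "category": "data_freshness", "status": "Healthy", "components": [...] }.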